kafka-dance-api/app.js

218 lines
5.9 KiB
JavaScript

const { Kafka } = require('kafkajs');
const { readConfig, storeConfig } = require('./config')
const express = require('express');
const cors = require('cors')
const ws = require('ws')
const path = require('path');
const cookieParser = require('cookie-parser');
const logger = require('morgan');
const query = require('./query')
const { buildEscape } = require('./safely-exit')
const config = readConfig()
console.log('CONFIG', config)
/**
* Log out an object and return it.
*
* @param object The object to log
* @param f Optional param to specify an alternative log function. E.g. console.error
* @returns {*}
*/
const c = (object, f = console.log) => {
f(object)
return object;
}
const buildCluster = (clusterConfig, connect = true) => {
const kafkaConfig = {...clusterConfig}
delete kafkaConfig.clusterName // new Kafka() tries to use this same value
const kafka = new Kafka(kafkaConfig)
const admin = kafka.admin()
const cluster = {
kafka,
admin,
config: clusterConfig
}
admin.connect().catch(e => console.error(cluster.error = e.toString()))
return cluster
}
const clusters =
Object.fromEntries(Object.entries(config.clusters)
.map(([clusterName, clusterData]) => [clusterName, buildCluster(clusterData)]))
console.log('CLUSTERS', clusters)
const app = express();
app.use(cors({
origin: config.frontendUrl,
}))
app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(express.static(path.join(__dirname, 'public')));
const router = express.Router()
process.on('unhandledRejection', console.error)
/* GET topics listing. */
router.get('/topics/:cluster', async (req, res, _next) => {
const legalName = topicName => !topicName.startsWith("__")
try {
const topicList = (await clusters[req.params.cluster]?.admin.listTopics() || [])
res.send(topicList.filter(legalName))
} catch (e) {
res.status(502).send({
error: `Could not connect to cluster '${req.params.cluster}'`,
errorDetails: e.toString()
})
}
})
const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX'
const getClusterData = () =>
Object.fromEntries(
Object.entries(clusters).map(([key, value]) => {
value = JSON.parse(JSON.stringify(value))
if (value.config.sasl?.password) {
value.config.sasl.password = passwordPlaceholder
}
return [key, value.config]
}))
router.get('/clusters', async (req, res, _next) => {
console.log('/clusters')
res.send(getClusterData())
})
router.post('/clusters', async (req, res, _next) => {
console.log('/clusters post body', req.body)
const clusterName = req.body.clusterName
config.clusters[clusterName] = req.body
clusters[clusterName] = buildCluster(req.body)
res.send(getClusterData())
await storeConfig(config)
})
router.post('/clusters/delete', async (req, res, _next) => {
const clusterName = req.body.clusterName
// Kill all consumers connected to this cluster and notify those clients via their websocket
consumers.forEach(([{ consumer, cluster }, socket]) => {
if (clusters[clusterName] !== cluster) {
// TODO This reference equality may not be sufficient?
return
}
killConsumer(consumer)
const sendDeletedMessage = buildSocketMessageSender({ type: 'cluster_deleted', socket })
sendDeletedMessage(clusterName)
})
delete clusters[clusterName]
delete config.clusters[clusterName]
await storeConfig(config)
res.send(getClusterData())
})
router.put('/clusters', async (req, res, _next) => {
const hasPlaceholderPassword = req.body.sasl.password === passwordPlaceholder
const clusterName = req.body.clusterName
if (hasPlaceholderPassword) {
req.body.password = config.clusters[clusterName].password
}
config.clusters[clusterName] = req.body
clusters[clusterName] = buildCluster(req.body)
res.send('')
await storeConfig(config)
})
app.use(router)
const buildSocketMessageSender = ({ type, socket }) => message => {
socket.send(JSON.stringify({
type,
message
}))
}
const realTimeSearch = async ({ kafka, socket, topic }) =>
query.realTimeMessageSearch({
kafka,
topic,
onMessage: buildSocketMessageSender({ socket, type: 'message' })
});
const oneShotSearch = async ({ kafka, socket, topic, maxItems }) =>
query.searchMessages({
kafka,
topic,
maxItems,
onDone: buildSocketMessageSender({ socket, type: 'count' }),
onBatchDone: buildSocketMessageSender({ socket, type: 'item_count' })
})
/** @type {WebSocketServer} */
const wsServer = new ws.WebSocketServer({
noServer: true
})
/** @type {Map<WebSocket, ({consumer: kafka.Consumer, cluster: { kafka, admin, config }})>} */
const consumers = new Map()
buildEscape(consumers, clusters)
wsServer.on('connection', socket => {
socket.send('CONNECTED')
socket.on('close', async () => {
await killConsumer(consumers.get(socket).consumer)
consumers.delete(socket)
})
socket.on('message', async message => {
message = JSON.parse(message)
if (message.mode === 'kill') {
console.log('KILLING SOCKET')
await killConsumer(consumers.get(socket).consumer)
consumers.delete(socket)
return
}
const startSearch = message.mode === 'realTime' ? realTimeSearch : oneShotSearch
const cluster = clusters[message.cluster]
const run = async () => {
const consumerCluster = {
consumer: await startSearch({
kafka: cluster.kafka,
topic: message.topic,
maxItems: message.maxItems,
socket
}),
cluster
}
consumers.set(socket, consumerCluster);
}
run().catch(async e => {
console.error('run() error occurred!', e.toString())
await killConsumer(consumers.get(socket).consumer)
// Try again ONCE on failure
run().catch(ee => socket.send('ERROR: ' + ee))
})
})
})
const killConsumer = async consumer => {
await consumer?.stop()
await consumer?.disconnect()
}
module.exports = {
app,
wsServer
}