const { Kafka } = require('kafkajs'); const { readConfig, storeConfig } = require('./config') const express = require('express'); const cors = require('cors') const ws = require('ws') const path = require('path'); const cookieParser = require('cookie-parser'); const logger = require('morgan'); const query = require('./query') const { buildEscape } = require('./safely-exit') const config = readConfig() console.log('CONFIG', config) const c = object => { console.log(object) return object } const buildCluster = (clusterConfig, connect = true) => { const kafkaConfig = {...clusterConfig} delete kafkaConfig.clusterName const kafka = new Kafka(kafkaConfig) const admin = kafka.admin() admin.connect().catch(console.error) return c({ kafka, admin, config: clusterConfig }) } const clusters = Object.fromEntries(Object.entries(config.clusters) .map(([clusterName, clusterData]) => [clusterName, buildCluster(clusterData)])) console.log('CLUSTERS', clusters) const app = express(); app.use(cors({ origin: 'http://localhost:5173' })) app.use(logger('dev')); app.use(express.json()); app.use(express.urlencoded({ extended: false })); app.use(cookieParser()); app.use(express.static(path.join(__dirname, 'public'))); const router = express.Router() /* GET topics listing. */ router.get('/topics/:cluster', async (req, res, _next) => { const legalName = topicName => !topicName.startsWith("__") const topicList = await clusters[req.params.cluster]?.admin.listTopics() if (topicList) { res.send(topicList.filter(legalName)) } else { res.send([]) //res.status(400).send({error: 'Invalid cluster name'}) } }) const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX' const getClusterData = () => Object.fromEntries( Object.entries(clusters).map(([key, value]) => { value = JSON.parse(JSON.stringify(value)) value.config.sasl.password = passwordPlaceholder return [key, value.config] })) router.get('/clusters', async (req, res, _next) => { res.send(getClusterData()) }) router.post('/clusters', async (req, res, _next) => { console.log('/clusters post body', req.body) const clusterName = req.body.clusterName config.clusters[clusterName] = req.body clusters[clusterName] = buildCluster(req.body) res.send(getClusterData()) await storeConfig(config) }) router.post('/clusters/delete', async (req, res, _next) => { console.log('/clusters/delete post body', req.body) const clusterName = req.body.clusterName // TODO: Disconnect delete clusters[clusterName] delete config.clusters[clusterName] await storeConfig(config) res.send(getClusterData()) }) router.put('/clusters', async (req, res, _next) => { console.log('/clusters put body', req.body) const hasPlaceholderPassword = req.body.sasl.password === passwordPlaceholder console.log('Has placeholder password:', hasPlaceholderPassword) const clusterName = req.body.clusterName if (hasPlaceholderPassword) { req.body.password = config.clusters[clusterName].password } config.clusters[clusterName] = req.body clusters[clusterName] = buildCluster(req.body) res.send('') //res.send(getClusterData()) await storeConfig(config) }) app.use(router) const realTimeSearch = async ({ kafka, searchCode, immutable, socket, topic }) => query.realTimeMessageSearch({ kafka, topic, searchCode, immutable, onMessage: message => { socket.send(JSON.stringify({ type: 'message', message })) } }) const search = async ({ kafka, searchCode, immutable, socket, topic, maxItems }) => query.searchMessages({ kafka, topic, maxItems, searchCode, immutable, onDone: messages => { console.log(messages.length + ' messages') socket.send(JSON.stringify({ type: 'complete', message: messages })) }, onBatchDone: count => { socket.send(JSON.stringify({ type: 'item_count', message: count })) } }) const wsServer = new ws.WebSocketServer({ noServer: true }) const consumers = new Map() buildEscape(consumers, clusters) wsServer.on('connection', socket => { socket.on('close', async () => { await killConsumer(consumers.get(socket)) consumers.delete(socket) }) socket.on('message', async message => { socket.send('Loading...') message = JSON.parse(message) const currentMode = message.mode === 'realTime' ? realTimeSearch : search console.log('CLUSTERS before run', clusters) console.log('message.cluster', message.cluster) const cluster = clusters[message.cluster] console.log('clusters[message.cluster]', cluster) const run = async () => consumers.set(socket, await currentMode({ kafka: cluster.kafka, searchCode: message.searchCode, immutable: message.immutable, topic: message.topic, maxItems: message.maxItems, socket })) run().catch(async e => { console.error('run() error occurred!', e.toString()) await killConsumer(consumers.get(socket)) // Try again ONCE on failure run().catch(ee => socket.send('ERROR: ' + ee)) }) }) }) const killConsumer = async consumer => { await consumer?.stop() await consumer?.disconnect() } module.exports = { app, wsServer }