kafka-dance-api/app.js

176 lines
4.4 KiB
JavaScript

const { Kafka } = require('kafkajs');
const { readConfig, storeConfig } = require('./config')
const express = require('express');
const cors = require('cors')
const ws = require('ws')
const path = require('path');
const cookieParser = require('cookie-parser');
const logger = require('morgan');
const query = require('./query')
const config = readConfig()
console.log('CONFIG', config)
const buildCluster = clusterConfig => {
const kafkaConfig = {...clusterConfig}
delete kafkaConfig.clusterName
const kafka = new Kafka(kafkaConfig)
const admin = kafka.admin()
admin.connect().catch(console.error)
return {
kafka,
admin,
config: clusterConfig
}
}
const clusters =
Object.fromEntries(Object.entries(config.clusters)
.map(([clusterName, clusterData]) => [clusterName, buildCluster(clusterData)]))
console.log('CLUSTERS', clusters)
const app = express();
app.use(cors({
origin: 'http://localhost:5173'
}))
app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
app.use(express.static(path.join(__dirname, 'public')));
const consumers = new Map()
require('./safely-exit')(consumers, clusters)
const router = express.Router()
/* GET topics listing. */
router.get('/topics/:cluster', async (req, res, _next) => {
const legalName = topicName => !topicName.startsWith("__")
const topicList = await clusters[req.params.cluster]?.admin.listTopics()
if (topicList) {
res.send(topicList.filter(legalName))
} else {
res.send([])
//res.status(400).send({error: 'Invalid cluster name'})
}
})
const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX'
router.get('/clusters', async (req, res, _next) => {
res.send(Object.fromEntries(
Object.entries(clusters).map(([key, value]) => {
value = JSON.parse(JSON.stringify(value))
value.config.sasl.password = passwordPlaceholder
return [key, value.config]
})))
})
router.post('/clusters', async (req, res, _next) => {
console.log('/clusters post body', req.body)
res.send('')
})
router.put('/clusters', async (req, res, _next) => {
console.log('/clusters put body', req.body)
const hasPlaceholderPassword = req.body.sasl.password === passwordPlaceholder
console.log('Has placeholder password:', hasPlaceholderPassword)
const clusterName = req.body.clusterName
if (hasPlaceholderPassword) {
req.body.password = config.clusters[clusterName].password
}
config.clusters[clusterName] = req.body
clusters[clusterName] = buildCluster(req.body)
res.send('')
await storeConfig(config)
})
app.use(router)
const realTimeSearch = async ({ cluster, searchCode, immutable, socket, topic }) => {
return query.realTimeMessageSearch({
kafka: clusters[cluster].kafka,
topic,
searchCode,
immutable,
onMessage: message => {
socket.send(JSON.stringify({
type: 'message',
message
}))
}
})
}
const search = async ({ cluster, searchCode, immutable, socket, topic, maxItems }) => {
return query.searchMessages({
kafka: clusters[cluster].kafka,
topic,
maxItems,
searchCode,
immutable,
onDone: messages => {
console.log(messages.length + ' messages')
socket.send(JSON.stringify({
type: 'complete',
message: messages
}))
},
onBatchDone: count => {
socket.send(JSON.stringify({
type: 'item_count',
message: count
}))
}
})
}
const wsServer = new ws.WebSocketServer({
noServer: true
})
wsServer.on('connection', socket => {
socket.on('close', async () => {
await killConsumer(consumers.get(socket))
consumers.delete(socket)
})
socket.on('message', async message => {
socket.send('Loading...')
message = JSON.parse(message)
const currentMode = message.mode === 'realTime' ? realTimeSearch : search
const run = async () => consumers.set(socket, await currentMode({
cluster: clusters[message.cluster].kafka,
searchCode: message.searchCode,
immutable: message.immutable,
topic: message.topic,
maxItems: message.maxItems,
socket
}))
run().catch(async e => {
console.error('run() error occurred!', e.toString())
await killConsumer(consumers.get(socket))
// Try again ONCE on failure
run().catch(e => socket.send('ERROR: ' + e))
})
})
})
const killConsumer = async consumer => {
await consumer?.stop()
await consumer?.disconnect()
}
module.exports = {
app,
wsServer
}