210 lines
5.4 KiB
JavaScript
210 lines
5.4 KiB
JavaScript
const { Kafka } = require('kafkajs');
|
|
|
|
const { readConfig, storeConfig } = require('./config')
|
|
const express = require('express');
|
|
const cors = require('cors')
|
|
const ws = require('ws')
|
|
const path = require('path');
|
|
const cookieParser = require('cookie-parser');
|
|
const logger = require('morgan');
|
|
const query = require('./query')
|
|
const { buildEscape } = require('./safely-exit')
|
|
|
|
// Configuration is read once, at module load, and printed in full for debugging.
const config = readConfig();
console.log('CONFIG', config);
|
|
|
|
/**
 * Debug tap: prints a value to stdout and returns that same value,
 * so it can be wrapped around any expression without changing the result.
 *
 * @param {*} object - Value to print.
 * @returns {*} The argument itself (same reference).
 */
function c(object) {
  console.log(object);
  return object;
}
|
|
|
|
/**
 * Creates the Kafka client and admin handle for one configured cluster.
 *
 * @param {object} clusterConfig - Cluster settings. Every key except
 *   `clusterName` is passed to the `Kafka` constructor.
 * @param {boolean} [connect=true] - When false, the admin client is created
 *   but no connection attempt is started.
 * @returns {{kafka: object, admin: object, config: object}} The client, its
 *   admin handle and the untouched `clusterConfig`. The result is passed
 *   through `c`, which logs it.
 */
const buildCluster = (clusterConfig, connect = true) => {
  // `clusterName` is this application's own label, so it is stripped from the
  // copy handed to the Kafka constructor.
  const kafkaConfig = { ...clusterConfig };
  delete kafkaConfig.clusterName;

  const kafka = new Kafka(kafkaConfig);
  const admin = kafka.admin();

  if (connect) {
    // The connection is started in the background; a failure is logged,
    // not thrown, so the cluster entry is still returned.
    admin.connect().catch(console.error);
  }

  return c({
    kafka,
    admin,
    config: clusterConfig,
  });
};
|
|
|
|
// Registry of live cluster handles, keyed by the names used in the configuration.
const clusterEntries = Object.entries(config.clusters).map(
  ([name, settings]) => [name, buildCluster(settings)],
);
const clusters = Object.fromEntries(clusterEntries);
console.log('CLUSTERS', clusters);
|
|
|
|
// HTTP application. Middleware is registered in the order listed here.
const app = express();

// Cross-origin requests are configured for this single origin.
app.use(cors({ origin: 'http://localhost:5173' }));
// Request logging, body parsing and cookie parsing.
app.use(logger('dev'));
app.use(express.json());
app.use(express.urlencoded({ extended: false }));
app.use(cookieParser());
// Static files are served from the `public` directory next to this file.
app.use(express.static(path.join(__dirname, 'public')));

// All API routes below are attached to this router.
const router = express.Router();
|
|
|
|
/**
 * GET /topics/:cluster — lists the topic names of the named cluster.
 *
 * Names starting with "__" are filtered out. An unknown cluster name yields
 * an empty array. Any failure while listing topics is forwarded to Express
 * error handling instead of leaving the request unanswered.
 */
router.get('/topics/:cluster', async (req, res, next) => {
  const legalName = topicName => !topicName.startsWith('__');

  try {
    // Own-property lookup: inherited keys such as "constructor" must not be
    // mistaken for a configured cluster.
    const cluster = Object.prototype.hasOwnProperty.call(clusters, req.params.cluster)
      ? clusters[req.params.cluster]
      : undefined;

    const topicList = await cluster?.admin.listTopics();
    res.send(topicList ? topicList.filter(legalName) : []);
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// Fixed string sent to clients in place of a stored SASL password.
const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX';

/**
 * Builds the client-facing view of every registered cluster.
 *
 * @returns {Object<string, object>} Map of cluster name to a deep copy of its
 *   configuration, with a non-empty `sasl.password` replaced by
 *   `passwordPlaceholder`. The stored configuration is never modified.
 */
const getClusterData = () =>
  Object.fromEntries(
    Object.entries(clusters).map(([name, cluster]) => {
      // Only the config is copied. The Kafka client and admin objects on the
      // entry are not part of the response and need not be serialisable.
      const safeConfig = JSON.parse(JSON.stringify(cluster.config));
      if (safeConfig.sasl?.password) {
        safeConfig.sasl.password = passwordPlaceholder;
      }
      return [name, safeConfig];
    }),
  );
|
|
|
|
// GET /clusters — responds with the output of getClusterData().
router.get('/clusters', async (_req, res) => {
  const payload = getClusterData();
  res.send(payload);
});
|
|
|
|
/**
 * POST /clusters — registers (or replaces) a cluster from the request body.
 *
 * The body is used as the cluster configuration and must carry a non-empty
 * string `clusterName`. On success the configuration is persisted and the
 * response is the output of getClusterData(). An unusable name is answered
 * with HTTP 400; any other failure is forwarded to Express error handling.
 */
router.post('/clusters', async (req, res, next) => {
  console.log('/clusters post body', req.body);

  const clusterName = req.body?.clusterName;
  // "__proto__" is refused because assigning to that key would replace the
  // registry's prototype instead of adding an entry.
  const isUsableName = typeof clusterName === 'string'
    && clusterName !== ''
    && clusterName !== '__proto__';
  if (!isUsableName) {
    res.status(400).send({ error: 'Invalid cluster name' });
    return;
  }

  try {
    const previous = Object.prototype.hasOwnProperty.call(clusters, clusterName)
      ? clusters[clusterName]
      : undefined;

    // Build first: if creating the client throws, nothing has been recorded.
    const cluster = buildCluster(req.body);
    config.clusters[clusterName] = req.body;
    clusters[clusterName] = cluster;

    // Best-effort release of the admin connection being replaced.
    previous?.admin?.disconnect().catch(console.error);

    // Persist before answering so a storage failure is reported to the client.
    await storeConfig(config);
    res.send(getClusterData());
  } catch (err) {
    next(err);
  }
});
|
|
|
|
/**
 * POST /clusters/delete — removes the cluster named by `clusterName` in the
 * request body.
 *
 * The cluster is dropped from the live registry and from the configuration,
 * the configuration is persisted, and the response is the output of
 * getClusterData(). Failures are forwarded to Express error handling.
 */
router.post('/clusters/delete', async (req, res, next) => {
  console.log('/clusters/delete post body', req.body);

  try {
    const clusterName = req.body?.clusterName;
    const existing = Object.prototype.hasOwnProperty.call(clusters, clusterName)
      ? clusters[clusterName]
      : undefined;

    // Best-effort release of the admin connection; a failure is only logged
    // so that removal still goes ahead.
    existing?.admin?.disconnect().catch(console.error);

    delete clusters[clusterName];
    delete config.clusters[clusterName];

    await storeConfig(config);
    res.send(getClusterData());
  } catch (err) {
    next(err);
  }
});
|
|
|
|
/**
 * PUT /clusters — replaces the configuration of the cluster named by
 * `clusterName` in the request body.
 *
 * If `sasl.password` equals `passwordPlaceholder`, the client did not change
 * the password, so the stored one is kept. The new configuration is
 * persisted and the response body is an empty string. An unusable name is
 * answered with HTTP 400; other failures go to Express error handling.
 */
router.put('/clusters', async (req, res, next) => {
  console.log('/clusters put body', req.body);

  const clusterName = req.body?.clusterName;
  // "__proto__" is refused because assigning to that key would replace the
  // registry's prototype instead of adding an entry.
  const isUsableName = typeof clusterName === 'string'
    && clusterName !== ''
    && clusterName !== '__proto__';
  if (!isUsableName) {
    res.status(400).send({ error: 'Invalid cluster name' });
    return;
  }

  try {
    // `sasl` is optional, so the lookup must not assume it exists.
    const hasPlaceholderPassword = req.body.sasl?.password === passwordPlaceholder;
    console.log('Has placeholder password:', hasPlaceholderPassword);

    if (hasPlaceholderPassword) {
      // Restore the stored secret in the same place the placeholder was read
      // from, so the placeholder is never saved as the real password.
      const stored = Object.prototype.hasOwnProperty.call(config.clusters, clusterName)
        ? config.clusters[clusterName]
        : undefined;
      req.body.sasl.password = stored?.sasl?.password;
    }

    const previous = Object.prototype.hasOwnProperty.call(clusters, clusterName)
      ? clusters[clusterName]
      : undefined;

    // Build first: if creating the client throws, nothing has been recorded.
    const cluster = buildCluster(req.body);
    config.clusters[clusterName] = req.body;
    clusters[clusterName] = cluster;

    // Best-effort release of the admin connection being replaced.
    previous?.admin?.disconnect().catch(console.error);

    // Persist before answering so a storage failure is reported to the client.
    await storeConfig(config);
    res.send('');
  } catch (err) {
    next(err);
  }
});
|
|
|
|
// Attach the routes registered on `router` to the application.
app.use(router);
|
|
|
|
/**
 * Starts a live search by delegating to `query.realTimeMessageSearch`.
 *
 * Every message handed to the `onMessage` callback is sent to the socket as
 * the JSON text of `{ type: 'message', message }`.
 *
 * @param {object} options
 * @param {object} options.kafka - Passed through to the query module.
 * @param {*} options.searchCode - Passed through to the query module.
 * @param {*} options.immutable - Passed through to the query module.
 * @param {object} options.socket - Receiver of the frames; only `send` is used.
 * @param {*} options.topic - Passed through to the query module.
 * @returns {Promise<*>} Whatever `query.realTimeMessageSearch` returns.
 */
const realTimeSearch = async ({ kafka, searchCode, immutable, socket, topic }) => {
  const forward = message => {
    const frame = JSON.stringify({ type: 'message', message });
    socket.send(frame);
  };

  return query.realTimeMessageSearch({
    kafka,
    topic,
    searchCode,
    immutable,
    onMessage: forward,
  });
};
|
|
|
|
/**
 * Runs a bounded search by delegating to `query.searchMessages`.
 *
 * Progress and results are sent to the socket as JSON text:
 * `{ type: 'item_count', message: count }` from `onBatchDone`, and
 * `{ type: 'complete', message: messages }` from `onDone`, which also logs
 * the number of messages.
 *
 * @param {object} options
 * @param {object} options.kafka - Passed through to the query module.
 * @param {*} options.searchCode - Passed through to the query module.
 * @param {*} options.immutable - Passed through to the query module.
 * @param {object} options.socket - Receiver of the frames; only `send` is used.
 * @param {*} options.topic - Passed through to the query module.
 * @param {*} options.maxItems - Passed through to the query module.
 * @returns {Promise<*>} Whatever `query.searchMessages` returns.
 */
const search = async ({ kafka, searchCode, immutable, socket, topic, maxItems }) => {
  // Single place where frames are serialised: key order is type, then message.
  const emit = (type, message) => {
    socket.send(JSON.stringify({ type, message }));
  };

  return query.searchMessages({
    kafka,
    topic,
    maxItems,
    searchCode,
    immutable,
    onDone(messages) {
      console.log(`${messages.length} messages`);
      emit('complete', messages);
    },
    onBatchDone(count) {
      emit('item_count', count);
    },
  });
};
|
|
|
|
// WebSocket server created in `noServer` mode.
// NOTE(review): the HTTP upgrade is presumably wired up by whoever imports
// `wsServer` — confirm against the caller.
const wsServer = new ws.WebSocketServer({ noServer: true });

// One entry per socket: the value resolved by that socket's latest search.
const consumers = new Map();

// Hand both registries to the safely-exit helper.
buildEscape(consumers, clusters);
|
|
|
|
/**
 * Serves one WebSocket client for the lifetime of its connection.
 *
 * Incoming frames are JSON objects. `{ mode: 'kill' }` stops the consumer
 * registered for the socket. Any other message starts a search on
 * `message.cluster` — `realTimeSearch` when `mode === 'realTime'`, otherwise
 * `search` — using `searchCode`, `immutable`, `topic` and `maxItems` from the
 * message. A failed start is retried once. Problems are reported to the
 * client as plain `ERROR: …` text frames.
 */
wsServer.on('connection', socket => {
  // Stops and forgets the consumer registered for this socket. It never
  // rejects, so it is safe to use directly as an event listener.
  const releaseConsumer = async () => {
    try {
      await killConsumer(consumers.get(socket));
    } catch (err) {
      console.error('Failed to stop consumer', err);
    } finally {
      consumers.delete(socket);
    }
  };

  socket.send('CONNECTED');
  socket.on('close', releaseConsumer);

  socket.on('message', async raw => {
    // Client input is untrusted: a malformed frame is answered with an error
    // instead of escaping as an unhandled rejection.
    let message;
    try {
      message = JSON.parse(raw);
    } catch (err) {
      socket.send('ERROR: ' + err);
      return;
    }
    if (message === null || typeof message !== 'object') {
      socket.send('ERROR: Invalid message');
      return;
    }

    if (message.mode === 'kill') {
      console.log('KILLING SOCKET');
      await releaseConsumer();
      return;
    }

    const currentMode = message.mode === 'realTime' ? realTimeSearch : search;
    console.log('CLUSTERS before run', clusters);
    console.log('message.cluster', message.cluster);

    // Own-property lookup so inherited keys are not treated as clusters.
    const cluster = Object.prototype.hasOwnProperty.call(clusters, message.cluster)
      ? clusters[message.cluster]
      : undefined;
    console.log('clusters[message.cluster]', cluster);

    if (!cluster) {
      // Retrying cannot help when the cluster does not exist.
      socket.send('ERROR: Unknown cluster ' + message.cluster);
      return;
    }

    // Starts the search and registers its result as this socket's consumer.
    const run = async () => consumers.set(socket, await currentMode({
      kafka: cluster.kafka,
      searchCode: message.searchCode,
      immutable: message.immutable,
      topic: message.topic,
      maxItems: message.maxItems,
      socket,
    }));

    run().catch(async e => {
      console.error('run() error occurred!', String(e));
      await releaseConsumer();
      // Try again ONCE on failure
      run().catch(ee => socket.send('ERROR: ' + ee));
    });
  });
});
|
|
|
|
/**
 * Stops and then disconnects a consumer.
 *
 * `disconnect()` is attempted even when `stop()` rejects, so a failed stop
 * does not leave the connection open. A missing consumer is a no-op.
 *
 * @param {{stop: Function, disconnect: Function}|undefined|null} consumer
 * @returns {Promise<void>} Rejects with the error from `stop()` or
 *   `disconnect()`; if both fail, the `disconnect()` error wins.
 */
const killConsumer = async consumer => {
  try {
    await consumer?.stop();
  } finally {
    await consumer?.disconnect();
  }
};
|
|
|
|
// Public surface of this module: the Express application and the WebSocket server.
module.exports = { app, wsServer };
|