2022-08-18 16:45:43 -04:00
|
|
|
const { Kafka } = require('kafkajs');
|
|
|
|
|
|
|
|
const { readConfig, storeConfig } = require('./config')
|
|
|
|
const express = require('express');
|
|
|
|
const cors = require('cors')
|
|
|
|
const ws = require('ws')
|
|
|
|
const path = require('path');
|
|
|
|
const cookieParser = require('cookie-parser');
|
|
|
|
const logger = require('morgan');
|
|
|
|
const query = require('./query')
|
2022-08-20 19:15:59 -04:00
|
|
|
const { buildEscape } = require('./safely-exit')
|
2022-08-18 16:45:43 -04:00
|
|
|
|
|
|
|
const config = readConfig()
|
|
|
|
console.log('CONFIG', config)
|
|
|
|
|
2022-08-20 19:15:59 -04:00
|
|
|
const c = object => {
|
|
|
|
console.log(object)
|
|
|
|
return object
|
|
|
|
}
|
|
|
|
|
|
|
|
const buildCluster = (clusterConfig, connect = true) => {
|
2022-08-18 16:45:43 -04:00
|
|
|
const kafkaConfig = {...clusterConfig}
|
|
|
|
delete kafkaConfig.clusterName
|
|
|
|
|
|
|
|
const kafka = new Kafka(kafkaConfig)
|
|
|
|
const admin = kafka.admin()
|
2022-08-26 18:50:02 -04:00
|
|
|
const cluster = {
|
2022-08-18 16:45:43 -04:00
|
|
|
kafka,
|
|
|
|
admin,
|
|
|
|
config: clusterConfig
|
2022-08-26 18:50:02 -04:00
|
|
|
}
|
|
|
|
admin.connect().catch(e => console.error(cluster.error = e.toString()))
|
|
|
|
return cluster
|
2022-08-18 16:45:43 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
const clusters =
|
|
|
|
Object.fromEntries(Object.entries(config.clusters)
|
|
|
|
.map(([clusterName, clusterData]) => [clusterName, buildCluster(clusterData)]))
|
|
|
|
console.log('CLUSTERS', clusters)
|
|
|
|
|
|
|
|
const app = express();
|
|
|
|
|
|
|
|
app.use(cors({
|
|
|
|
origin: 'http://localhost:5173'
|
|
|
|
}))
|
|
|
|
app.use(logger('dev'));
|
|
|
|
app.use(express.json());
|
|
|
|
app.use(express.urlencoded({ extended: false }));
|
|
|
|
app.use(cookieParser());
|
|
|
|
app.use(express.static(path.join(__dirname, 'public')));
|
|
|
|
|
|
|
|
const router = express.Router()
|
|
|
|
|
2022-08-26 18:50:02 -04:00
|
|
|
process.on('unhandledRejection', console.error)
|
|
|
|
|
2022-08-18 16:45:43 -04:00
|
|
|
/* GET topics listing. */
|
|
|
|
router.get('/topics/:cluster', async (req, res, _next) => {
|
|
|
|
const legalName = topicName => !topicName.startsWith("__")
|
2022-08-26 18:50:02 -04:00
|
|
|
try {
|
|
|
|
const topicList = (await clusters[req.params.cluster]?.admin.listTopics() || [])
|
2022-08-18 16:45:43 -04:00
|
|
|
res.send(topicList.filter(legalName))
|
2022-08-26 18:50:02 -04:00
|
|
|
} catch (e) {
|
|
|
|
res.status(502).send({
|
|
|
|
error: `Could not connect to cluster '${req.params.cluster}'`,
|
|
|
|
errorDetails: e.toString()
|
|
|
|
})
|
2022-08-18 16:45:43 -04:00
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX'
|
|
|
|
|
2022-08-20 19:15:59 -04:00
|
|
|
const getClusterData = () =>
|
|
|
|
Object.fromEntries(
|
2022-08-18 16:45:43 -04:00
|
|
|
Object.entries(clusters).map(([key, value]) => {
|
|
|
|
value = JSON.parse(JSON.stringify(value))
|
2022-08-23 16:47:29 -04:00
|
|
|
if (value.config.sasl?.password) {
|
|
|
|
value.config.sasl.password = passwordPlaceholder
|
|
|
|
}
|
2022-08-18 16:45:43 -04:00
|
|
|
return [key, value.config]
|
2022-08-20 19:15:59 -04:00
|
|
|
}))
|
|
|
|
|
|
|
|
router.get('/clusters', async (req, res, _next) => {
|
2022-08-26 18:50:02 -04:00
|
|
|
console.log('/clusters')
|
2022-08-20 19:15:59 -04:00
|
|
|
res.send(getClusterData())
|
2022-08-18 16:45:43 -04:00
|
|
|
})
|
|
|
|
|
|
|
|
router.post('/clusters', async (req, res, _next) => {
|
|
|
|
console.log('/clusters post body', req.body)
|
2022-08-20 19:15:59 -04:00
|
|
|
const clusterName = req.body.clusterName
|
|
|
|
config.clusters[clusterName] = req.body
|
|
|
|
clusters[clusterName] = buildCluster(req.body)
|
|
|
|
res.send(getClusterData())
|
|
|
|
await storeConfig(config)
|
|
|
|
})
|
|
|
|
|
|
|
|
router.post('/clusters/delete', async (req, res, _next) => {
|
|
|
|
const clusterName = req.body.clusterName
|
|
|
|
// TODO: Disconnect
|
|
|
|
delete clusters[clusterName]
|
|
|
|
delete config.clusters[clusterName]
|
|
|
|
await storeConfig(config)
|
|
|
|
res.send(getClusterData())
|
2022-08-18 16:45:43 -04:00
|
|
|
})
|
|
|
|
|
|
|
|
router.put('/clusters', async (req, res, _next) => {
|
|
|
|
const hasPlaceholderPassword = req.body.sasl.password === passwordPlaceholder
|
|
|
|
const clusterName = req.body.clusterName
|
|
|
|
if (hasPlaceholderPassword) {
|
|
|
|
req.body.password = config.clusters[clusterName].password
|
|
|
|
}
|
|
|
|
config.clusters[clusterName] = req.body
|
|
|
|
clusters[clusterName] = buildCluster(req.body)
|
|
|
|
res.send('')
|
2022-08-20 19:15:59 -04:00
|
|
|
//res.send(getClusterData())
|
2022-08-18 16:45:43 -04:00
|
|
|
await storeConfig(config)
|
|
|
|
})
|
|
|
|
|
|
|
|
app.use(router)
|
|
|
|
|
2022-08-20 19:15:59 -04:00
|
|
|
const realTimeSearch = async ({ kafka, searchCode, immutable, socket, topic }) =>
|
|
|
|
query.realTimeMessageSearch({
|
|
|
|
kafka,
|
2022-08-18 16:45:43 -04:00
|
|
|
topic,
|
|
|
|
searchCode,
|
|
|
|
immutable,
|
|
|
|
onMessage: message => {
|
|
|
|
socket.send(JSON.stringify({
|
|
|
|
type: 'message',
|
|
|
|
message
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
2022-08-20 19:15:59 -04:00
|
|
|
const search = async ({ kafka, searchCode, immutable, socket, topic, maxItems }) =>
|
|
|
|
query.searchMessages({
|
|
|
|
kafka,
|
2022-08-18 16:45:43 -04:00
|
|
|
topic,
|
|
|
|
maxItems,
|
|
|
|
searchCode,
|
|
|
|
immutable,
|
|
|
|
onDone: messages => {
|
|
|
|
console.log(messages.length + ' messages')
|
|
|
|
socket.send(JSON.stringify({
|
|
|
|
type: 'complete',
|
|
|
|
message: messages
|
|
|
|
}))
|
|
|
|
},
|
|
|
|
onBatchDone: count => {
|
|
|
|
socket.send(JSON.stringify({
|
|
|
|
type: 'item_count',
|
|
|
|
message: count
|
|
|
|
}))
|
|
|
|
}
|
|
|
|
})
|
|
|
|
|
|
|
|
const wsServer = new ws.WebSocketServer({
|
|
|
|
noServer: true
|
|
|
|
})
|
|
|
|
|
2022-08-20 19:15:59 -04:00
|
|
|
const consumers = new Map()
|
|
|
|
|
|
|
|
buildEscape(consumers, clusters)
|
|
|
|
|
2022-08-18 16:45:43 -04:00
|
|
|
wsServer.on('connection', socket => {
|
2022-08-23 16:47:29 -04:00
|
|
|
socket.send('CONNECTED')
|
2022-08-18 16:45:43 -04:00
|
|
|
socket.on('close', async () => {
|
|
|
|
await killConsumer(consumers.get(socket))
|
|
|
|
consumers.delete(socket)
|
|
|
|
})
|
|
|
|
|
|
|
|
socket.on('message', async message => {
|
2022-08-26 18:50:02 -04:00
|
|
|
console.log('socket message')
|
|
|
|
//socket.send('Loading...')
|
2022-08-18 16:45:43 -04:00
|
|
|
message = JSON.parse(message)
|
|
|
|
|
|
|
|
const currentMode = message.mode === 'realTime' ? realTimeSearch : search
|
2022-08-20 19:15:59 -04:00
|
|
|
const cluster = clusters[message.cluster]
|
2022-08-18 16:45:43 -04:00
|
|
|
const run = async () => consumers.set(socket, await currentMode({
|
2022-08-20 19:15:59 -04:00
|
|
|
kafka: cluster.kafka,
|
2022-08-18 16:45:43 -04:00
|
|
|
searchCode: message.searchCode,
|
|
|
|
immutable: message.immutable,
|
|
|
|
topic: message.topic,
|
|
|
|
maxItems: message.maxItems,
|
|
|
|
socket
|
|
|
|
}))
|
|
|
|
|
|
|
|
run().catch(async e => {
|
|
|
|
console.error('run() error occurred!', e.toString())
|
|
|
|
await killConsumer(consumers.get(socket))
|
|
|
|
// Try again ONCE on failure
|
2022-08-20 19:15:59 -04:00
|
|
|
run().catch(ee => socket.send('ERROR: ' + ee))
|
2022-08-18 16:45:43 -04:00
|
|
|
})
|
|
|
|
})
|
|
|
|
})
|
|
|
|
|
|
|
|
const killConsumer = async consumer => {
|
|
|
|
await consumer?.stop()
|
|
|
|
await consumer?.disconnect()
|
|
|
|
}
|
|
|
|
|
|
|
|
module.exports = {
|
|
|
|
app,
|
|
|
|
wsServer
|
|
|
|
}
|