diff --git a/app.js b/app.js index 3b79d04..a37e64e 100644 --- a/app.js +++ b/app.js @@ -8,22 +8,28 @@ const path = require('path'); const cookieParser = require('cookie-parser'); const logger = require('morgan'); const query = require('./query') +const { buildEscape } = require('./safely-exit') const config = readConfig() console.log('CONFIG', config) -const buildCluster = clusterConfig => { +const c = object => { + console.log(object) + return object +} + +const buildCluster = (clusterConfig, connect = true) => { const kafkaConfig = {...clusterConfig} delete kafkaConfig.clusterName const kafka = new Kafka(kafkaConfig) const admin = kafka.admin() admin.connect().catch(console.error) - return { + return c({ kafka, admin, config: clusterConfig - } + }) } const clusters = @@ -42,10 +48,6 @@ app.use(express.urlencoded({ extended: false })); app.use(cookieParser()); app.use(express.static(path.join(__dirname, 'public'))); -const consumers = new Map() - -require('./safely-exit')(consumers, clusters) - const router = express.Router() /* GET topics listing. */ @@ -62,18 +64,35 @@ router.get('/topics/:cluster', async (req, res, _next) => { const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX' -router.get('/clusters', async (req, res, _next) => { - res.send(Object.fromEntries( +const getClusterData = () => + Object.fromEntries( Object.entries(clusters).map(([key, value]) => { value = JSON.parse(JSON.stringify(value)) value.config.sasl.password = passwordPlaceholder return [key, value.config] - }))) + })) + +router.get('/clusters', async (req, res, _next) => { + res.send(getClusterData()) }) router.post('/clusters', async (req, res, _next) => { console.log('/clusters post body', req.body) - res.send('') + const clusterName = req.body.clusterName + config.clusters[clusterName] = req.body + clusters[clusterName] = buildCluster(req.body) + res.send(getClusterData()) + await storeConfig(config) +}) + +router.post('/clusters/delete', async (req, res, _next) => { + console.log('/clusters/delete post body', req.body) + const clusterName = req.body.clusterName + // TODO: Disconnect + delete clusters[clusterName] + delete config.clusters[clusterName] + await storeConfig(config) + res.send(getClusterData()) }) router.put('/clusters', async (req, res, _next) => { @@ -88,14 +107,15 @@ router.put('/clusters', async (req, res, _next) => { config.clusters[clusterName] = req.body clusters[clusterName] = buildCluster(req.body) res.send('') + //res.send(getClusterData()) await storeConfig(config) }) app.use(router) -const realTimeSearch = async ({ cluster, searchCode, immutable, socket, topic }) => { - return query.realTimeMessageSearch({ - kafka: clusters[cluster].kafka, +const realTimeSearch = async ({ kafka, searchCode, immutable, socket, topic }) => + query.realTimeMessageSearch({ + kafka, topic, searchCode, immutable, @@ -106,11 +126,10 @@ const realTimeSearch = async ({ cluster, searchCode, immutable, socket, topic }) })) } }) -} -const search = async ({ cluster, searchCode, immutable, socket, topic, maxItems }) => { - return query.searchMessages({ - kafka: clusters[cluster].kafka, +const search = async ({ kafka, searchCode, immutable, socket, topic, maxItems }) => + query.searchMessages({ + kafka, topic, maxItems, searchCode, @@ -129,12 +148,15 @@ const search = async ({ cluster, searchCode, immutable, socket, topic, maxItems })) } }) -} const wsServer = new ws.WebSocketServer({ noServer: true }) +const consumers = new Map() + +buildEscape(consumers, clusters) + wsServer.on('connection', socket => { socket.on('close', async () => { await killConsumer(consumers.get(socket)) @@ -146,8 +168,12 @@ wsServer.on('connection', socket => { message = JSON.parse(message) const currentMode = message.mode === 'realTime' ? realTimeSearch : search + console.log('CLUSTERS before run', clusters) + console.log('message.cluster', message.cluster) + const cluster = clusters[message.cluster] + console.log('clusters[message.cluster]', cluster) const run = async () => consumers.set(socket, await currentMode({ - cluster: clusters[message.cluster].kafka, + kafka: cluster.kafka, searchCode: message.searchCode, immutable: message.immutable, topic: message.topic, @@ -159,7 +185,7 @@ wsServer.on('connection', socket => { console.error('run() error occurred!', e.toString()) await killConsumer(consumers.get(socket)) // Try again ONCE on failure - run().catch(e => socket.send('ERROR: ' + e)) + run().catch(ee => socket.send('ERROR: ' + ee)) }) }) }) diff --git a/clusters.js b/clusters.js new file mode 100644 index 0000000..c0c8b51 --- /dev/null +++ b/clusters.js @@ -0,0 +1,2 @@ +const { readConfig, storeConfig } = require('./config') +const config = readConfig() diff --git a/safely-exit.js b/safely-exit.js index dd67cf3..1658cf1 100644 --- a/safely-exit.js +++ b/safely-exit.js @@ -1,40 +1,40 @@ const errorTypes = ['unhandledRejection', 'uncaughtException'] const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'] -const disconnectAll = async () => { - await Promise.all(Object.values(clusters).map(async cluster => cluster.admin.disconnect())) - await Promise.all( - Array.from(consumers.values()) - .map(async consumer => consumer.disconnect())) +const disconnectAll = async (consumers, clusters) => { + await Promise.all([ + ...Object.values(clusters).map(async cluster => cluster.admin.disconnect()), + ...Array.from(consumers.values()).map(async consumer => consumer.disconnect()) + ]) } -errorTypes.map(type => { - process.on(type, async e => { - try { - console.log(`process.on ${type}`) - console.error(e) - await disconnectAll() - process.exit(0) - } catch (_) { - process.exit(1) - } +const buildEscape = (consumers, clusters) => { + const disconnect = async () => disconnectAll(consumers, clusters) + errorTypes.map(type => { + process.on(type, async e => { + try { + console.log(`process.on ${type}`) + console.error(e) + await disconnect() + process.exit(0) + } catch (_) { + process.exit(1) + } + }) }) -}) -signalTraps.map(type => { - process.once(type, async () => { - try { - console.log('signalTrap: ' + type) - await disconnectAll() - } finally { - process.kill(process.pid, type) - } + signalTraps.map(type => { + process.once(type, async () => { + try { + console.log('signalTrap: ' + type) + await disconnect() + } finally { + process.kill(process.pid, type) + } + }) }) -}) - -let consumers -let clusters -module.exports = (consumerMap, clusterObject) => { - consumers = consumerMap - clusters = clusterObject +} + +module.exports = { + buildEscape }