From 25c587e62230f6eb6ff4313819dacbc59d3a71c9 Mon Sep 17 00:00:00 2001 From: Sage Vaillancourt Date: Mon, 29 Aug 2022 07:33:25 -0400 Subject: [PATCH] Kill connected consumers of a cluster when it is deleted. --- app.js | 38 ++++++++++++++++++++++++++------------ query.js | 3 ++- safely-exit.js | 2 +- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/app.js b/app.js index eace751..248d418 100644 --- a/app.js +++ b/app.js @@ -95,7 +95,18 @@ router.post('/clusters', async (req, res, _next) => { router.post('/clusters/delete', async (req, res, _next) => { const clusterName = req.body.clusterName - // TODO: Disconnect + // Kill all consumers connected to this cluster and notify those clients via their websocket + consumers.forEach(([{ consumer, cluster }, socket]) => { + if (clusters[clusterName] !== cluster) { + // TODO This reference equality may not be sufficient? + return + } + killConsumer(consumer) + socket.send(JSON.stringify({ + type: 'cluster_deleted', + message: clusterName, + })) + }) delete clusters[clusterName] delete config.clusters[clusterName] await storeConfig(config) @@ -164,7 +175,7 @@ buildEscape(consumers, clusters) wsServer.on('connection', socket => { socket.send('CONNECTED') socket.on('close', async () => { - await killConsumer(consumers.get(socket)) + await killConsumer(consumers.get(socket).consumer) consumers.delete(socket) }) @@ -173,24 +184,27 @@ wsServer.on('connection', socket => { if (message.mode === 'kill') { console.log('KILLING SOCKET') - await killConsumer(consumers.get(socket)) + await killConsumer(consumers.get(socket).consumer) consumers.delete(socket) return } const currentMode = message.mode === 'realTime' ? realTimeSearch : search const cluster = clusters[message.cluster] - const run = async () => consumers.set(socket, await currentMode({ - kafka: cluster.kafka, - searchCode: message.searchCode, - immutable: message.immutable, - topic: message.topic, - maxItems: message.maxItems, - socket - })) + const run = async () => consumers.set(socket, { + consumer: await currentMode({ + kafka: cluster.kafka, + searchCode: message.searchCode, + immutable: message.immutable, + topic: message.topic, + maxItems: message.maxItems, + socket + }), + cluster + }) run().catch(async e => { console.error('run() error occurred!', e.toString()) - await killConsumer(consumers.get(socket)) + await killConsumer(consumers.get(socket).consumer) // Try again ONCE on failure run().catch(ee => socket.send('ERROR: ' + ee)) }) diff --git a/query.js b/query.js index a6d0afd..22abd2a 100644 --- a/query.js +++ b/query.js @@ -3,7 +3,8 @@ const buildMatcher = code => new Function('message', 'value', code) const deepCopy = obj => JSON.parse(JSON.stringify(obj)) const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immutable = true }) => { - const consumer = kafka.consumer({ groupId: 'sage-testing-group' }) + // TODO Use a UUID/GUID instead + const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() }) console.log({ topic, onMessage, searchCode, immutable }) await consumer.connect() diff --git a/safely-exit.js b/safely-exit.js index 1658cf1..d08c562 100644 --- a/safely-exit.js +++ b/safely-exit.js @@ -4,7 +4,7 @@ const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2'] const disconnectAll = async (consumers, clusters) => { await Promise.all([ ...Object.values(clusters).map(async cluster => cluster.admin.disconnect()), - ...Array.from(consumers.values()).map(async consumer => consumer.disconnect()) + ...Array.from(consumers.values()).map(async consumer => consumer.consumer.disconnect()) ]) }