Kill connected consumers of a cluster when it is deleted.
This commit is contained in:
parent
3f98892ddd
commit
25c587e622
38
app.js
38
app.js
|
@ -95,7 +95,18 @@ router.post('/clusters', async (req, res, _next) => {
|
|||
|
||||
router.post('/clusters/delete', async (req, res, _next) => {
|
||||
const clusterName = req.body.clusterName
|
||||
// TODO: Disconnect
|
||||
// Kill all consumers connected to this cluster and notify those clients via their websocket
|
||||
consumers.forEach(([{ consumer, cluster }, socket]) => {
|
||||
if (clusters[clusterName] !== cluster) {
|
||||
// TODO This reference equality may not be sufficient?
|
||||
return
|
||||
}
|
||||
killConsumer(consumer)
|
||||
socket.send(JSON.stringify({
|
||||
type: 'cluster_deleted',
|
||||
message: clusterName,
|
||||
}))
|
||||
})
|
||||
delete clusters[clusterName]
|
||||
delete config.clusters[clusterName]
|
||||
await storeConfig(config)
|
||||
|
@ -164,7 +175,7 @@ buildEscape(consumers, clusters)
|
|||
wsServer.on('connection', socket => {
|
||||
socket.send('CONNECTED')
|
||||
socket.on('close', async () => {
|
||||
await killConsumer(consumers.get(socket))
|
||||
await killConsumer(consumers.get(socket).consumer)
|
||||
consumers.delete(socket)
|
||||
})
|
||||
|
||||
|
@ -173,24 +184,27 @@ wsServer.on('connection', socket => {
|
|||
|
||||
if (message.mode === 'kill') {
|
||||
console.log('KILLING SOCKET')
|
||||
await killConsumer(consumers.get(socket))
|
||||
await killConsumer(consumers.get(socket).consumer)
|
||||
consumers.delete(socket)
|
||||
return
|
||||
}
|
||||
const currentMode = message.mode === 'realTime' ? realTimeSearch : search
|
||||
const cluster = clusters[message.cluster]
|
||||
const run = async () => consumers.set(socket, await currentMode({
|
||||
kafka: cluster.kafka,
|
||||
searchCode: message.searchCode,
|
||||
immutable: message.immutable,
|
||||
topic: message.topic,
|
||||
maxItems: message.maxItems,
|
||||
socket
|
||||
}))
|
||||
const run = async () => consumers.set(socket, {
|
||||
consumer: await currentMode({
|
||||
kafka: cluster.kafka,
|
||||
searchCode: message.searchCode,
|
||||
immutable: message.immutable,
|
||||
topic: message.topic,
|
||||
maxItems: message.maxItems,
|
||||
socket
|
||||
}),
|
||||
cluster
|
||||
})
|
||||
|
||||
run().catch(async e => {
|
||||
console.error('run() error occurred!', e.toString())
|
||||
await killConsumer(consumers.get(socket))
|
||||
await killConsumer(consumers.get(socket).consumer)
|
||||
// Try again ONCE on failure
|
||||
run().catch(ee => socket.send('ERROR: ' + ee))
|
||||
})
|
||||
|
|
3
query.js
3
query.js
|
@ -3,7 +3,8 @@ const buildMatcher = code => new Function('message', 'value', code)
|
|||
const deepCopy = obj => JSON.parse(JSON.stringify(obj))
|
||||
|
||||
const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immutable = true }) => {
|
||||
const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
|
||||
// TODO Use a UUID/GUID instead
|
||||
const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() })
|
||||
|
||||
console.log({ topic, onMessage, searchCode, immutable })
|
||||
await consumer.connect()
|
||||
|
|
|
@ -4,7 +4,7 @@ const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
|
|||
const disconnectAll = async (consumers, clusters) => {
|
||||
await Promise.all([
|
||||
...Object.values(clusters).map(async cluster => cluster.admin.disconnect()),
|
||||
...Array.from(consumers.values()).map(async consumer => consumer.disconnect())
|
||||
...Array.from(consumers.values()).map(async consumer => consumer.consumer.disconnect())
|
||||
])
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue