Compare commits
10 Commits
5aa87c0775
...
1afe267866
Author | SHA1 | Date |
---|---|---|
Sage Vaillancourt | 1afe267866 | |
Sage Vaillancourt | 75e81e7956 | |
Sage Vaillancourt | 76ab13b385 | |
Sage Vaillancourt | bae8b2fc6b | |
Sage Vaillancourt | 25c587e622 | |
Sage Vaillancourt | e6dbc34e14 | |
Sage Vaillancourt | 3f98892ddd | |
Sage Vaillancourt | cf21c56231 | |
Sage Vaillancourt | ac2d3504bd | |
Sage Vaillancourt | 61b5da846a |
138
app.js
138
app.js
|
@ -13,23 +13,31 @@ const { buildEscape } = require('./safely-exit')
|
||||||
const config = readConfig()
|
const config = readConfig()
|
||||||
console.log('CONFIG', config)
|
console.log('CONFIG', config)
|
||||||
|
|
||||||
const c = object => {
|
/**
|
||||||
console.log(object)
|
* Log out an object and return it.
|
||||||
return object
|
*
|
||||||
|
* @param object The object to log
|
||||||
|
* @param f Optional param to specify an alternative log function. E.g. console.error
|
||||||
|
* @returns {*}
|
||||||
|
*/
|
||||||
|
const c = (object, f = console.log) => {
|
||||||
|
f(object)
|
||||||
|
return object;
|
||||||
}
|
}
|
||||||
|
|
||||||
const buildCluster = (clusterConfig, connect = true) => {
|
const buildCluster = (clusterConfig, connect = true) => {
|
||||||
const kafkaConfig = {...clusterConfig}
|
const kafkaConfig = {...clusterConfig}
|
||||||
delete kafkaConfig.clusterName
|
delete kafkaConfig.clusterName // new Kafka() tries to use this same value
|
||||||
|
|
||||||
const kafka = new Kafka(kafkaConfig)
|
const kafka = new Kafka(kafkaConfig)
|
||||||
const admin = kafka.admin()
|
const admin = kafka.admin()
|
||||||
admin.connect().catch(console.error)
|
const cluster = {
|
||||||
return c({
|
|
||||||
kafka,
|
kafka,
|
||||||
admin,
|
admin,
|
||||||
config: clusterConfig
|
config: clusterConfig
|
||||||
})
|
}
|
||||||
|
admin.connect().catch(e => console.error(cluster.error = e.toString()))
|
||||||
|
return cluster
|
||||||
}
|
}
|
||||||
|
|
||||||
const clusters =
|
const clusters =
|
||||||
|
@ -40,7 +48,7 @@ console.log('CLUSTERS', clusters)
|
||||||
const app = express();
|
const app = express();
|
||||||
|
|
||||||
app.use(cors({
|
app.use(cors({
|
||||||
origin: 'http://localhost:5173'
|
origin: config.frontendUrl,
|
||||||
}))
|
}))
|
||||||
app.use(logger('dev'));
|
app.use(logger('dev'));
|
||||||
app.use(express.json());
|
app.use(express.json());
|
||||||
|
@ -50,29 +58,45 @@ app.use(express.static(path.join(__dirname, 'public')));
|
||||||
|
|
||||||
const router = express.Router()
|
const router = express.Router()
|
||||||
|
|
||||||
|
process.on('unhandledRejection', console.error)
|
||||||
|
|
||||||
/* GET topics listing. */
|
/* GET topics listing. */
|
||||||
router.get('/topics/:cluster', async (req, res, _next) => {
|
router.get('/topics/:cluster', async (req, res, _next) => {
|
||||||
const legalName = topicName => !topicName.startsWith("__")
|
const legalName = topicName => !topicName.startsWith("__")
|
||||||
const topicList = await clusters[req.params.cluster]?.admin.listTopics()
|
try {
|
||||||
if (topicList) {
|
const topicList = (await clusters[req.params.cluster]?.admin.listTopics() || [])
|
||||||
res.send(topicList.filter(legalName))
|
res.send(topicList.filter(legalName))
|
||||||
} else {
|
} catch (e) {
|
||||||
res.send([])
|
res.status(502).send({
|
||||||
//res.status(400).send({error: 'Invalid cluster name'})
|
error: `Could not connect to cluster '${req.params.cluster}'`,
|
||||||
|
errorDetails: e.toString()
|
||||||
|
})
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
const buildSocketMessageSender = ({ type, socket }) => message => {
|
||||||
|
socket.send(JSON.stringify({
|
||||||
|
type,
|
||||||
|
message
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
const deepCopy = object => JSON.parse(JSON.stringify(object))
|
||||||
|
|
||||||
const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX'
|
const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX'
|
||||||
|
|
||||||
const getClusterData = () =>
|
const getClusterData = () =>
|
||||||
Object.fromEntries(
|
Object.fromEntries(
|
||||||
Object.entries(clusters).map(([key, value]) => {
|
Object.entries(clusters).map(([key, cluster]) => {
|
||||||
value = JSON.parse(JSON.stringify(value))
|
cluster = deepCopy(cluster)
|
||||||
value.config.sasl.password = passwordPlaceholder
|
if (cluster.config.sasl?.password) {
|
||||||
return [key, value.config]
|
cluster.config.sasl.password = passwordPlaceholder
|
||||||
|
}
|
||||||
|
return [key, cluster.config]
|
||||||
}))
|
}))
|
||||||
|
|
||||||
router.get('/clusters', async (req, res, _next) => {
|
router.get('/clusters', async (req, res, _next) => {
|
||||||
|
console.log('/clusters')
|
||||||
res.send(getClusterData())
|
res.send(getClusterData())
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -86,9 +110,17 @@ router.post('/clusters', async (req, res, _next) => {
|
||||||
})
|
})
|
||||||
|
|
||||||
router.post('/clusters/delete', async (req, res, _next) => {
|
router.post('/clusters/delete', async (req, res, _next) => {
|
||||||
console.log('/clusters/delete post body', req.body)
|
|
||||||
const clusterName = req.body.clusterName
|
const clusterName = req.body.clusterName
|
||||||
// TODO: Disconnect
|
// Kill all consumers connected to this cluster and notify those clients via their websocket
|
||||||
|
consumers.forEach(([{ consumer, cluster }, socket]) => {
|
||||||
|
if (clusters[clusterName] !== cluster) {
|
||||||
|
// TODO This reference equality may not be sufficient?
|
||||||
|
return
|
||||||
|
}
|
||||||
|
killConsumer(consumer)
|
||||||
|
const sendDeletedMessage = buildSocketMessageSender({ type: 'cluster_deleted', socket })
|
||||||
|
sendDeletedMessage(clusterName)
|
||||||
|
})
|
||||||
delete clusters[clusterName]
|
delete clusters[clusterName]
|
||||||
delete config.clusters[clusterName]
|
delete config.clusters[clusterName]
|
||||||
await storeConfig(config)
|
await storeConfig(config)
|
||||||
|
@ -96,10 +128,7 @@ router.post('/clusters/delete', async (req, res, _next) => {
|
||||||
})
|
})
|
||||||
|
|
||||||
router.put('/clusters', async (req, res, _next) => {
|
router.put('/clusters', async (req, res, _next) => {
|
||||||
console.log('/clusters put body', req.body)
|
|
||||||
|
|
||||||
const hasPlaceholderPassword = req.body.sasl.password === passwordPlaceholder
|
const hasPlaceholderPassword = req.body.sasl.password === passwordPlaceholder
|
||||||
console.log('Has placeholder password:', hasPlaceholderPassword)
|
|
||||||
const clusterName = req.body.clusterName
|
const clusterName = req.body.clusterName
|
||||||
if (hasPlaceholderPassword) {
|
if (hasPlaceholderPassword) {
|
||||||
req.body.password = config.clusters[clusterName].password
|
req.body.password = config.clusters[clusterName].password
|
||||||
|
@ -107,83 +136,72 @@ router.put('/clusters', async (req, res, _next) => {
|
||||||
config.clusters[clusterName] = req.body
|
config.clusters[clusterName] = req.body
|
||||||
clusters[clusterName] = buildCluster(req.body)
|
clusters[clusterName] = buildCluster(req.body)
|
||||||
res.send('')
|
res.send('')
|
||||||
//res.send(getClusterData())
|
|
||||||
await storeConfig(config)
|
await storeConfig(config)
|
||||||
})
|
})
|
||||||
|
|
||||||
app.use(router)
|
app.use(router)
|
||||||
|
|
||||||
const realTimeSearch = async ({ kafka, searchCode, immutable, socket, topic }) =>
|
const realTimeSearch = async ({ kafka, socket, topic }) =>
|
||||||
query.realTimeMessageSearch({
|
query.realTimeMessageSearch({
|
||||||
kafka,
|
kafka,
|
||||||
topic,
|
topic,
|
||||||
searchCode,
|
onMessage: buildSocketMessageSender({ socket, type: 'message' })
|
||||||
immutable,
|
});
|
||||||
onMessage: message => {
|
|
||||||
socket.send(JSON.stringify({
|
|
||||||
type: 'message',
|
|
||||||
message
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
|
|
||||||
const search = async ({ kafka, searchCode, immutable, socket, topic, maxItems }) =>
|
const oneShotSearch = async ({ kafka, socket, topic, maxItems }) =>
|
||||||
query.searchMessages({
|
query.searchMessages({
|
||||||
kafka,
|
kafka,
|
||||||
topic,
|
topic,
|
||||||
maxItems,
|
maxItems,
|
||||||
searchCode,
|
onDone: buildSocketMessageSender({ socket, type: 'count' }),
|
||||||
immutable,
|
onBatchDone: buildSocketMessageSender({ socket, type: 'item_count' })
|
||||||
onDone: messages => {
|
|
||||||
console.log(messages.length + ' messages')
|
|
||||||
socket.send(JSON.stringify({
|
|
||||||
type: 'complete',
|
|
||||||
message: messages
|
|
||||||
}))
|
|
||||||
},
|
|
||||||
onBatchDone: count => {
|
|
||||||
socket.send(JSON.stringify({
|
|
||||||
type: 'item_count',
|
|
||||||
message: count
|
|
||||||
}))
|
|
||||||
}
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
/** @type {WebSocketServer} */
|
||||||
const wsServer = new ws.WebSocketServer({
|
const wsServer = new ws.WebSocketServer({
|
||||||
noServer: true
|
noServer: true
|
||||||
})
|
})
|
||||||
|
|
||||||
|
/** @type {Map<WebSocket, ({consumer: kafka.Consumer, cluster: { kafka, admin, config }})>} */
|
||||||
const consumers = new Map()
|
const consumers = new Map()
|
||||||
|
|
||||||
buildEscape(consumers, clusters)
|
buildEscape(consumers, clusters)
|
||||||
|
|
||||||
wsServer.on('connection', socket => {
|
wsServer.on('connection', socket => {
|
||||||
|
socket.send('CONNECTED')
|
||||||
socket.on('close', async () => {
|
socket.on('close', async () => {
|
||||||
await killConsumer(consumers.get(socket))
|
await killConsumer(consumers.get(socket).consumer)
|
||||||
consumers.delete(socket)
|
consumers.delete(socket)
|
||||||
})
|
})
|
||||||
|
|
||||||
socket.on('message', async message => {
|
socket.on('message', async message => {
|
||||||
socket.send('Loading...')
|
|
||||||
message = JSON.parse(message)
|
message = JSON.parse(message)
|
||||||
|
|
||||||
const currentMode = message.mode === 'realTime' ? realTimeSearch : search
|
if (message.mode === 'kill') {
|
||||||
console.log('CLUSTERS before run', clusters)
|
console.log('KILLING SOCKET')
|
||||||
console.log('message.cluster', message.cluster)
|
await killConsumer(consumers.get(socket).consumer)
|
||||||
|
consumers.delete(socket)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
const startSearch = message.mode === 'realTime' ? realTimeSearch : oneShotSearch
|
||||||
const cluster = clusters[message.cluster]
|
const cluster = clusters[message.cluster]
|
||||||
console.log('clusters[message.cluster]', cluster)
|
|
||||||
const run = async () => consumers.set(socket, await currentMode({
|
const run = async () => {
|
||||||
|
const consumerCluster = {
|
||||||
|
consumer: await startSearch({
|
||||||
kafka: cluster.kafka,
|
kafka: cluster.kafka,
|
||||||
searchCode: message.searchCode,
|
|
||||||
immutable: message.immutable,
|
|
||||||
topic: message.topic,
|
topic: message.topic,
|
||||||
maxItems: message.maxItems,
|
maxItems: message.maxItems,
|
||||||
socket
|
socket
|
||||||
}))
|
}),
|
||||||
|
cluster
|
||||||
|
}
|
||||||
|
consumers.set(socket, consumerCluster);
|
||||||
|
}
|
||||||
|
|
||||||
run().catch(async e => {
|
run().catch(async e => {
|
||||||
console.error('run() error occurred!', e.toString())
|
console.error('run() error occurred!', e.toString())
|
||||||
await killConsumer(consumers.get(socket))
|
await killConsumer(consumers.get(socket).consumer)
|
||||||
// Try again ONCE on failure
|
// Try again ONCE on failure
|
||||||
run().catch(ee => socket.send('ERROR: ' + ee))
|
run().catch(ee => socket.send('ERROR: ' + ee))
|
||||||
})
|
})
|
||||||
|
|
12
config.js
12
config.js
|
@ -2,22 +2,22 @@ const fs = require('fs')
|
||||||
const path = require('path')
|
const path = require('path')
|
||||||
const homedir = require('os').homedir()
|
const homedir = require('os').homedir()
|
||||||
|
|
||||||
const configFilePath = homedir + path.sep + '.kafka-dance'
|
const configFilePath = homedir + path.sep + '.kafka-dance.json'
|
||||||
|
|
||||||
const emptyConfig = () => ({
|
const emptyConfig = () => ({
|
||||||
clusters: {}
|
clusters: {},
|
||||||
|
frontendUrl: 'http://localhost:5173',
|
||||||
})
|
})
|
||||||
|
|
||||||
const readConfig = () => {
|
const readConfig = () => {
|
||||||
if (!fs.existsSync(configFilePath)) {
|
if (fs.existsSync(configFilePath)) {
|
||||||
return emptyConfig()
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
return JSON.parse(fs.readFileSync(configFilePath).toString())
|
return JSON.parse(fs.readFileSync(configFilePath).toString())
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
console.error(e.toString())
|
console.error(e.toString())
|
||||||
return emptyConfig()
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
return emptyConfig()
|
||||||
}
|
}
|
||||||
|
|
||||||
const storeConfig = async config =>
|
const storeConfig = async config =>
|
||||||
|
|
93
query.js
93
query.js
|
@ -1,60 +1,55 @@
|
||||||
const buildMatcher = code => new Function('message', 'value', code)
|
const parseValue = message => {
|
||||||
|
|
||||||
const deepCopy = obj => JSON.parse(JSON.stringify(obj))
|
|
||||||
|
|
||||||
const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immutable = true }) => {
|
|
||||||
const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
|
|
||||||
|
|
||||||
console.log({ topic, onMessage, searchCode, immutable })
|
|
||||||
await consumer.connect()
|
|
||||||
await consumer.subscribe({ topic, fromBeginning: true })
|
|
||||||
const matcher = searchCode && buildMatcher(searchCode)
|
|
||||||
|
|
||||||
const messages = []
|
|
||||||
|
|
||||||
console.log('consumer.run')
|
|
||||||
await consumer.run({
|
|
||||||
autoCommit: false,
|
|
||||||
eachMessage: async ({ topic, partition, message }) => {
|
|
||||||
const s = message.value.toString()
|
const s = message.value.toString()
|
||||||
const value = JSON.parse(s)
|
try {
|
||||||
delete value.Data
|
return JSON.parse(s)
|
||||||
message.value = value
|
} catch (e) {
|
||||||
const workingMessage = immutable ? deepCopy(message) : message
|
console.error('Error deserializing message.value', e) // But keep the value as a string
|
||||||
if (!matcher || matcher(workingMessage, workingMessage.value)) {
|
return '' + s
|
||||||
onMessage(message)
|
|
||||||
messages.push(message)
|
|
||||||
//console.log(message)
|
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
})
|
|
||||||
|
const buildConsumer = async ({ kafka, topic, fromBeginning }) => {
|
||||||
|
// TODO Use a UUID/GUID instead
|
||||||
|
const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() })
|
||||||
|
|
||||||
|
await consumer.connect()
|
||||||
|
await consumer.subscribe({ topic, fromBeginning })
|
||||||
|
|
||||||
for (let i = 0; i <= 5; i++) {
|
|
||||||
//console.log(`seek on partition ${i}`)
|
|
||||||
await consumer.seek({topic, partition: i, offset: 0})
|
|
||||||
}
|
|
||||||
console.log('return consumer')
|
|
||||||
return consumer
|
return consumer
|
||||||
}
|
}
|
||||||
|
|
||||||
const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBatchDone, immutable = true }) => {
|
const seekToBeginning = async ({ topic, consumer, partitionCount }) => {
|
||||||
const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
|
for (let i = 0; i <= partitionCount; i++) {
|
||||||
|
await consumer.seek({topic, partition: i, offset: 0})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
await consumer.connect()
|
const realTimeMessageSearch = async ({ kafka, topic, onMessage }) => {
|
||||||
await consumer.subscribe({ topic, fromBeginning: true })
|
// TODO Use a UUID/GUID instead
|
||||||
const matcher = searchCode && buildMatcher(searchCode)
|
const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })
|
||||||
|
|
||||||
|
await consumer.run({
|
||||||
|
autoCommit: false,
|
||||||
|
eachMessage: async ({ topic, partition, message }) => {
|
||||||
|
message.value = parseValue(message.value)
|
||||||
|
onMessage(message)
|
||||||
|
},
|
||||||
|
})
|
||||||
|
|
||||||
|
// seekToBeginning({ topic, consumer, partitionCount: 5 })
|
||||||
|
|
||||||
|
return consumer
|
||||||
|
}
|
||||||
|
|
||||||
|
const searchMessages = async ({ kafka, maxItems, topic, onDone, onBatchDone }) => {
|
||||||
|
const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })
|
||||||
|
|
||||||
const messages = []
|
const messages = []
|
||||||
|
|
||||||
await consumer.run({
|
await consumer.run({
|
||||||
autoCommit: false,
|
autoCommit: false,
|
||||||
eachMessage: async ({ topic, partition, message }) => {
|
eachMessage: async ({ topic, partition, message }) => {
|
||||||
const s = message.value.toString()
|
message.value = parseValue(message.value)
|
||||||
const value = JSON.parse(s)
|
|
||||||
delete value.Data
|
|
||||||
message.value = value
|
|
||||||
let workingMessage = immutable ? deepCopy(message) : message
|
|
||||||
if (!matcher || matcher(workingMessage, workingMessage.value)) {
|
|
||||||
messages.push(message)
|
messages.push(message)
|
||||||
if (messages.length < maxItems) {
|
if (messages.length < maxItems) {
|
||||||
onBatchDone(messages.length)
|
onBatchDone(messages.length)
|
||||||
|
@ -64,8 +59,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
||||||
await consumer.disconnect()
|
await consumer.disconnect()
|
||||||
onDone(messages.slice(0, maxItems))
|
onDone(messages.slice(0, maxItems))
|
||||||
}
|
}
|
||||||
//console.log(message)
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -88,7 +81,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
||||||
*/
|
*/
|
||||||
let consumedTopicPartitions = {}
|
let consumedTopicPartitions = {}
|
||||||
consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
|
consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
|
||||||
//console.log('GROUP_JOIN')
|
|
||||||
const { memberAssignment } = payload
|
const { memberAssignment } = payload
|
||||||
consumedTopicPartitions = Object.entries(memberAssignment).reduce(
|
consumedTopicPartitions = Object.entries(memberAssignment).reduce(
|
||||||
(topics, [topic, partitions]) => {
|
(topics, [topic, partitions]) => {
|
||||||
|
@ -101,6 +93,7 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// This was part of the original code, but seems to cause a near-immediate exit
|
||||||
/*
|
/*
|
||||||
* This is extremely unergonomic, but if we are currently caught up to the head
|
* This is extremely unergonomic, but if we are currently caught up to the head
|
||||||
* of all topic-partitions, we won't actually get any batches, which means we'll
|
* of all topic-partitions, we won't actually get any batches, which means we'll
|
||||||
|
@ -114,7 +107,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
||||||
//let processedBatch = true
|
//let processedBatch = true
|
||||||
consumer.on(consumer.events.FETCH_START, async () => {
|
consumer.on(consumer.events.FETCH_START, async () => {
|
||||||
//console.log('FETCH_START')
|
//console.log('FETCH_START')
|
||||||
// This was part of the original code, but seems to cause a near-immediate exit
|
|
||||||
//if (processedBatch === false) {
|
//if (processedBatch === false) {
|
||||||
// console.log('I DIE AT FETCH')
|
// console.log('I DIE AT FETCH')
|
||||||
// await consumer.disconnect()
|
// await consumer.disconnect()
|
||||||
|
@ -129,7 +121,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
||||||
* and exit if all topic-partitions have been consumed,
|
* and exit if all topic-partitions have been consumed,
|
||||||
*/
|
*/
|
||||||
consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
|
consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
|
||||||
//console.log('END_BATCH_PROCESS')
|
|
||||||
const { topic, partition, offsetLag } = payload
|
const { topic, partition, offsetLag } = payload
|
||||||
consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'
|
consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'
|
||||||
|
|
||||||
|
@ -147,10 +138,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
||||||
return consumer
|
return consumer
|
||||||
}
|
}
|
||||||
|
|
||||||
//run()
|
|
||||||
// .then(() => console.log('DONE'))
|
|
||||||
// .catch(e => console.error(`[example/consumer] ${e.message}`, e))
|
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
searchMessages,
|
searchMessages,
|
||||||
realTimeMessageSearch,
|
realTimeMessageSearch,
|
||||||
|
|
|
@ -4,7 +4,7 @@ const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
|
||||||
const disconnectAll = async (consumers, clusters) => {
|
const disconnectAll = async (consumers, clusters) => {
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
...Object.values(clusters).map(async cluster => cluster.admin.disconnect()),
|
...Object.values(clusters).map(async cluster => cluster.admin.disconnect()),
|
||||||
...Array.from(consumers.values()).map(async consumer => consumer.disconnect())
|
...Array.from(consumers.values()).map(async consumer => consumer.consumer.disconnect())
|
||||||
])
|
])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue