diff --git a/app.js b/app.js index a37e64e..e85e939 100644 --- a/app.js +++ b/app.js @@ -68,7 +68,9 @@ const getClusterData = () => Object.fromEntries( Object.entries(clusters).map(([key, value]) => { value = JSON.parse(JSON.stringify(value)) - value.config.sasl.password = passwordPlaceholder + if (value.config.sasl?.password) { + value.config.sasl.password = passwordPlaceholder + } return [key, value.config] })) @@ -158,6 +160,7 @@ const consumers = new Map() buildEscape(consumers, clusters) wsServer.on('connection', socket => { + socket.send('CONNECTED') socket.on('close', async () => { await killConsumer(consumers.get(socket)) consumers.delete(socket) diff --git a/query.js b/query.js index 1263f8e..a6d0afd 100644 --- a/query.js +++ b/query.js @@ -17,8 +17,12 @@ const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immu autoCommit: false, eachMessage: async ({ topic, partition, message }) => { const s = message.value.toString() - const value = JSON.parse(s) - delete value.Data + let value + try { + value = JSON.parse(s) + } catch (e) { + value = '' + s + } message.value = value const workingMessage = immutable ? deepCopy(message) : message if (!matcher || matcher(workingMessage, workingMessage.value)) {