diff --git a/app.js b/app.js index 248d418..ef394ae 100644 --- a/app.js +++ b/app.js @@ -13,14 +13,21 @@ const { buildEscape } = require('./safely-exit') const config = readConfig() console.log('CONFIG', config) -const c = object => { - console.log(object) - return object +/** + * Log out an object and return it. + * + * @param object The object to log + * @param f Optional param to specify an alternative log function. E.g. console.error + * @returns {*} + */ +const c = (object, f = console.log) => { + f(object) + return object; } const buildCluster = (clusterConfig, connect = true) => { const kafkaConfig = {...clusterConfig} - delete kafkaConfig.clusterName + delete kafkaConfig.clusterName // new Kafka() tries to use this same value const kafka = new Kafka(kafkaConfig) const admin = kafka.admin() @@ -102,10 +109,8 @@ router.post('/clusters/delete', async (req, res, _next) => { return } killConsumer(consumer) - socket.send(JSON.stringify({ - type: 'cluster_deleted', - message: clusterName, - })) + const sendDeletedMessage = buildSocketMessageSender({ type: 'cluster_deleted', socket }) + sendDeletedMessage(clusterName) }) delete clusters[clusterName] delete config.clusters[clusterName] @@ -122,52 +127,42 @@ router.put('/clusters', async (req, res, _next) => { config.clusters[clusterName] = req.body clusters[clusterName] = buildCluster(req.body) res.send('') - //res.send(getClusterData()) await storeConfig(config) }) app.use(router) -const realTimeSearch = async ({ kafka, searchCode, immutable, socket, topic }) => +const buildSocketMessageSender = ({ type, socket }) => message => { + socket.send(JSON.stringify({ + type, + message + })) +} + +const realTimeSearch = async ({ kafka, socket, topic }) => query.realTimeMessageSearch({ kafka, topic, - searchCode, - immutable, - onMessage: message => { - socket.send(JSON.stringify({ - type: 'message', - message - })) - } - }) + onMessage: buildSocketMessageSender({ socket, type: 'message' }) + }); -const search = async ({ kafka, searchCode, immutable, socket, topic, maxItems }) => +const oneShotSearch = async ({ kafka, socket, topic, maxItems }) => query.searchMessages({ kafka, topic, maxItems, - searchCode, - immutable, - onDone: messages => { - console.log(messages.length + ' messages') - socket.send(JSON.stringify({ - type: 'complete', - message: messages - })) - }, - onBatchDone: count => { - socket.send(JSON.stringify({ - type: 'item_count', - message: count - })) - } + onDone: buildSocketMessageSender({ socket, type: 'count' }), + onBatchDone: buildSocketMessageSender({ socket, type: 'item_count' }) }) +/** + * @type {WebSocketServer} + */ const wsServer = new ws.WebSocketServer({ noServer: true }) +/** @type {Map} */ const consumers = new Map() buildEscape(consumers, clusters) @@ -188,19 +183,21 @@ wsServer.on('connection', socket => { consumers.delete(socket) return } - const currentMode = message.mode === 'realTime' ? realTimeSearch : search + const startSearch = message.mode === 'realTime' ? realTimeSearch : oneShotSearch const cluster = clusters[message.cluster] - const run = async () => consumers.set(socket, { - consumer: await currentMode({ - kafka: cluster.kafka, - searchCode: message.searchCode, - immutable: message.immutable, - topic: message.topic, - maxItems: message.maxItems, - socket - }), - cluster - }) + + const run = async () => { + const consumerCluster = { + consumer: await startSearch({ + kafka: cluster.kafka, + topic: message.topic, + maxItems: message.maxItems, + socket + }), + cluster + } + consumers.set(socket, consumerCluster); + } run().catch(async e => { console.error('run() error occurred!', e.toString()) diff --git a/query.js b/query.js index 22abd2a..798e9f4 100644 --- a/query.js +++ b/query.js @@ -1,75 +1,63 @@ -const buildMatcher = code => new Function('message', 'value', code) +const parseValue = message => { + const s = message.value.toString() + try { + return JSON.parse(s) + } catch (e) { + console.error('Error deserializing message.value', e) // But keep the value as a string + return '' + s + } +} -const deepCopy = obj => JSON.parse(JSON.stringify(obj)) - -const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immutable = true }) => { +const buildConsumer = async ({ kafka, topic, fromBeginning }) => { // TODO Use a UUID/GUID instead const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() }) - console.log({ topic, onMessage, searchCode, immutable }) await consumer.connect() - await consumer.subscribe({ topic, fromBeginning: true }) - const matcher = searchCode && buildMatcher(searchCode) + await consumer.subscribe({ topic, fromBeginning }) - const messages = [] - - console.log('consumer.run') - await consumer.run({ - autoCommit: false, - eachMessage: async ({ topic, partition, message }) => { - const s = message.value.toString() - let value - try { - value = JSON.parse(s) - } catch (e) { - value = '' + s - } - message.value = value - const workingMessage = immutable ? deepCopy(message) : message - if (!matcher || matcher(workingMessage, workingMessage.value)) { - onMessage(message) - messages.push(message) - //console.log(message) - } - }, - }) - - for (let i = 0; i <= 5; i++) { - //console.log(`seek on partition ${i}`) - await consumer.seek({topic, partition: i, offset: 0}) - } - console.log('return consumer') return consumer } -const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBatchDone, immutable = true }) => { - const consumer = kafka.consumer({ groupId: 'sage-testing-group' }) +const seekToBeginning = async ({ topic, consumer, partitionCount }) => { + for (let i = 0; i <= partitionCount; i++) { + await consumer.seek({topic, partition: i, offset: 0}) + } +} - await consumer.connect() - await consumer.subscribe({ topic, fromBeginning: true }) - const matcher = searchCode && buildMatcher(searchCode) +const realTimeMessageSearch = async ({ kafka, topic, onMessage }) => { + // TODO Use a UUID/GUID instead + const consumer = await buildConsumer({ kafka, topic, fromBeginning: true }) + + await consumer.run({ + autoCommit: false, + eachMessage: async ({ topic, partition, message }) => { + message.value = parseValue(message.value) + onMessage(message) + }, + }) + + // seekToBeginning({ topic, consumer, partitionCount: 5 }) + + return consumer +} + +const searchMessages = async ({ kafka, maxItems, topic, onDone, onBatchDone }) => { + const consumer = await buildConsumer({ kafka, topic, fromBeginning: true }) const messages = [] await consumer.run({ autoCommit: false, eachMessage: async ({ topic, partition, message }) => { - const s = message.value.toString() - const value = JSON.parse(s) - delete value.Data - message.value = value - let workingMessage = immutable ? deepCopy(message) : message - if (!matcher || matcher(workingMessage, workingMessage.value)) { - messages.push(message) - if (messages.length < maxItems) { - onBatchDone(messages.length) - } else { - onBatchDone(maxItems) - await consumer.stop() - await consumer.disconnect() - onDone(messages.slice(0, maxItems)) - } - //console.log(message) + message.value = parseValue(message.value) + messages.push(message) + if (messages.length < maxItems) { + onBatchDone(messages.length) + } else { + onBatchDone(maxItems) + await consumer.stop() + await consumer.disconnect() + onDone(messages.slice(0, maxItems)) } }, }) @@ -93,7 +81,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa */ let consumedTopicPartitions = {} consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => { - //console.log('GROUP_JOIN') const { memberAssignment } = payload consumedTopicPartitions = Object.entries(memberAssignment).reduce( (topics, [topic, partitions]) => { @@ -106,6 +93,7 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa ) }) + // This was part of the original code, but seems to cause a near-immediate exit /* * This is extremely unergonomic, but if we are currently caught up to the head * of all topic-partitions, we won't actually get any batches, which means we'll @@ -119,7 +107,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa //let processedBatch = true consumer.on(consumer.events.FETCH_START, async () => { //console.log('FETCH_START') - // This was part of the original code, but seems to cause a near-immediate exit //if (processedBatch === false) { // console.log('I DIE AT FETCH') // await consumer.disconnect() @@ -134,7 +121,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa * and exit if all topic-partitions have been consumed, */ consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => { - //console.log('END_BATCH_PROCESS') const { topic, partition, offsetLag } = payload consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0' @@ -152,10 +138,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa return consumer } -//run() -// .then(() => console.log('DONE')) -// .catch(e => console.error(`[example/consumer] ${e.message}`, e)) - module.exports = { searchMessages, realTimeMessageSearch,