// Parse a Kafka message value buffer as JSON, falling back to the raw string
// if it is not valid JSON. Null values (e.g. tombstones) pass through unchanged.
const parseValue = value => {
  if (value == null) return value
  const s = value.toString()
  try {
    return JSON.parse(s)
  } catch (e) {
    console.error('Error deserializing message.value', e)
    // But keep the value as a string
    return s
  }
}

// Create and connect a consumer subscribed to the given topic.
const buildConsumer = async ({ kafka, topic, fromBeginning }) => {
  // TODO Use a UUID/GUID instead
  const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning })
  return consumer
}

// Rewind every partition of the topic to offset 0.
// Partitions are numbered 0..partitionCount - 1.
const seekToBeginning = async ({ topic, consumer, partitionCount }) => {
  for (let i = 0; i < partitionCount; i++) {
    await consumer.seek({ topic, partition: i, offset: 0 })
  }
}

// Consume the topic from the beginning and invoke `onMessage` for each message
// as it arrives. Returns the running consumer so the caller can stop it.
const realTimeMessageSearch = async ({ kafka, topic, onMessage }) => {
  const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })

  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      message.value = parseValue(message.value)
      onMessage(message)
    },
  })

  // seekToBeginning({ topic, consumer, partitionCount: 5 })

  return consumer
}

// Consume the topic from the beginning, collect up to `maxItems` messages, and
// stop as soon as either the limit is reached or every partition has been
// fully consumed.
const searchMessages = async ({ kafka, maxItems, topic, onDone, onBatchDone }) => {
  const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })

  const messages = []
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      message.value = parseValue(message.value)
      messages.push(message)
      if (messages.length < maxItems) {
        onBatchDone(messages.length)
      } else {
        onBatchDone(maxItems)
        await consumer.stop()
        await consumer.disconnect()
        onDone(messages.slice(0, maxItems))
      }
    },
  })

  /*
   * 1. We need to know which partitions we are assigned.
   * 2. We need to know which partitions we have consumed the last offset for.
   * 3. If all partitions have 0 lag, we exit.
   */

  /*
   * `consumedTopicPartitions` will be an object of all topic-partitions
   * and a boolean indicating whether or not we have consumed all
   * messages in that topic-partition. For example:
   *
   * {
   *   "topic-test-0": false,
   *   "topic-test-1": false,
   *   "topic-test-2": false
   * }
   */
  let consumedTopicPartitions = {}
  consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
    const { memberAssignment } = payload
    consumedTopicPartitions = Object.entries(memberAssignment).reduce(
      (topics, [topic, partitions]) => {
        for (const partition of partitions) {
          topics[`${topic}-${partition}`] = false
        }
        return topics
      },
      {}
    )
  })

  // This was part of the original code, but seems to cause a near-immediate exit.
  /*
   * This is extremely unergonomic, but if we are currently caught up to the head
   * of all topic-partitions, we won't actually get any batches, which means we'll
   * never find out that we are actually caught up. So as a workaround, we can
   * check in `FETCH_START` whether we have previously made a fetch without
   * processing any batches in between. If so, it means that we received empty
   * fetch responses, meaning there was no more data to fetch.
   *
   * We need to initially set this to true, or we would immediately exit.
   */
  //let processedBatch = true
  consumer.on(consumer.events.FETCH_START, async () => {
    //console.log('FETCH_START')
    //if (processedBatch === false) {
    //  console.log('I DIE AT FETCH')
    //  await consumer.disconnect()
    //  process.exit(0)
    //}
    //processedBatch = false
  })

  /*
   * Now whenever we have finished processing a batch, we'll update
   * `consumedTopicPartitions` and exit if all topic-partitions have been consumed.
   */
  consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
    const { topic, partition, offsetLag } = payload
    consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'

    if (Object.values(consumedTopicPartitions).every(Boolean)) {
      await consumer.stop()
      await consumer.disconnect()
      console.log(`I DIE AT BATCH with ${messages.length} messages`)
      onDone(messages)
      //process.exit(0)
    }
    //processedBatch = true
  })

  return consumer
}

module.exports = {
  searchMessages,
  realTimeMessageSearch,
}
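
/*
 * Example usage (a minimal sketch, not part of the module). It assumes kafkajs
 * is installed, that a broker is reachable at localhost:9092, and that this file
 * is saved as `messageSearch.js`; the broker address, topic name, and file name
 * are placeholders to adjust for your setup.
 *
 * const { Kafka } = require('kafkajs')
 * const { searchMessages } = require('./messageSearch')
 *
 * const kafka = new Kafka({ clientId: 'sage-testing', brokers: ['localhost:9092'] })
 *
 * searchMessages({
 *   kafka,
 *   topic: 'topic-test',
 *   maxItems: 100,
 *   onBatchDone: count => console.log(`Collected ${count} message(s) so far`),
 *   onDone: messages => console.log(`Done with ${messages.length} message(s)`),
 * }).catch(console.error)
 */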