// Compile a user-supplied search expression into a predicate. The code string
// becomes the body of `function (message, value) { ... }` and should return
// a truthy value for messages that match.
const buildMatcher = code => new Function('message', 'value', code)

const deepCopy = obj => JSON.parse(JSON.stringify(obj))

const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immutable = true }) => {
  const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
  console.log({ topic, onMessage, searchCode, immutable })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })

  const matcher = searchCode && buildMatcher(searchCode)
  const messages = []
  console.log('consumer.run')
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      const s = message.value.toString()
      let value
      try {
        value = JSON.parse(s)
      } catch (e) {
        // Fall back to the raw string for non-JSON payloads
        value = s
      }
      message.value = value
      const workingMessage = immutable ? deepCopy(message) : message
      if (!matcher || matcher(workingMessage, workingMessage.value)) {
        onMessage(message)
        messages.push(message)
      }
    },
  })

  // Rewind every partition to the earliest offset. Note this assumes the
  // topic has six partitions (0-5); seek must be called after consumer.run.
  for (let i = 0; i <= 5; i++) {
    await consumer.seek({ topic, partition: i, offset: 0 })
  }
  console.log('return consumer')
  return consumer
}

const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBatchDone, immutable = true }) => {
  const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning: true })

  const matcher = searchCode && buildMatcher(searchCode)
  const messages = []
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      const s = message.value.toString()
      let value
      try {
        value = JSON.parse(s)
        // `Data` can carry a large payload; drop it before matching and storing
        delete value.Data
      } catch (e) {
        // Fall back to the raw string for non-JSON payloads
        value = s
      }
      message.value = value
      const workingMessage = immutable ? deepCopy(message) : message
      if (!matcher || matcher(workingMessage, workingMessage.value)) {
        messages.push(message)
        if (messages.length < maxItems) {
          onBatchDone(messages.length)
        } else {
          onBatchDone(maxItems)
          await consumer.stop()
          await consumer.disconnect()
          onDone(messages.slice(0, maxItems))
        }
      }
    },
  })

  /*
   * 1. We need to know which partitions we are assigned.
   * 2. Which partitions have we consumed the last offset for?
   * 3. If all partitions have 0 lag, we exit.
   */

  /*
   * `consumedTopicPartitions` will be an object of all topic-partitions
   * and a boolean indicating whether or not we have consumed all
   * messages in that topic-partition. For example:
   *
   * {
   *   "topic-test-0": false,
   *   "topic-test-1": false,
   *   "topic-test-2": false
   * }
   */
  let consumedTopicPartitions = {}
  consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
    const { memberAssignment } = payload
    consumedTopicPartitions = Object.entries(memberAssignment).reduce(
      (topics, [topic, partitions]) => {
        // `partitions` is an array of partition numbers; iterate its values
        // (`of`), not its indices (`in`)
        for (const partition of partitions) {
          topics[`${topic}-${partition}`] = false
        }
        return topics
      },
      {}
    )
  })

  /*
   * This is extremely unergonomic, but if we are currently caught up to the head
   * of all topic-partitions, we won't actually get any batches, which means we'll
   * never find out that we are caught up. As a workaround, we can check in
   * `FETCH_START` whether we have previously made a fetch without processing any
   * batches in between. If so, we received empty fetch responses, meaning there
   * was no more data to fetch.
   *
   * We need to initially set this to true, or we would immediately exit.
   */
  //let processedBatch = true
  consumer.on(consumer.events.FETCH_START, async () => {
    // This was part of the original code, but seems to cause a near-immediate exit:
    //if (processedBatch === false) {
    //  console.log('I DIE AT FETCH')
    //  await consumer.disconnect()
    //  process.exit(0)
    //}
    //processedBatch = false
  })

  /*
   * Whenever we have finished processing a batch, update `consumedTopicPartitions`
   * and exit once all topic-partitions have been consumed.
   */
  consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
    const { topic, partition, offsetLag } = payload
    consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'
    if (Object.values(consumedTopicPartitions).every(Boolean)) {
      await consumer.stop()
      await consumer.disconnect()
      console.log(`I DIE AT BATCH with ${messages.length} messages`)
      onDone(messages)
      //process.exit(0)
    }
    //processedBatch = true
  })

  return consumer
}

module.exports = {
  searchMessages,
  realTimeMessageSearch,
}
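/*
 * Usage sketch: one way to wire `searchMessages` up to a kafkajs client.
 * The broker address, topic name, require path, and search expression below
 * are hypothetical placeholders, not values this module assumes.
 */
// const { Kafka } = require('kafkajs')
// const { searchMessages } = require('./search') // hypothetical path to this module
//
// const kafka = new Kafka({ clientId: 'sage-testing', brokers: ['localhost:9092'] })
//
// searchMessages({
//   kafka,
//   topic: 'topic-test',
//   maxItems: 100,
//   // Body of `function (message, value)`; return true for matches
//   searchCode: 'return value.id === 42',
//   onBatchDone: count => console.log(`matched ${count} message(s) so far`),
//   onDone: found => console.log(`done with ${found.length} match(es)`),
// }).catch(e => console.error(e.message, e))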