kafka-dance-api/query.js

// Decode a Kafka message value (a Buffer) into a JS object if it contains
// JSON; otherwise keep it as a plain string.
const parseValue = value => {
  const s = value.toString()
  try {
    return JSON.parse(s)
  } catch (e) {
    console.error('Error deserializing message.value', e)
    // Not valid JSON, so keep the value as a string
    return s
  }
}
const buildConsumer = async ({ kafka, topic, fromBeginning }) => {
  // TODO Use a UUID/GUID instead
  const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() })
  await consumer.connect()
  await consumer.subscribe({ topic, fromBeginning })
  return consumer
}
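
// One possible way to address the TODO above (a sketch only, assuming Node
// 14.17+ where crypto.randomUUID() is available; not wired in here):
//
//   const { randomUUID } = require('crypto')
//   const consumer = kafka.consumer({ groupId: `sage-testing-group-${randomUUID()}` })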
// Seek every partition of `topic` back to offset 0. The caller must supply
// the topic's partition count.
const seekToBeginning = async ({ topic, consumer, partitionCount }) => {
  // Partitions are numbered 0..partitionCount - 1, so use < rather than <=
  for (let i = 0; i < partitionCount; i++) {
    await consumer.seek({ topic, partition: i, offset: 0 })
  }
}
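
// One possible way to obtain `partitionCount` (a sketch, assuming the kafkajs
// admin client; nothing in this file currently does this):
//
//   const admin = kafka.admin()
//   await admin.connect()
//   const metadata = await admin.fetchTopicMetadata({ topics: [topic] })
//   const partitionCount = metadata.topics[0].partitions.length
//   await admin.disconnect()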
// Stream every message in `topic` (from the beginning) to the `onMessage`
// callback. Returns the running consumer so the caller can disconnect it.
const realTimeMessageSearch = async ({ kafka, topic, onMessage }) => {
  const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      message.value = parseValue(message.value)
      onMessage(message)
    },
  })
  // seekToBeginning({ topic, consumer, partitionCount: 5 })
  return consumer
}
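
// Example call (a sketch; `kafka` is a kafkajs client and 'my-topic' is a
// placeholder topic name):
//
//   const consumer = await realTimeMessageSearch({
//     kafka,
//     topic: 'my-topic',
//     onMessage: message => console.log(message.offset, message.value),
//   })
//   // ... later, when the caller is done:
//   await consumer.disconnect()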
// Collect up to `maxItems` messages from `topic`, reporting progress through
// `onBatchDone` and delivering the collected messages through `onDone`.
const searchMessages = async ({ kafka, maxItems, topic, onDone, onBatchDone }) => {
  const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })
  const messages = []
  await consumer.run({
    autoCommit: false,
    eachMessage: async ({ topic, partition, message }) => {
      message.value = parseValue(message.value)
      messages.push(message)
      if (messages.length < maxItems) {
        onBatchDone(messages.length)
      } else {
        // We have enough messages: stop consuming and hand back the first
        // `maxItems` of them.
        onBatchDone(maxItems)
        await consumer.stop()
        await consumer.disconnect()
        onDone(messages.slice(0, maxItems))
      }
    },
  })
  /*
   * Exit strategy:
   *   1. We need to know which partitions we are assigned.
   *   2. We need to know which partitions we have consumed the last offset for.
   *   3. If all partitions have zero lag, we exit.
   */
  /*
   * `consumedTopicPartitions` will be an object of all topic-partitions
   * and a boolean indicating whether or not we have consumed all
   * messages in that topic-partition. For example:
   *
   * {
   *   "topic-test-0": false,
   *   "topic-test-1": false,
   *   "topic-test-2": false
   * }
   */
  let consumedTopicPartitions = {}
  consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
    const { memberAssignment } = payload
    // `memberAssignment` maps each topic to an array of assigned partition
    // numbers, so iterate the values with `of` (not the array indexes).
    consumedTopicPartitions = Object.entries(memberAssignment).reduce(
      (topics, [topic, partitions]) => {
        for (const partition of partitions) {
          topics[`${topic}-${partition}`] = false
        }
        return topics
      },
      {}
    )
  })
  // This was part of the original code, but seems to cause a near-immediate exit
  /*
   * This is extremely unergonomic, but if we are currently caught up to the head
   * of all topic-partitions, we won't actually get any batches, which means we'll
   * never find out that we are actually caught up. So as a workaround, what we can do
   * is to check in `FETCH_START` if we have previously made a fetch without
   * processing any batches in between. If so, it means that we received empty
   * fetch responses, meaning there was no more data to fetch.
   *
   * We need to initially set this to true, or we would immediately exit.
   */
  //let processedBatch = true
  consumer.on(consumer.events.FETCH_START, async () => {
    //console.log('FETCH_START')
    //if (processedBatch === false) {
    //  console.log('I DIE AT FETCH')
    //  await consumer.disconnect()
    //  process.exit(0)
    //}
    //processedBatch = false
  })
  /*
   * Now whenever we have finished processing a batch, we'll update `consumedTopicPartitions`
   * and exit if all topic-partitions have been consumed.
   */
  consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
    const { topic, partition, offsetLag } = payload
    consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'
    if (Object.values(consumedTopicPartitions).every(Boolean)) {
      await consumer.stop()
      await consumer.disconnect()
      console.log(`I DIE AT BATCH with ${messages.length} messages`)
      onDone(messages)
      //process.exit(0)
    }
    //processedBatch = true
  })
  return consumer
}
module.exports = {
  searchMessages,
  realTimeMessageSearch,
}
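
/*
 * Example usage (a sketch, not part of the module; assumes kafkajs is
 * installed and a broker is reachable at localhost:9092; 'my-topic' is a
 * placeholder topic name):
 *
 *   const { Kafka } = require('kafkajs')
 *   const { searchMessages } = require('./query')
 *
 *   const kafka = new Kafka({ clientId: 'kafka-dance-api', brokers: ['localhost:9092'] })
 *   searchMessages({
 *     kafka,
 *     topic: 'my-topic',
 *     maxItems: 100,
 *     onBatchDone: count => console.log(`collected ${count} message(s) so far`),
 *     onDone: messages => console.log(`done: ${messages.length} message(s)`),
 *   })
 */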