kafka-dance-api/query.js

162 lines
5.0 KiB
JavaScript
Raw Normal View History

const buildMatcher = code => new Function('message', 'value', code)
const deepCopy = obj => JSON.parse(JSON.stringify(obj))
const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immutable = true }) => {
// TODO Use a UUID/GUID instead
const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() })
console.log({ topic, onMessage, searchCode, immutable })
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
const matcher = searchCode && buildMatcher(searchCode)
const messages = []
console.log('consumer.run')
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
const s = message.value.toString()
let value
try {
value = JSON.parse(s)
} catch (e) {
value = '' + s
}
message.value = value
const workingMessage = immutable ? deepCopy(message) : message
if (!matcher || matcher(workingMessage, workingMessage.value)) {
onMessage(message)
messages.push(message)
//console.log(message)
}
},
})
for (let i = 0; i <= 5; i++) {
//console.log(`seek on partition ${i}`)
await consumer.seek({topic, partition: i, offset: 0})
}
console.log('return consumer')
return consumer
}
const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBatchDone, immutable = true }) => {
const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
await consumer.connect()
await consumer.subscribe({ topic, fromBeginning: true })
const matcher = searchCode && buildMatcher(searchCode)
const messages = []
await consumer.run({
autoCommit: false,
eachMessage: async ({ topic, partition, message }) => {
const s = message.value.toString()
const value = JSON.parse(s)
delete value.Data
message.value = value
let workingMessage = immutable ? deepCopy(message) : message
if (!matcher || matcher(workingMessage, workingMessage.value)) {
messages.push(message)
if (messages.length < maxItems) {
onBatchDone(messages.length)
} else {
onBatchDone(maxItems)
await consumer.stop()
await consumer.disconnect()
onDone(messages.slice(0, maxItems))
}
//console.log(message)
}
},
})
/*
* 1. We need to know which partitions we are assigned.
* 2. Which partitions have we consumed the last offset for
* 3. If all partitions have 0 lag, we exit.
*/
/*
* `consumedTopicPartitions` will be an object of all topic-partitions
* and a boolean indicating whether or not we have consumed all
* messages in that topic-partition. For example:
*
* {
* "topic-test-0": false,
* "topic-test-1": false,
* "topic-test-2": false
* }
*/
let consumedTopicPartitions = {}
consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
//console.log('GROUP_JOIN')
const { memberAssignment } = payload
consumedTopicPartitions = Object.entries(memberAssignment).reduce(
(topics, [topic, partitions]) => {
for (const partition in partitions) {
topics[`${topic}-${partition}`] = false
}
return topics
},
{}
)
})
/*
* This is extremely unergonomic, but if we are currently caught up to the head
* of all topic-partitions, we won't actually get any batches, which means we'll
* never find out that we are actually caught up. So as a workaround, what we can do
* is to check in `FETCH_START` if we have previously made a fetch without
* processing any batches in between. If so, it means that we received empty
* fetch responses, meaning there was no more data to fetch.
*
* We need to initially set this to true, or we would immediately exit.
*/
//let processedBatch = true
consumer.on(consumer.events.FETCH_START, async () => {
//console.log('FETCH_START')
// This was part of the original code, but seems to cause a near-immediate exit
//if (processedBatch === false) {
// console.log('I DIE AT FETCH')
// await consumer.disconnect()
// process.exit(0)
//}
//processedBatch = false
})
/*
* Now whenever we have finished processing a batch, we'll update `consumedTopicPartitions`
* and exit if all topic-partitions have been consumed,
*/
consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
//console.log('END_BATCH_PROCESS')
const { topic, partition, offsetLag } = payload
consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'
if (Object.values(consumedTopicPartitions).every(Boolean)) {
await consumer.stop()
await consumer.disconnect()
console.log(`I DIE AT BATCH with ${messages.length} messages`)
onDone(messages)
//process.exit(0)
}
//processedBatch = true
})
return consumer
}
//run()
// .then(() => console.log('DONE'))
// .catch(e => console.error(`[example/consumer] ${e.message}`, e))
module.exports = {
searchMessages,
realTimeMessageSearch,
}