Document a couple types. Enact more DRYness.
Remove filterFunc handling (passing that buck to the frontend for now).
This commit is contained in:
parent
bae8b2fc6b
commit
76ab13b385
91
app.js
91
app.js
|
@ -13,14 +13,21 @@ const { buildEscape } = require('./safely-exit')
|
|||
const config = readConfig()
|
||||
console.log('CONFIG', config)
|
||||
|
||||
const c = object => {
|
||||
console.log(object)
|
||||
return object
|
||||
/**
|
||||
* Log out an object and return it.
|
||||
*
|
||||
* @param object The object to log
|
||||
* @param f Optional param to specify an alternative log function. E.g. console.error
|
||||
* @returns {*}
|
||||
*/
|
||||
const c = (object, f = console.log) => {
|
||||
f(object)
|
||||
return object;
|
||||
}
|
||||
|
||||
const buildCluster = (clusterConfig, connect = true) => {
|
||||
const kafkaConfig = {...clusterConfig}
|
||||
delete kafkaConfig.clusterName
|
||||
delete kafkaConfig.clusterName // new Kafka() tries to use this same value
|
||||
|
||||
const kafka = new Kafka(kafkaConfig)
|
||||
const admin = kafka.admin()
|
||||
|
@ -102,10 +109,8 @@ router.post('/clusters/delete', async (req, res, _next) => {
|
|||
return
|
||||
}
|
||||
killConsumer(consumer)
|
||||
socket.send(JSON.stringify({
|
||||
type: 'cluster_deleted',
|
||||
message: clusterName,
|
||||
}))
|
||||
const sendDeletedMessage = buildSocketMessageSender({ type: 'cluster_deleted', socket })
|
||||
sendDeletedMessage(clusterName)
|
||||
})
|
||||
delete clusters[clusterName]
|
||||
delete config.clusters[clusterName]
|
||||
|
@ -122,52 +127,42 @@ router.put('/clusters', async (req, res, _next) => {
|
|||
config.clusters[clusterName] = req.body
|
||||
clusters[clusterName] = buildCluster(req.body)
|
||||
res.send('')
|
||||
//res.send(getClusterData())
|
||||
await storeConfig(config)
|
||||
})
|
||||
|
||||
app.use(router)
|
||||
|
||||
const realTimeSearch = async ({ kafka, searchCode, immutable, socket, topic }) =>
|
||||
const buildSocketMessageSender = ({ type, socket }) => message => {
|
||||
socket.send(JSON.stringify({
|
||||
type,
|
||||
message
|
||||
}))
|
||||
}
|
||||
|
||||
const realTimeSearch = async ({ kafka, socket, topic }) =>
|
||||
query.realTimeMessageSearch({
|
||||
kafka,
|
||||
topic,
|
||||
searchCode,
|
||||
immutable,
|
||||
onMessage: message => {
|
||||
socket.send(JSON.stringify({
|
||||
type: 'message',
|
||||
message
|
||||
}))
|
||||
}
|
||||
})
|
||||
onMessage: buildSocketMessageSender({ socket, type: 'message' })
|
||||
});
|
||||
|
||||
const search = async ({ kafka, searchCode, immutable, socket, topic, maxItems }) =>
|
||||
const oneShotSearch = async ({ kafka, socket, topic, maxItems }) =>
|
||||
query.searchMessages({
|
||||
kafka,
|
||||
topic,
|
||||
maxItems,
|
||||
searchCode,
|
||||
immutable,
|
||||
onDone: messages => {
|
||||
console.log(messages.length + ' messages')
|
||||
socket.send(JSON.stringify({
|
||||
type: 'complete',
|
||||
message: messages
|
||||
}))
|
||||
},
|
||||
onBatchDone: count => {
|
||||
socket.send(JSON.stringify({
|
||||
type: 'item_count',
|
||||
message: count
|
||||
}))
|
||||
}
|
||||
onDone: buildSocketMessageSender({ socket, type: 'count' }),
|
||||
onBatchDone: buildSocketMessageSender({ socket, type: 'item_count' })
|
||||
})
|
||||
|
||||
/**
|
||||
* @type {WebSocketServer}
|
||||
*/
|
||||
const wsServer = new ws.WebSocketServer({
|
||||
noServer: true
|
||||
})
|
||||
|
||||
/** @type {Map<WebSocket, ({consumer: Consumer, cluster: { kafka, admin, config }})>} */
|
||||
const consumers = new Map()
|
||||
|
||||
buildEscape(consumers, clusters)
|
||||
|
@ -188,19 +183,21 @@ wsServer.on('connection', socket => {
|
|||
consumers.delete(socket)
|
||||
return
|
||||
}
|
||||
const currentMode = message.mode === 'realTime' ? realTimeSearch : search
|
||||
const startSearch = message.mode === 'realTime' ? realTimeSearch : oneShotSearch
|
||||
const cluster = clusters[message.cluster]
|
||||
const run = async () => consumers.set(socket, {
|
||||
consumer: await currentMode({
|
||||
kafka: cluster.kafka,
|
||||
searchCode: message.searchCode,
|
||||
immutable: message.immutable,
|
||||
topic: message.topic,
|
||||
maxItems: message.maxItems,
|
||||
socket
|
||||
}),
|
||||
cluster
|
||||
})
|
||||
|
||||
const run = async () => {
|
||||
const consumerCluster = {
|
||||
consumer: await startSearch({
|
||||
kafka: cluster.kafka,
|
||||
topic: message.topic,
|
||||
maxItems: message.maxItems,
|
||||
socket
|
||||
}),
|
||||
cluster
|
||||
}
|
||||
consumers.set(socket, consumerCluster);
|
||||
}
|
||||
|
||||
run().catch(async e => {
|
||||
console.error('run() error occurred!', e.toString())
|
||||
|
|
108
query.js
108
query.js
|
@ -1,75 +1,63 @@
|
|||
const buildMatcher = code => new Function('message', 'value', code)
|
||||
const parseValue = message => {
|
||||
const s = message.value.toString()
|
||||
try {
|
||||
return JSON.parse(s)
|
||||
} catch (e) {
|
||||
console.error('Error deserializing message.value', e) // But keep the value as a string
|
||||
return '' + s
|
||||
}
|
||||
}
|
||||
|
||||
const deepCopy = obj => JSON.parse(JSON.stringify(obj))
|
||||
|
||||
const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immutable = true }) => {
|
||||
const buildConsumer = async ({ kafka, topic, fromBeginning }) => {
|
||||
// TODO Use a UUID/GUID instead
|
||||
const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() })
|
||||
|
||||
console.log({ topic, onMessage, searchCode, immutable })
|
||||
await consumer.connect()
|
||||
await consumer.subscribe({ topic, fromBeginning: true })
|
||||
const matcher = searchCode && buildMatcher(searchCode)
|
||||
await consumer.subscribe({ topic, fromBeginning })
|
||||
|
||||
const messages = []
|
||||
|
||||
console.log('consumer.run')
|
||||
await consumer.run({
|
||||
autoCommit: false,
|
||||
eachMessage: async ({ topic, partition, message }) => {
|
||||
const s = message.value.toString()
|
||||
let value
|
||||
try {
|
||||
value = JSON.parse(s)
|
||||
} catch (e) {
|
||||
value = '' + s
|
||||
}
|
||||
message.value = value
|
||||
const workingMessage = immutable ? deepCopy(message) : message
|
||||
if (!matcher || matcher(workingMessage, workingMessage.value)) {
|
||||
onMessage(message)
|
||||
messages.push(message)
|
||||
//console.log(message)
|
||||
}
|
||||
},
|
||||
})
|
||||
|
||||
for (let i = 0; i <= 5; i++) {
|
||||
//console.log(`seek on partition ${i}`)
|
||||
await consumer.seek({topic, partition: i, offset: 0})
|
||||
}
|
||||
console.log('return consumer')
|
||||
return consumer
|
||||
}
|
||||
|
||||
const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBatchDone, immutable = true }) => {
|
||||
const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
|
||||
const seekToBeginning = async ({ topic, consumer, partitionCount }) => {
|
||||
for (let i = 0; i <= partitionCount; i++) {
|
||||
await consumer.seek({topic, partition: i, offset: 0})
|
||||
}
|
||||
}
|
||||
|
||||
await consumer.connect()
|
||||
await consumer.subscribe({ topic, fromBeginning: true })
|
||||
const matcher = searchCode && buildMatcher(searchCode)
|
||||
const realTimeMessageSearch = async ({ kafka, topic, onMessage }) => {
|
||||
// TODO Use a UUID/GUID instead
|
||||
const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })
|
||||
|
||||
await consumer.run({
|
||||
autoCommit: false,
|
||||
eachMessage: async ({ topic, partition, message }) => {
|
||||
message.value = parseValue(message.value)
|
||||
onMessage(message)
|
||||
},
|
||||
})
|
||||
|
||||
// seekToBeginning({ topic, consumer, partitionCount: 5 })
|
||||
|
||||
return consumer
|
||||
}
|
||||
|
||||
const searchMessages = async ({ kafka, maxItems, topic, onDone, onBatchDone }) => {
|
||||
const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })
|
||||
|
||||
const messages = []
|
||||
|
||||
await consumer.run({
|
||||
autoCommit: false,
|
||||
eachMessage: async ({ topic, partition, message }) => {
|
||||
const s = message.value.toString()
|
||||
const value = JSON.parse(s)
|
||||
delete value.Data
|
||||
message.value = value
|
||||
let workingMessage = immutable ? deepCopy(message) : message
|
||||
if (!matcher || matcher(workingMessage, workingMessage.value)) {
|
||||
messages.push(message)
|
||||
if (messages.length < maxItems) {
|
||||
onBatchDone(messages.length)
|
||||
} else {
|
||||
onBatchDone(maxItems)
|
||||
await consumer.stop()
|
||||
await consumer.disconnect()
|
||||
onDone(messages.slice(0, maxItems))
|
||||
}
|
||||
//console.log(message)
|
||||
message.value = parseValue(message.value)
|
||||
messages.push(message)
|
||||
if (messages.length < maxItems) {
|
||||
onBatchDone(messages.length)
|
||||
} else {
|
||||
onBatchDone(maxItems)
|
||||
await consumer.stop()
|
||||
await consumer.disconnect()
|
||||
onDone(messages.slice(0, maxItems))
|
||||
}
|
||||
},
|
||||
})
|
||||
|
@ -93,7 +81,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
|||
*/
|
||||
let consumedTopicPartitions = {}
|
||||
consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
|
||||
//console.log('GROUP_JOIN')
|
||||
const { memberAssignment } = payload
|
||||
consumedTopicPartitions = Object.entries(memberAssignment).reduce(
|
||||
(topics, [topic, partitions]) => {
|
||||
|
@ -106,6 +93,7 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
|||
)
|
||||
})
|
||||
|
||||
// This was part of the original code, but seems to cause a near-immediate exit
|
||||
/*
|
||||
* This is extremely unergonomic, but if we are currently caught up to the head
|
||||
* of all topic-partitions, we won't actually get any batches, which means we'll
|
||||
|
@ -119,7 +107,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
|||
//let processedBatch = true
|
||||
consumer.on(consumer.events.FETCH_START, async () => {
|
||||
//console.log('FETCH_START')
|
||||
// This was part of the original code, but seems to cause a near-immediate exit
|
||||
//if (processedBatch === false) {
|
||||
// console.log('I DIE AT FETCH')
|
||||
// await consumer.disconnect()
|
||||
|
@ -134,7 +121,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
|||
* and exit if all topic-partitions have been consumed,
|
||||
*/
|
||||
consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
|
||||
//console.log('END_BATCH_PROCESS')
|
||||
const { topic, partition, offsetLag } = payload
|
||||
consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'
|
||||
|
||||
|
@ -152,10 +138,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
|
|||
return consumer
|
||||
}
|
||||
|
||||
//run()
|
||||
// .then(() => console.log('DONE'))
|
||||
// .catch(e => console.error(`[example/consumer] ${e.message}`, e))
|
||||
|
||||
module.exports = {
|
||||
searchMessages,
|
||||
realTimeMessageSearch,
|
||||
|
|
Loading…
Reference in New Issue