Compare commits

..

10 Commits

Author | SHA1 | Message | Date
Sage Vaillancourt | 1afe267866 | Add .json extension to config file. A tiny bit of refactoring. | 2022-09-08 22:50:33 -04:00
Sage Vaillancourt | 75e81e7956 | Add frontend URL to config. | 2022-09-08 21:51:10 -04:00
Sage Vaillancourt | 76ab13b385 | Document a couple types. Enact more DRYness. Remove filterFunc handling (passing that buck to the frontend for now). | 2022-09-08 21:22:10 -04:00
Sage Vaillancourt | bae8b2fc6b | Merge branch 'main' of https://gitlab.com/kafka-dance/kafka-dance-api | 2022-09-08 21:21:23 -04:00
Sage Vaillancourt | 25c587e622 | Kill connected consumers of a cluster when it is deleted. | 2022-08-29 07:35:19 -04:00
Sage Vaillancourt | e6dbc34e14 | Kill connected consumers of a cluster when it is deleted. | 2022-08-29 07:33:25 -04:00
Sage Vaillancourt | 3f98892ddd | Merge branch 'main' of https://gitlab.com/kafka-dance/kafka-dance-api into main | 2022-08-26 18:51:41 -04:00
Sage Vaillancourt | cf21c56231 | A bit more error handling. Seeing if we can consistently inform the frontend about request errors. Also, trying to outright crash with slightly less frequency. | 2022-08-26 18:50:02 -04:00
Sage Vaillancourt | ac2d3504bd | Add kill message handling. Remove 'Loading' message. | 2022-08-23 22:27:08 -04:00
Sage Vaillancourt | 61b5da846a | Make slightly more flexible about configs and data | 2022-08-23 16:47:29 -04:00
4 changed files with 143 additions and 138 deletions

app.js
View File

@@ -13,23 +13,31 @@ const { buildEscape } = require('./safely-exit')
 const config = readConfig()
 console.log('CONFIG', config)
 
-const c = object => {
-  console.log(object)
-  return object
+/**
+ * Log out an object and return it.
+ *
+ * @param object The object to log
+ * @param f Optional param to specify an alternative log function. E.g. console.error
+ * @returns {*}
+ */
+const c = (object, f = console.log) => {
+  f(object)
+  return object;
 }
 
 const buildCluster = (clusterConfig, connect = true) => {
   const kafkaConfig = {...clusterConfig}
-  delete kafkaConfig.clusterName
+  delete kafkaConfig.clusterName // new Kafka() tries to use this same value
   const kafka = new Kafka(kafkaConfig)
   const admin = kafka.admin()
-  admin.connect().catch(console.error)
-  return c({
+  const cluster = {
     kafka,
     admin,
     config: clusterConfig
-  })
+  }
+  admin.connect().catch(e => console.error(cluster.error = e.toString()))
+  return cluster
 }
 
 const clusters =
@@ -40,7 +48,7 @@ console.log('CLUSTERS', clusters)
 const app = express();
 
 app.use(cors({
-  origin: 'http://localhost:5173'
+  origin: config.frontendUrl,
 }))
 app.use(logger('dev'));
 app.use(express.json());
@@ -50,29 +58,45 @@ app.use(express.static(path.join(__dirname, 'public')));
 const router = express.Router()
 
+process.on('unhandledRejection', console.error)
+
 /* GET topics listing. */
 router.get('/topics/:cluster', async (req, res, _next) => {
   const legalName = topicName => !topicName.startsWith("__")
-  const topicList = await clusters[req.params.cluster]?.admin.listTopics()
-  if (topicList) {
+  try {
+    const topicList = (await clusters[req.params.cluster]?.admin.listTopics() || [])
     res.send(topicList.filter(legalName))
-  } else {
-    res.send([])
-    //res.status(400).send({error: 'Invalid cluster name'})
+  } catch (e) {
+    res.status(502).send({
+      error: `Could not connect to cluster '${req.params.cluster}'`,
+      errorDetails: e.toString()
+    })
   }
 })
 
+const buildSocketMessageSender = ({ type, socket }) => message => {
+  socket.send(JSON.stringify({
+    type,
+    message
+  }))
+}
+
+const deepCopy = object => JSON.parse(JSON.stringify(object))
+
 const passwordPlaceholder = 'XXXXXXXXXXXXXXXXXXXXXXXXXXXX'
 const getClusterData = () =>
   Object.fromEntries(
-    Object.entries(clusters).map(([key, value]) => {
-      value = JSON.parse(JSON.stringify(value))
-      value.config.sasl.password = passwordPlaceholder
-      return [key, value.config]
+    Object.entries(clusters).map(([key, cluster]) => {
+      cluster = deepCopy(cluster)
+      if (cluster.config.sasl?.password) {
+        cluster.config.sasl.password = passwordPlaceholder
+      }
+      return [key, cluster.config]
     }))
 
 router.get('/clusters', async (req, res, _next) => {
-  console.log('/clusters')
   res.send(getClusterData())
 })
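
The reworked /topics/:cluster handler above now surfaces connection failures instead of silently returning an empty list. A hypothetical failure response might look like the following; the cluster name is illustrative, and errorDetails is simply whatever e.toString() produces for the underlying kafkajs error:

// Hypothetical HTTP 502 body from GET /topics/my-cluster when the admin client cannot connect
{
  "error": "Could not connect to cluster 'my-cluster'",
  "errorDetails": "KafkaJSConnectionError: ..."
}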
@@ -86,9 +110,17 @@ router.post('/clusters', async (req, res, _next) => {
 })
 
 router.post('/clusters/delete', async (req, res, _next) => {
-  console.log('/clusters/delete post body', req.body)
   const clusterName = req.body.clusterName
-  // TODO: Disconnect
+  // Kill all consumers connected to this cluster and notify those clients via their websocket
+  consumers.forEach(([{ consumer, cluster }, socket]) => {
+    if (clusters[clusterName] !== cluster) {
+      // TODO This reference equality may not be sufficient?
+      return
+    }
+    killConsumer(consumer)
+    const sendDeletedMessage = buildSocketMessageSender({ type: 'cluster_deleted', socket })
+    sendDeletedMessage(clusterName)
+  })
   delete clusters[clusterName]
   delete config.clusters[clusterName]
   await storeConfig(config)
@@ -96,10 +128,7 @@ router.post('/clusters/delete', async (req, res, _next) => {
 })
 
 router.put('/clusters', async (req, res, _next) => {
-  console.log('/clusters put body', req.body)
   const hasPlaceholderPassword = req.body.sasl.password === passwordPlaceholder
-  console.log('Has placeholder password:', hasPlaceholderPassword)
   const clusterName = req.body.clusterName
   if (hasPlaceholderPassword) {
     req.body.password = config.clusters[clusterName].password
@@ -107,83 +136,72 @@ router.put('/clusters', async (req, res, _next) => {
   config.clusters[clusterName] = req.body
   clusters[clusterName] = buildCluster(req.body)
   res.send('')
-  //res.send(getClusterData())
   await storeConfig(config)
 })
 
 app.use(router)
 
-const realTimeSearch = async ({ kafka, searchCode, immutable, socket, topic }) =>
+const realTimeSearch = async ({ kafka, socket, topic }) =>
   query.realTimeMessageSearch({
     kafka,
     topic,
-    searchCode,
-    immutable,
-    onMessage: message => {
-      socket.send(JSON.stringify({
-        type: 'message',
-        message
-      }))
-    }
-  })
+    onMessage: buildSocketMessageSender({ socket, type: 'message' })
+  });
 
-const search = async ({ kafka, searchCode, immutable, socket, topic, maxItems }) =>
+const oneShotSearch = async ({ kafka, socket, topic, maxItems }) =>
   query.searchMessages({
     kafka,
     topic,
     maxItems,
-    searchCode,
-    immutable,
-    onDone: messages => {
-      console.log(messages.length + ' messages')
-      socket.send(JSON.stringify({
-        type: 'complete',
-        message: messages
-      }))
-    },
-    onBatchDone: count => {
-      socket.send(JSON.stringify({
-        type: 'item_count',
-        message: count
-      }))
-    }
+    onDone: buildSocketMessageSender({ socket, type: 'count' }),
+    onBatchDone: buildSocketMessageSender({ socket, type: 'item_count' })
   })
 
+/** @type {WebSocketServer} */
 const wsServer = new ws.WebSocketServer({
   noServer: true
 })
 
+/** @type {Map<WebSocket, ({consumer: kafka.Consumer, cluster: { kafka, admin, config }})>} */
 const consumers = new Map()
 buildEscape(consumers, clusters)
 
 wsServer.on('connection', socket => {
-  socket.send('CONNECTED')
   socket.on('close', async () => {
-    await killConsumer(consumers.get(socket))
+    await killConsumer(consumers.get(socket).consumer)
     consumers.delete(socket)
   })
   socket.on('message', async message => {
-    socket.send('Loading...')
     message = JSON.parse(message)
-    const currentMode = message.mode === 'realTime' ? realTimeSearch : search
-    console.log('CLUSTERS before run', clusters)
-    console.log('message.cluster', message.cluster)
+    if (message.mode === 'kill') {
+      console.log('KILLING SOCKET')
+      await killConsumer(consumers.get(socket).consumer)
+      consumers.delete(socket)
+      return
+    }
+    const startSearch = message.mode === 'realTime' ? realTimeSearch : oneShotSearch
     const cluster = clusters[message.cluster]
-    console.log('clusters[message.cluster]', cluster)
-    const run = async () => consumers.set(socket, await currentMode({
-      kafka: cluster.kafka,
-      searchCode: message.searchCode,
-      immutable: message.immutable,
-      topic: message.topic,
-      maxItems: message.maxItems,
-      socket
-    }))
+    const run = async () => {
+      const consumerCluster = {
+        consumer: await startSearch({
+          kafka: cluster.kafka,
+          topic: message.topic,
+          maxItems: message.maxItems,
+          socket
+        }),
+        cluster
+      }
+      consumers.set(socket, consumerCluster);
+    }
     run().catch(async e => {
       console.error('run() error occurred!', e.toString())
-      await killConsumer(consumers.get(socket))
+      await killConsumer(consumers.get(socket).consumer)
       // Try again ONCE on failure
      run().catch(ee => socket.send('ERROR: ' + ee))
     })

View File

@@ -2,22 +2,22 @@ const fs = require('fs')
 const path = require('path')
 const homedir = require('os').homedir()
 
-const configFilePath = homedir + path.sep + '.kafka-dance'
+const configFilePath = homedir + path.sep + '.kafka-dance.json'
 
 const emptyConfig = () => ({
-  clusters: {}
+  clusters: {},
+  frontendUrl: 'http://localhost:5173',
 })
 
 const readConfig = () => {
-  if (!fs.existsSync(configFilePath)) {
-    return emptyConfig()
-  }
-  try {
-    return JSON.parse(fs.readFileSync(configFilePath).toString())
-  } catch (e) {
-    console.error(e.toString())
-    return emptyConfig()
+  if (fs.existsSync(configFilePath)) {
+    try {
+      return JSON.parse(fs.readFileSync(configFilePath).toString())
+    } catch (e) {
+      console.error(e.toString())
+    }
   }
+  return emptyConfig()
 }
 
 const storeConfig = async config =>
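
Since readConfig and storeConfig round-trip plain JSON, the renamed ~/.kafka-dance.json file would hold something like the sketch below. Only clusters and frontendUrl are implied by emptyConfig; the cluster entry and its brokers/sasl fields are illustrative placeholders (app.js stores whatever the frontend submits, including clusterName and sasl.password):

{
  "frontendUrl": "http://localhost:5173",
  "clusters": {
    "my-cluster": {
      "clusterName": "my-cluster",
      "brokers": ["localhost:9092"],
      "sasl": { "mechanism": "plain", "username": "user", "password": "secret" }
    }
  }
}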

View File

@@ -1,60 +1,55 @@
-const buildMatcher = code => new Function('message', 'value', code)
-const deepCopy = obj => JSON.parse(JSON.stringify(obj))
-
-const realTimeMessageSearch = async ({ kafka, topic, onMessage, searchCode, immutable = true }) => {
-  const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
-  console.log({ topic, onMessage, searchCode, immutable })
-  await consumer.connect()
-  await consumer.subscribe({ topic, fromBeginning: true })
-  const matcher = searchCode && buildMatcher(searchCode)
-  const messages = []
-  console.log('consumer.run')
-  await consumer.run({
-    autoCommit: false,
-    eachMessage: async ({ topic, partition, message }) => {
-      const s = message.value.toString()
-      const value = JSON.parse(s)
-      delete value.Data
-      message.value = value
-      const workingMessage = immutable ? deepCopy(message) : message
-      if (!matcher || matcher(workingMessage, workingMessage.value)) {
-        onMessage(message)
-        messages.push(message)
-        //console.log(message)
-      }
-    },
-  })
-  for (let i = 0; i <= 5; i++) {
-    //console.log(`seek on partition ${i}`)
-    await consumer.seek({topic, partition: i, offset: 0})
-  }
-  console.log('return consumer')
+const parseValue = message => {
+  const s = message.value.toString()
+  try {
+    return JSON.parse(s)
+  } catch (e) {
+    console.error('Error deserializing message.value', e) // But keep the value as a string
+    return '' + s
+  }
+}
+
+const buildConsumer = async ({ kafka, topic, fromBeginning }) => {
+  // TODO Use a UUID/GUID instead
+  const consumer = kafka.consumer({ groupId: 'sage-testing-group' + Math.random() })
+  await consumer.connect()
+  await consumer.subscribe({ topic, fromBeginning })
   return consumer
 }
 
-const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBatchDone, immutable = true }) => {
-  const consumer = kafka.consumer({ groupId: 'sage-testing-group' })
+const seekToBeginning = async ({ topic, consumer, partitionCount }) => {
+  for (let i = 0; i <= partitionCount; i++) {
+    await consumer.seek({topic, partition: i, offset: 0})
+  }
+}
 
-  await consumer.connect()
-  await consumer.subscribe({ topic, fromBeginning: true })
-  const matcher = searchCode && buildMatcher(searchCode)
+const realTimeMessageSearch = async ({ kafka, topic, onMessage }) => {
+  // TODO Use a UUID/GUID instead
+  const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })
+  await consumer.run({
+    autoCommit: false,
+    eachMessage: async ({ topic, partition, message }) => {
+      message.value = parseValue(message.value)
+      onMessage(message)
+    },
+  })
+  // seekToBeginning({ topic, consumer, partitionCount: 5 })
+  return consumer
+}
+
+const searchMessages = async ({ kafka, maxItems, topic, onDone, onBatchDone }) => {
+  const consumer = await buildConsumer({ kafka, topic, fromBeginning: true })
 
   const messages = []
   await consumer.run({
     autoCommit: false,
     eachMessage: async ({ topic, partition, message }) => {
-      const s = message.value.toString()
-      const value = JSON.parse(s)
-      delete value.Data
-      message.value = value
-      let workingMessage = immutable ? deepCopy(message) : message
-      if (!matcher || matcher(workingMessage, workingMessage.value)) {
-        messages.push(message)
-        if (messages.length < maxItems) {
-          onBatchDone(messages.length)
+      message.value = parseValue(message.value)
+      messages.push(message)
+      if (messages.length < maxItems) {
+        onBatchDone(messages.length)
@@ -64,8 +59,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
         await consumer.disconnect()
         onDone(messages.slice(0, maxItems))
       }
-      //console.log(message)
-      }
     },
   })
@@ -88,7 +81,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
    */
   let consumedTopicPartitions = {}
   consumer.on(consumer.events.GROUP_JOIN, async ({ payload }) => {
-    //console.log('GROUP_JOIN')
     const { memberAssignment } = payload
     consumedTopicPartitions = Object.entries(memberAssignment).reduce(
       (topics, [topic, partitions]) => {
@@ -101,6 +93,7 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
     )
   })
 
+  // This was part of the original code, but seems to cause a near-immediate exit
   /*
    * This is extremely unergonomic, but if we are currently caught up to the head
    * of all topic-partitions, we won't actually get any batches, which means we'll
@@ -114,7 +107,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
   //let processedBatch = true
   consumer.on(consumer.events.FETCH_START, async () => {
     //console.log('FETCH_START')
-    // This was part of the original code, but seems to cause a near-immediate exit
     //if (processedBatch === false) {
     //  console.log('I DIE AT FETCH')
     //  await consumer.disconnect()
@@ -129,7 +121,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
    * and exit if all topic-partitions have been consumed,
    */
   consumer.on(consumer.events.END_BATCH_PROCESS, async ({ payload }) => {
-    //console.log('END_BATCH_PROCESS')
     const { topic, partition, offsetLag } = payload
     consumedTopicPartitions[`${topic}-${partition}`] = offsetLag === '0'
@@ -147,10 +138,6 @@ const searchMessages = async ({ kafka, maxItems, topic, onDone, searchCode, onBa
   return consumer
 }
 
-//run()
-//  .then(() => console.log('DONE'))
-//  .catch(e => console.error(`[example/consumer] ${e.message}`, e))
-
 module.exports = {
   searchMessages,
   realTimeMessageSearch,
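
As a usage sketch (not part of the diff), both exports take an already-constructed kafkajs client plus callbacks, so a caller could drive a one-shot search like this; the module path, broker, and topic are assumptions:

// Hypothetical standalone usage of the search helpers above.
const { Kafka } = require('kafkajs')
const query = require('./query') // assumed filename; app.js refers to this module as `query`

const kafka = new Kafka({ clientId: 'kafka-dance', brokers: ['localhost:9092'] })

query.searchMessages({
  kafka,
  topic: 'my-topic',
  maxItems: 50,
  onBatchDone: count => console.log(`collected ${count} so far`),
  onDone: messages => console.log(`done with ${messages.length} messages`)
}).catch(console.error)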

View File

@@ -4,7 +4,7 @@ const signalTraps = ['SIGTERM', 'SIGINT', 'SIGUSR2']
 const disconnectAll = async (consumers, clusters) => {
   await Promise.all([
     ...Object.values(clusters).map(async cluster => cluster.admin.disconnect()),
-    ...Array.from(consumers.values()).map(async consumer => consumer.disconnect())
+    ...Array.from(consumers.values()).map(async consumer => consumer.consumer.disconnect())
   ])
 }
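
buildEscape itself is not shown in this compare view, but given signalTraps and disconnectAll above, its wiring is presumably something like the following sketch; the process.once handlers are an assumption, not code from the diff:

// Hypothetical sketch of how signalTraps might drive disconnectAll on shutdown.
const buildEscape = (consumers, clusters) =>
  signalTraps.forEach(type =>
    process.once(type, async () => {
      try {
        await disconnectAll(consumers, clusters)
      } finally {
        process.kill(process.pid, type)
      }
    }))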