kafka-dance-frontend/src/lib/state.js

134 lines
3.4 KiB
JavaScript

// noinspection JSCheckFunctionSignatures
import { writable } from 'svelte/store'
import { backendAddressAndPort, queryMode } from "./constants.js";
// Canned records used by `testQuery` in place of backend messages.
// Every payload is wrapped as `{ value, timestamp }`, with the timestamp
// taken once, when this module is evaluated.
const mockItems = Array.from(
  [
    { eventType: 'Television', broughtToYouBy: 'AMC', title: 'Breaking Bad' },
    { eventType: 'Television', broughtToYouBy: 'CBS', title: 'Breaking Bad Bang Theory' },
    { eventType: 'Movie', broughtToYouBy: '20th Century Fox', title: 'Star Wars 2' },
  ],
  payload => ({ value: payload, timestamp: Date.now() }),
);
// When true, `query` is answered by `testQuery` (mock data) instead of the
// WebSocket backend.
const testMode = false;

// Exported Svelte store with the query results:
//   items     - list of received items
//   itemCount - count reported by the backend (undefined until a query runs)
//   error     - last error message, or null when there is none
export const state = writable({
  items: [],
  itemCount: undefined,
  error: null,
});
/**
 * Updates the store through `updater`, first resetting `error` to null on the
 * current state object (in place), so any successful update clears a stale
 * error unless the updater sets a new one.
 *
 * @param {(s: object) => object} updater - receives the current state (with
 *   `error` already null) and returns the next state.
 * @returns whatever `state.update` returns.
 */
const updateClearError = updater =>
  state.update(current => updater(Object.assign(current, { error: null })));
// Mutable module state shared by `query`, `connect` and `testQuery`.
// Set by the socket's 'close' listener; makes the next `query` reconnect.
let disconnected = false;
// The WebSocket created by `connect`; undefined until `connect` has run.
let ws;
// Upper bound on retained items; `query` overwrites it with `maxItems`.
let itemLimit = Number.POSITIVE_INFINITY;
/**
 * Picks one element of `array` at a uniformly random index.
 * Yields `undefined` for an empty array.
 */
const getRandomFromArray = array => {
  const index = Math.floor(Math.random() * array.length);
  return array[index];
};
// Handle of the pending mock real-time timer, kept so that the next query can
// stop the feed started by the previous one.
let mockFeedTimer = null;

/**
 * Mock stand-in for the backend, used by `query` when `testMode` is on.
 *
 * In real-time mode a random mock item is offered every 2 seconds and, if it
 * passes the filter, prepended to `state.items` (capped at `itemLimit`).
 * In any other mode the filtered mock items are published at once.
 * Filter compilation and runtime errors are reported through `state.error`.
 *
 * @param {*} mode - compared against `queryMode.REAL_TIME`.
 * @param {boolean} jsFilter - whether `queryCode` should be applied.
 * @param {string} queryCode - body of a function `(message, value)` whose
 *   truthy result keeps the item.
 */
const testQuery = (mode, jsFilter, queryCode) => {
  // Stop the feed of a previous real-time query; without this every query
  // would stack another never-ending timer loop on top of the old ones.
  clearTimeout(mockFeedTimer);
  mockFeedTimer = null;

  const reportError = e => updateClearError(s => ({ ...s, error: e.toString() }));

  try {
    // NOTE(review): `new Function` runs arbitrary code typed by the user. That
    // is the purpose of this mock filter, but it must never see untrusted input.
    // The code is compiled only when the filter is enabled, mirroring `query`,
    // which sends `searchCode` only when `jsFilter` is set.
    const matches = jsFilter
      ? new Function('message', 'value', queryCode)
      : () => true;

    if (mode === queryMode.REAL_TIME) {
      const addItem = () => {
        try {
          const template = getRandomFromArray(mockItems);
          if (matches(template, template.value)) {
            // Publish a copy: the same mock can be drawn many times, and
            // stamping the shared object would rewrite the timestamp of every
            // earlier occurrence already in the list.
            const item = { ...template, timestamp: Date.now() };
            updateClearError(s => ({ ...s, items: [item, ...s.items].slice(0, itemLimit), itemCount: 0 }));
          }
        } catch (e) {
          // A filter that throws at runtime ends the feed, but visibly.
          reportError(e);
          return;
        }
        mockFeedTimer = setTimeout(addItem, 2000);
      };
      mockFeedTimer = setTimeout(addItem, 2000);
    } else {
      updateClearError(s => ({ ...s, items: mockItems.filter(item => matches(item, item.value)), itemCount: 0 }));
    }
  } catch (e) {
    reportError(e);
  }
};
/**
 * Resolves to true once `socket` fires 'open', or to false if it fires
 * 'error' or 'close' first. Its listeners are removed again either way.
 */
const waitUntilOpen = socket => new Promise(resolve => {
  const settle = opened => {
    socket.removeEventListener('open', onOpen);
    socket.removeEventListener('error', onFail);
    socket.removeEventListener('close', onFail);
    resolve(opened);
  };
  const onOpen = () => settle(true);
  const onFail = () => settle(false);
  socket.addEventListener('open', onOpen);
  socket.addEventListener('error', onFail);
  socket.addEventListener('close', onFail);
});

/**
 * Starts a new query: clears the current results, then either serves mock
 * data (`testMode`) or sends the query to the backend over the WebSocket,
 * (re)connecting first when necessary.
 *
 * @param {object} params
 * @param params.cluster - forwarded to the backend as `cluster`.
 * @param params.topic - forwarded to the backend as `topic`.
 * @param params.mode - forwarded as `mode`; in test mode compared to `queryMode.REAL_TIME`.
 * @param {boolean} params.jsFilter - when falsy, no filter code is sent.
 * @param {string} params.queryCode - filter code, sent as `searchCode` when `jsFilter` is set.
 * @param {number} params.maxItems - forwarded as `maxItems` and used as the local item cap.
 * @param {boolean} params.mutableObjects - sent inverted, as `immutable`.
 * @returns {Promise<void>} settles once the request was sent or the failure
 *   was written to `state.error`.
 */
// noinspection JSUnusedGlobalSymbols
export const query = async ({ cluster, topic, mode, jsFilter, queryCode, maxItems, mutableObjects }) => {
  // Reset the results once; `updateClearError` also clears a previous error.
  updateClearError(s => ({ ...s, items: [], itemCount: 0 }));
  if (testMode) {
    testQuery(mode, jsFilter, queryCode);
    return;
  }

  const SOCKET_OPEN = 1; // numeric value of WebSocket.OPEN
  // Reconnect when the socket is missing, was flagged closed, or is already
  // CLOSING/CLOSED (readyState 2/3).
  if (disconnected || !ws || ws.readyState > SOCKET_OPEN) {
    connect();
    disconnected = false;
  }
  itemLimit = maxItems;

  const socket = ws;
  // A freshly created socket is still CONNECTING and `send` would throw
  // InvalidStateError, so wait for the handshake before sending.
  const ready = Boolean(socket) &&
    (socket.readyState === SOCKET_OPEN || await waitUntilOpen(socket));
  if (!ready) {
    updateClearError(s => ({ ...s, error: 'Unable to connect to websocket.' }));
    return;
  }

  socket.send(JSON.stringify({
    cluster,
    mode,
    topic,
    maxItems,
    immutable: !mutableObjects,
    searchCode: jsFilter && queryCode,
  }));
};
/**
 * Opens the WebSocket to `backendAddressAndPort`, stores it in `ws` and wires
 * its events into the store:
 *   - 'complete'   messages replace `items` with `data.message`
 *   - 'item_count' messages set `itemCount` to `data.message`
 *   - 'message'    messages prepend `data.message` to `items` (capped at `itemLimit`)
 * Message types are matched case-insensitively; anything else is ignored.
 * A socket replaced by a later `connect` call is closed and its late events
 * no longer touch the shared state.
 */
export const connect = () => {
  const previous = ws;
  let socket;
  try {
    // `new WebSocket` never yields a falsy value; it throws instead (for
    // example on a malformed URL), so failure has to be caught.
    socket = new WebSocket(`ws://${backendAddressAndPort}`);
  } catch (e) {
    console.error('Unable to create WebSocket', e);
    updateClearError(s => ({ ...s, error: 'Unable to connect to websocket.' }));
    return;
  }
  ws = socket;
  // Drop the connection being replaced so it cannot keep feeding the store.
  previous?.close();

  // Events of a socket that has since been replaced must be ignored.
  const isCurrent = () => ws === socket;

  socket.addEventListener('open', () => {
    console.log('WebSocket opened');
  });

  socket.addEventListener('close', () => {
    console.log('WebSocket closed');
    if (isCurrent()) {
      disconnected = true;
    }
  });

  socket.addEventListener('error', () => {
    if (isCurrent()) {
      updateClearError(s => ({ ...s, error: 'WebSocket connection error.' }));
    }
  });

  socket.addEventListener('message', message => {
    if (!isCurrent()) {
      return;
    }
    let data;
    try {
      data = JSON.parse(message.data);
    } catch (e) {
      // Only the parse step is guarded, so real handler bugs stay visible.
      console.warn('Ignoring non-JSON WebSocket message', message.data);
      return;
    }
    console.log('WebSocket message received', data);

    // Messages without a string `type` are ignored instead of throwing.
    const type = typeof data?.type === 'string' ? data.type.toLowerCase() : undefined;
    switch (type) {
      case 'complete':
        updateClearError(s => ({ ...s, items: data.message }));
        break;
      case 'item_count':
        updateClearError(s => {
          console.log('state', s);
          return { ...s, itemCount: data.message };
        });
        break;
      case 'message':
        console.log('new item', data.message);
        updateClearError(s => ({ ...s, items: [data.message, ...s.items].slice(0, itemLimit) }));
        break;
      default:
        break;
    }
  });
};