Chat Applications at Scale

This use case combines two production patterns:

  • conversational messaging with realtime updates
  • AI post-processing via topic workers

Why it fits KalamDB

  • USER tables isolate user message histories
  • live subscriptions keep the UI event-driven
  • topics + consumers handle async AI reply generation
  • FILE(...) enables file-aware chat messages

Example schema

CREATE NAMESPACE IF NOT EXISTS chat;

CREATE TABLE chat.conversations (
  id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
  title TEXT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000');

CREATE TABLE chat.messages (
  id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
  conversation_id BIGINT NOT NULL,
  role TEXT NOT NULL,
  content TEXT NOT NULL,
  file_data FILE,
  created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'USER', FLUSH_POLICY = 'rows:1000');

CREATE TABLE chat.typing_events (
  id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
  conversation_id BIGINT NOT NULL,
  user_id TEXT NOT NULL,
  event_type TEXT NOT NULL,
  created_at TIMESTAMP DEFAULT NOW()
) WITH (TYPE = 'STREAM', TTL_SECONDS = 30);

CREATE TOPIC chat.ai_processing;

ALTER TOPIC chat.ai_processing
  ADD SOURCE chat.messages ON INSERT
  WITH (payload = 'full');

Example implementation

Frontend realtime subscription:

const unsub = await client.subscribeWithSql(
  'SELECT * FROM chat.messages WHERE conversation_id = $1 ORDER BY created_at ASC',
  (event) => {
    if (event.type === 'change' && event.change_type === 'insert') {
      renderMessages(event.rows);
    }
  },
  { batch_size: 200 }
);
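The subscription callback hands rows to renderMessages, which is not defined on this page. One way such a function might keep UI state consistent is to merge incoming rows into the current list, deduplicating by id and re-sorting by created_at. The sketch below is not part of the KalamDB SDK: the ChatMessage shape and the mergeMessages helper are hypothetical, and it assumes BIGINT ids and TIMESTAMP values arrive as strings, which the page does not confirm. Because it deduplicates, it behaves the same whether event.rows carries only the new rows or a larger batch.

```typescript
// Hypothetical row shape mirroring chat.messages. How the client serializes
// BIGINT and TIMESTAMP values is an assumption (strings here).
interface ChatMessage {
  id: string;
  conversation_id: string;
  role: string;
  content: string;
  created_at: string; // assumed to be an ISO-8601 timestamp
}

// Merge newly delivered rows into the current list: dedupe by id, then
// sort by created_at ascending to match the subscription's ORDER BY.
function mergeMessages(
  current: ChatMessage[],
  incoming: ChatMessage[]
): ChatMessage[] {
  const byId = new Map<string, ChatMessage>();
  for (const m of current) byId.set(m.id, m);
  for (const m of incoming) byId.set(m.id, m); // a later delivery replaces an earlier one
  return Array.from(byId.values()).sort(
    (a, b) => Date.parse(a.created_at) - Date.parse(b.created_at)
  );
}
```

A renderMessages implementation could call mergeMessages(state, event.rows) and redraw from the result, so a row delivered twice never appears twice.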

Background worker pattern:

const worker = client.consumer({
  topic: 'chat.ai_processing',
  group_id: 'ai-processor',
  auto_ack: true,
  batch_size: 1,
});

await worker.run(async (ctx) => {
  // The payload may wrap the row in { row } or be the row itself.
  const row = ctx.message.value?.row ?? ctx.message.value;

  // The topic fires on every insert into chat.messages, including the
  // assistant replies written below, so skip anything that is not a user message.
  if (row?.role !== 'user') return;

  const reply = await generateReply(row.content);

  await client.query(
    'INSERT INTO chat.messages (conversation_id, role, content) VALUES ($1, $2, $3)',
    [row.conversation_id, 'assistant', reply]
  );
});
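The worker's filtering and reply logic can be pulled out of the consumer callback so it runs against fakes without a live topic. The sketch below shows one possible arrangement. Everything in it is hypothetical: extractUserRow, handleMessage, the MessageRow shape, and the Deps interface are illustrative names, not SDK APIs. It assumes the payload is either a bare row or a { row } envelope, as the worker above does.

```typescript
// Hypothetical shape of a chat.messages row as delivered on the topic.
interface MessageRow {
  conversation_id: string | number;
  role: string;
  content: string;
}

// Returns the row if the payload is a user-authored message, else null.
// Accepts both a { row: ... } envelope and a bare row.
function extractUserRow(value: unknown): MessageRow | null {
  if (typeof value !== "object" || value === null) return null;
  const candidate = (value as { row?: unknown }).row ?? value;
  if (typeof candidate !== "object" || candidate === null) return null;
  const row = candidate as Partial<MessageRow>;
  if (row.role !== "user" || typeof row.content !== "string") return null;
  if (row.conversation_id == null) return null;
  return row as MessageRow;
}

// Dependencies are injected so the handler can be exercised with fakes.
interface Deps {
  generateReply: (content: string) => Promise<string>;
  insertReply: (conversationId: string | number, reply: string) => Promise<void>;
}

// Returns true when a reply was written, false when the message was skipped.
async function handleMessage(value: unknown, deps: Deps): Promise<boolean> {
  const row = extractUserRow(value);
  if (row === null) return false;
  const reply = await deps.generateReply(row.content);
  await deps.insertReply(row.conversation_id, reply);
  return true;
}
```

Inside worker.run, the callback would then reduce to a single handleMessage(ctx.message.value, deps) call, with insertReply wrapping the INSERT shown above.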
