Skip to Content
🚀 KalamDB v0.3.0-alpha2 is out — Learn more
SDK & ClientCookbook

SDK Cookbook

This page is a practical recipe collection built from real SDK source usage and the examples/chat-with-ai implementation.

Recipe 1: Chat with AI pipeline (topic + consumer + subscriptions)

This mirrors the chat-with-ai architecture:

  1. User message inserted into chat.messages
  2. CDC routes inserts into topic chat.ai-processing
  3. Background service consumes topic and writes AI reply back to table
  4. Frontend receives both user and AI rows instantly via live subscription

1) SQL setup

CREATE NAMESPACE IF NOT EXISTS chat; CREATE USER TABLE chat.messages ( id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(), conversation_id BIGINT NOT NULL, sender TEXT NOT NULL, role TEXT NOT NULL DEFAULT 'user', content TEXT NOT NULL, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); CREATE TOPIC "chat.ai-processing"; ALTER TOPIC "chat.ai-processing" ADD SOURCE chat.messages ON INSERT WITH (payload = 'full');

2) Frontend subscription (React)

const unsub = await client.subscribeWithSql( `SELECT * FROM chat.messages WHERE conversation_id = ${conversationId} ORDER BY created_at ASC`, (event) => { if (event.type === 'initial_data_batch' && event.rows) { setMessages((prev) => mergeRows(prev, event.rows)); return; } if (event.type === 'change' && event.change_type === 'insert' && event.rows) { setMessages((prev) => mergeRows(prev, event.rows)); } }, { batch_size: 200 } );

3) Node.js background processor (consumer)

import { createClient, Auth, parseRows } from 'kalam-link'; const client = createClient({ url: process.env.KALAMDB_URL ?? 'http://localhost:8080', auth: Auth.basic(process.env.KALAMDB_USERNAME!, process.env.KALAMDB_PASSWORD!), }); await client.connect(); const handle = client.consumer({ topic: 'chat.ai-processing', group_id: 'ai-processor-service', auto_ack: true, batch_size: 1, }); await handle.run(async (ctx) => { const payload = ctx.message.value as Record<string, unknown>; const row = (payload.row ?? payload) as { conversation_id: string; role: string; content: string; }; if (row.role !== 'user') return; const aiReply = await generateReply(row.content); await client.query( `INSERT INTO chat.messages (conversation_id, sender, role, content) VALUES (${row.conversation_id}, 'AI Assistant', 'assistant', '${aiReply.replace(/'/g, "''")}')` ); });

4) Operational notes from source

  • chat-with-ai uses executeAsUser(...) to preserve per-user isolation in service writes.
  • Typing indicator updates are done via a stream table (chat.typing_indicators).
  • Service loads WASM explicitly in Node contexts where needed.

Recipe 2: Resumable subscriptions with sequence checkpoints

Use from_seq_id + getLastSeqId(subscriptionId).

let savedSeqId: string | undefined; const unsub = await client.subscribeWithSql( 'SELECT * FROM app.events ORDER BY created_at ASC', (event) => { if (event.type === 'change') { // process event } }, savedSeqId ? { from_seq_id: savedSeqId } : undefined ); // Later, if you also keep subscription IDs in your app state: for (const sub of client.getSubscriptions()) { const seq = client.getLastSeqId(sub.id); if (seq) savedSeqId = seq; }

Recipe 3: Typed query helpers for safer app code

interface Conversation { id: string; title: string; created_at: string; } const conversations = await client.queryAll<Conversation>( 'SELECT id, title, created_at FROM chat.conversations ORDER BY updated_at DESC' ); const firstConversation = await client.queryOne<Conversation>( 'SELECT id, title, created_at FROM chat.conversations LIMIT 1' );

Why this pattern helps:

  • keeps SQL in one place
  • maps array-row response into typed objects
  • reduces unsafe indexing in UI code

Recipe 4: File attachments in messages

const result = await client.queryWithFiles( 'INSERT INTO chat.messages (conversation_id, sender, role, content, file_data) VALUES ($1, $2, $3, $4, FILE("attachment"))', { attachment: selectedFile }, [conversationId, 'user', 'user', messageText], (progress) => { setUploadPercent(progress.percent); } );

Then parse FILE metadata:

import { parseFileRef } from 'kalam-link'; const ref = parseFileRef(row.file_data); if (ref) { const downloadUrl = ref.getDownloadUrl('http://localhost:8080', 'chat', 'messages'); console.log(downloadUrl, ref.formatSize(), ref.getTypeDescription()); }

Recipe 5: Connection bootstrap for apps

const client = createClient({ url: 'http://localhost:8080', auth: Auth.basic(username, password), wasmUrl: '/wasm/kalam_link_bg.wasm', }); await client.login(); await client.connect(); client.setAutoReconnect(true); client.setReconnectDelay(1000, 30000); client.setMaxReconnectAttempts(0);

This mirrors the provider pattern in chat-with-ai: Basic login first, then JWT-backed realtime connection.

Recipe 6: Graceful shutdown for worker services

const handle = client.consumer({ topic: 'orders', group_id: 'order-worker', auto_ack: true }); const runPromise = handle.run(async (ctx) => { await processMessage(ctx.message.value); }); process.on('SIGINT', async () => { handle.stop(); await runPromise.catch(() => undefined); await client.disconnect(); process.exit(0); });
Last updated on