SDK Cookbook
This page is a practical recipe collection built from real SDK source usage and the examples/chat-with-ai implementation.
Recipe 1: Chat with AI pipeline (topic + consumer + subscriptions)
This mirrors the chat-with-ai architecture:
- User message inserted into chat.messages
- CDC routes inserts into topic chat.ai-processing
- Background service consumes topic and writes AI reply back to table
- Frontend receives both user and AI rows instantly via live subscription
1) SQL setup
CREATE NAMESPACE IF NOT EXISTS chat;
CREATE USER TABLE chat.messages (
id BIGINT PRIMARY KEY DEFAULT SNOWFLAKE_ID(),
conversation_id BIGINT NOT NULL,
sender TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'user',
content TEXT NOT NULL,
created_at TIMESTAMP NOT NULL DEFAULT NOW()
);
CREATE TOPIC "chat.ai-processing";
ALTER TOPIC "chat.ai-processing"
ADD SOURCE chat.messages ON INSERT WITH (payload = 'full');
2) Frontend subscription (React)
const unsub = await client.subscribeWithSql(
`SELECT * FROM chat.messages WHERE conversation_id = ${conversationId} ORDER BY created_at ASC`,
(event) => {
if (event.type === 'initial_data_batch' && event.rows) {
setMessages((prev) => mergeRows(prev, event.rows));
return;
}
if (event.type === 'change' && event.change_type === 'insert' && event.rows) {
setMessages((prev) => mergeRows(prev, event.rows));
}
},
{ batch_size: 200 }
);
3) Node.js background processor (consumer)
import { createClient, Auth } from 'kalam-link';
const client = createClient({
url: process.env.KALAMDB_URL ?? 'http://localhost:8080',
auth: Auth.basic(process.env.KALAMDB_USERNAME!, process.env.KALAMDB_PASSWORD!),
});
await client.connect();
const handle = client.consumer({
topic: 'chat.ai-processing',
group_id: 'ai-processor-service',
auto_ack: true,
batch_size: 1,
});
await handle.run(async (ctx) => {
const payload = ctx.message.value as Record<string, unknown>;
const row = (payload.row ?? payload) as {
conversation_id: string;
role: string;
content: string;
};
if (row.role !== 'user') return;
const aiReply = await generateReply(row.content);
await client.query(
`INSERT INTO chat.messages (conversation_id, sender, role, content)
VALUES (${row.conversation_id}, 'AI Assistant', 'assistant', '${aiReply.replace(/'/g, "''")}')`
);
});
4) Operational notes from source
- chat-with-ai uses executeAsUser(...) to preserve per-user isolation in service writes.
- Typing indicator updates are done via a stream table (chat.typing_indicators).
- Service loads WASM explicitly in Node contexts where needed.
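The frontend subscription in step 2 calls `mergeRows`, which this page does not define. A minimal sketch of what such a helper might look like, assuming rows carry a string `id` and an ISO-8601 `created_at` (the `MessageRow` shape and the ordering rule are illustrative assumptions, not taken from the chat-with-ai source):

```typescript
// Hypothetical row shape: only `id` and `created_at` are needed for merging.
interface MessageRow {
  id: string;
  created_at: string; // assumed to be an ISO-8601 string, so text order is time order
  [column: string]: unknown;
}

// Plain code-unit comparison, which is sufficient for same-format ISO timestamps.
function compareText(a: string, b: string): number {
  return a < b ? -1 : a > b ? 1 : 0;
}

// Merge incoming rows into the current list. A row with an id that is already
// present replaces the older copy, so replayed batches do not create duplicates.
// The result is ordered by created_at, with id as a tie-breaker.
function mergeRows(prev: MessageRow[], incoming: MessageRow[]): MessageRow[] {
  const byId = new Map<string, MessageRow>();
  for (const row of prev) byId.set(row.id, row);
  for (const row of incoming) byId.set(row.id, row);
  return Array.from(byId.values()).sort(
    (a, b) => compareText(a.created_at, b.created_at) || compareText(a.id, b.id)
  );
}
```

Keying on `id` means the initial batch and later insert events can be fed through the same function, which is how the subscription callback in step 2 uses it.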
Recipe 2: Resumable subscriptions with sequence checkpoints
Pass from_seq_id when subscribing to resume from a saved checkpoint, and read the latest checkpoint with getLastSeqId(subscriptionId).
let savedSeqId: string | undefined;
const unsub = await client.subscribeWithSql(
'SELECT * FROM app.events ORDER BY created_at ASC',
(event) => {
if (event.type === 'change') {
// process event
}
},
savedSeqId ? { from_seq_id: savedSeqId } : undefined
);
// Later, if you also keep subscription IDs in your app state:
for (const sub of client.getSubscriptions()) {
const seq = client.getLastSeqId(sub.id);
if (seq) savedSeqId = seq;
}
Recipe 3: Typed query helpers for safer app code
interface Conversation {
id: string;
title: string;
created_at: string;
}
const conversations = await client.queryAll<Conversation>(
'SELECT id, title, created_at FROM chat.conversations ORDER BY updated_at DESC'
);
const firstConversation = await client.queryOne<Conversation>(
'SELECT id, title, created_at FROM chat.conversations LIMIT 1'
);
Why this pattern helps:
- keeps SQL in one place
- maps array-row response into typed objects
- reduces unsafe indexing in UI code
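To make the second point concrete, here is a rough sketch of the kind of mapping a typed helper performs. It is not the SDK's implementation: the `ArrayRowResult` shape and the `rowsToObjects` name are hypothetical and stand in for a response that returns column names alongside positional rows.

```typescript
// Hypothetical shape of an array-row response: column names plus positional rows.
interface ArrayRowResult {
  columns: string[];
  rows: unknown[][];
}

// Turn each positional row into an object keyed by column name.
// The cast to T is unchecked: the caller asserts that the SELECT list matches T.
function rowsToObjects<T>(result: ArrayRowResult): T[] {
  return result.rows.map((row) => {
    const obj: Record<string, unknown> = {};
    result.columns.forEach((name, index) => {
      obj[name] = row[index];
    });
    return obj as unknown as T;
  });
}

interface ConversationSummary {
  id: string;
  title: string;
}

const summaries = rowsToObjects<ConversationSummary>({
  columns: ['id', 'title'],
  rows: [
    ['1', 'Welcome'],
    ['2', 'Support'],
  ],
});
// UI code can now read summaries[0].title instead of indexing rows[0][1].
```

The type parameter gives no runtime validation, so it is worth keeping the interface and the SELECT column list next to each other.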
Recipe 4: File attachments in messages
const result = await client.queryWithFiles(
'INSERT INTO chat.messages (conversation_id, sender, role, content, file_data) VALUES ($1, $2, $3, $4, FILE("attachment"))',
{ attachment: selectedFile },
[conversationId, 'user', 'user', messageText],
(progress) => {
setUploadPercent(progress.percent);
}
);
Then parse FILE metadata:
import { parseFileRef } from 'kalam-link';
const ref = parseFileRef(row.file_data);
if (ref) {
const downloadUrl = ref.getDownloadUrl('http://localhost:8080', 'chat', 'messages');
console.log(downloadUrl, ref.formatSize(), ref.getTypeDescription());
}
Recipe 5: Connection bootstrap for apps
const client = createClient({
url: 'http://localhost:8080',
auth: Auth.basic(username, password),
wasmUrl: '/wasm/kalam_link_bg.wasm',
});
await client.login();
await client.connect();
client.setAutoReconnect(true);
client.setReconnectDelay(1000, 30000);
client.setMaxReconnectAttempts(0);
This mirrors the provider pattern in chat-with-ai: Basic login first, then JWT-backed realtime connection.
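This page does not say how the two arguments to `setReconnectDelay(1000, 30000)` are used. If they are an initial and a maximum delay in milliseconds, a capped exponential backoff would behave roughly like the sketch below. Both that reading and the doubling policy are assumptions for illustration, and `reconnectDelay` is a hypothetical helper, not an SDK function.

```typescript
// Hypothetical helper: delay before reconnect attempt number `attempt` (0-based),
// doubling from `initialMs` and never exceeding `maxMs`.
function reconnectDelay(attempt: number, initialMs: number, maxMs: number): number {
  return Math.min(maxMs, initialMs * Math.pow(2, attempt));
}

// Delays for the first six attempts under the assumed policy.
const schedule = [0, 1, 2, 3, 4, 5].map((attempt) => reconnectDelay(attempt, 1000, 30000));
console.log(schedule);
```

Under these assumptions the delay grows 1000, 2000, 4000, 8000, 16000 and then stays at the 30000 cap.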
Recipe 6: Graceful shutdown for worker services
const handle = client.consumer({ topic: 'orders', group_id: 'order-worker', auto_ack: true });
const runPromise = handle.run(async (ctx) => {
await processMessage(ctx.message.value);
});
process.on('SIGINT', async () => {
handle.stop();
await runPromise.catch(() => undefined);
await client.disconnect();
process.exit(0);
});
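The handler above covers a single SIGINT. Workers run under a process manager often receive SIGTERM as well, and either signal may arrive more than once. One way to keep cleanup safe in that case is a run-once wrapper, sketched here with the hypothetical names `createShutdown` and `CleanupStep`. It makes no assumptions about the SDK beyond the calls already shown in this recipe, and those appear only in the commented usage.

```typescript
type CleanupStep = () => void | Promise<void>;

// Returns a function that runs the cleanup steps in order, at most once.
// Repeated calls (for example SIGINT followed by SIGTERM) share the same promise.
function createShutdown(steps: CleanupStep[]): () => Promise<void> {
  let pending: Promise<void> | undefined;
  return () => {
    if (!pending) {
      pending = (async () => {
        for (const step of steps) {
          try {
            await step();
          } catch {
            // A failing step should not prevent the remaining steps from running.
          }
        }
      })();
    }
    return pending;
  };
}

// Possible wiring with the names from this recipe (left as comments because it
// needs a live client):
// const shutdown = createShutdown([
//   () => handle.stop(),
//   () => runPromise.catch(() => undefined),
//   () => client.disconnect(),
// ]);
// process.on('SIGINT', () => void shutdown().then(() => process.exit(0)));
// process.on('SIGTERM', () => void shutdown().then(() => process.exit(0)));
```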