Topic Consumers & ACK
Beyond SQL subscriptions, kalam-link exposes typed topic consume/ack APIs.
Consumer builder API
const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing',
  auto_ack: true,
  batch_size: 10,
});
await handle.run(async (ctx) => {
  console.log(ctx.message.offset, ctx.message.value);
});
handle.stop();

Source behavior (client.ts):
- loops consume(options) until stop() is requested
- if auto_ack is set and the handler didn't manually call ctx.ack(), the SDK auto-acks
- when there are no more messages, sleeps ~1s before polling again
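
The loop described above can be sketched roughly as follows. This is a minimal illustration, not the code in client.ts: `makeRunner`, `ConsumeFn`, `AckFn`, and the interface shapes are hypothetical names, and stopping in the middle of a batch is an assumption rather than documented behavior.

```typescript
// Hypothetical shapes for illustration only; not the SDK's actual types.
interface Message { offset: number; value: unknown; }
interface Batch { messages: Message[]; }
interface Ctx { message: Message; ack(): Promise<void>; }

type ConsumeFn = () => Promise<Batch>;
type AckFn = (offset: number) => Promise<void>;

function makeRunner(consume: ConsumeFn, ackOffset: AckFn, autoAck: boolean, idleMs = 1000) {
  let stopRequested = false;

  async function run(handler: (ctx: Ctx) => Promise<void>): Promise<void> {
    while (!stopRequested) {
      const batch = await consume();
      if (batch.messages.length === 0) {
        // Nothing to process: sleep (about 1s by default) before polling again.
        await new Promise<void>((resolve) => setTimeout(resolve, idleMs));
        continue;
      }
      for (const message of batch.messages) {
        // Assumption: a stop request also interrupts the current batch.
        if (stopRequested) break;
        let acked = false;
        const ctx: Ctx = {
          message,
          ack: async () => {
            if (acked) return; // ignore repeated acks for the same message
            await ackOffset(message.offset);
            acked = true;
          },
        };
        await handler(ctx);
        // Auto-ack only when the handler did not ack manually.
        if (autoAck && !acked) await ctx.ack();
      }
    }
  }

  return { run, stop: () => { stopRequested = true; } };
}
```

In this sketch `run()` resolves only after `stop()` has been called, so `stop()` has to be triggered from inside the handler or from another task.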
Manual ACK mode
const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing-manual',
  auto_ack: false,
});
await handle.run(async (ctx) => {
  await processOrder(ctx.message.value);
  await ctx.ack();
});

ctx contains:
- username (typed branded username)
- message (topic, group, partition, offset, payload)
- ack(): Promise<void>
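
To make "typed branded username" concrete, here is a rough sketch of how a branded string type and a context object of this shape could be declared. Every type and function name below (`Username`, `toUsername`, `SketchMessage`, `SketchContext`, `describeContext`) is hypothetical, and the exact field names on the real message object may differ. The payload is called `value` here only because the examples above read `ctx.message.value`.

```typescript
// A branded type: structurally a string, but a plain string is not assignable to it.
type Username = string & { readonly __brand: 'Username' };

// Hypothetical helper: the only place where a raw string becomes a Username.
function toUsername(raw: string): Username {
  if (raw.trim().length === 0) throw new Error('username must not be empty');
  return raw as Username;
}

// Assumed field names, for illustration only.
interface SketchMessage {
  topic: string;
  group: string;
  partition: number;
  offset: number;
  value: unknown; // the payload
}

interface SketchContext {
  username: Username;
  message: SketchMessage;
  ack(): Promise<void>;
}

// Formats the delivery coordinates of a message for logging.
function describeContext(ctx: SketchContext): string {
  const m = ctx.message;
  return `${ctx.username} <- ${m.topic}/${m.group}[${m.partition}]@${m.offset}`;
}
```

The brand exists only at compile time, so passing an unchecked string where a `Username` is expected fails type checking without adding any runtime cost.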
One-shot batch consume
const batch = await client.consumeBatch({
  topic: 'orders',
  group_id: 'billing',
  batch_size: 20,
  start: 'earliest',
});
console.log(batch.messages, batch.next_offset, batch.has_more);

Low-level ack API
await client.ack('orders', 'billing', 0, 42);

This acknowledges up to and including offset 42.
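
Because an ack covers everything up to and including the given offset, a batch can be acknowledged with a single call for its last offset. The sketch below models that with an in-memory stand-in rather than the real client: `FakePartition` and `drain` are hypothetical, and the meanings given to `next_offset` (the next offset to read) and `has_more` (more messages remain) are assumptions.

```typescript
// In-memory stand-in for one partition as seen by one consumer group (hypothetical).
class FakePartition {
  private committed = -1; // highest acknowledged offset; -1 means nothing acked yet
  private readonly log: string[];

  constructor(log: string[]) {
    this.log = log;
  }

  // Returns up to batchSize messages that follow the committed offset.
  consumeBatch(batchSize: number) {
    const start = this.committed + 1;
    const messages = this.log
      .slice(start, start + batchSize)
      .map((value, i) => ({ offset: start + i, value }));
    const next_offset = start + messages.length;
    return { messages, next_offset, has_more: next_offset < this.log.length };
  }

  // Cumulative ack: everything up to and including `offset` is acknowledged.
  ack(offset: number): void {
    this.committed = Math.max(this.committed, offset);
  }
}

// Reads batches until the partition is empty, acking once per batch.
function drain(partition: FakePartition, batchSize: number): string[] {
  const processed: string[] = [];
  for (;;) {
    const batch = partition.consumeBatch(batchSize);
    if (batch.messages.length === 0) break;
    for (const m of batch.messages) processed.push(m.value);
    // One ack for the last offset covers the whole batch.
    partition.ack(batch.messages[batch.messages.length - 1].offset);
    if (!batch.has_more) break;
  }
  return processed;
}
```

Acking per batch keeps round trips low, but if processing fails midway the whole batch is redelivered. Acking per message narrows that window at the cost of more calls.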
ConsumeRequest options
{
  topic: string;
  group_id: string;
  start?: 'earliest' | 'latest' | 'offset';
  partition_id?: number;
  batch_size?: number;
  timeout_seconds?: number;
  auto_ack?: boolean;
  concurrency_per_partition?: number;
}

When to use which API
- SQL subscriptions: row-level table/query reactivity.
- consumer(): explicit queue/partition/group processing with ack semantics.