Topic Consumers & ACK

Beyond SQL subscriptions, kalam-link exposes typed topic consume/ack APIs.

Consumer builder API

const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing',
  auto_ack: true,
  batch_size: 10,
});

// run() keeps consuming until stop() is requested.
await handle.run(async (ctx) => {
  console.log(ctx.message.offset, ctx.message.value);
});

// Request the consume loop to stop.
handle.stop();

Source behavior (client.ts):

  • Loops over consume(options) until stop() is requested.
  • If auto_ack is enabled and the handler did not call ctx.ack() itself, the SDK acks the message automatically.
  • When no more messages are available, it sleeps for about 1 second before polling again.
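
The three behaviors above can be sketched as a small polling loop. This is not the SDK's actual implementation: the `Source` interface, `runLoop`, and the `idleMs` option are hypothetical names introduced only for illustration, and the real types in client.ts may differ.

```typescript
// Hypothetical shapes for illustration; the real kalam-link types may differ.
interface Message { partition: number; offset: number; value: unknown; }
interface Ctx { message: Message; ack(): Promise<void>; }
interface Source {
  consume(): Promise<Message[]>; // one poll
  ack(partition: number, offset: number): Promise<void>;
}

function runLoop(
  source: Source,
  handler: (ctx: Ctx) => Promise<void>,
  opts: { auto_ack: boolean; idleMs?: number },
): { done: Promise<void>; stop: () => void } {
  let stopped = false;
  const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

  const done = (async () => {
    // Keep polling until stop() has been requested.
    while (!stopped) {
      const messages = await source.consume();
      if (messages.length === 0) {
        // Nothing to do: wait (about 1s by default) before polling again.
        await sleep(opts.idleMs ?? 1000);
        continue;
      }
      for (const message of messages) {
        let acked = false;
        const ack = async () => {
          if (acked) return; // never ack the same message twice
          acked = true;
          await source.ack(message.partition, message.offset);
        };
        await handler({ message, ack });
        // Auto-ack only when the handler did not ack manually.
        if (opts.auto_ack && !acked) await ack();
      }
    }
  })();

  return { done, stop: () => { stopped = true; } };
}
```

In this sketch a stop request takes effect after the current batch has been handled; whether the SDK interrupts a batch midway is not stated in the source.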

Manual ACK mode

const handle = client.consumer({
  topic: 'orders',
  group_id: 'billing-manual',
  auto_ack: false,
});

await handle.run(async (ctx) => {
  await processOrder(ctx.message.value);
  // Ack only after processing has completed.
  await ctx.ack();
});

ctx contains:

  • username (typed branded username)
  • message (topic, group, partition, offset, payload)
  • ack(): Promise<void>
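
With manual acks, a common pattern is to acknowledge only when processing succeeds. The wrapper below is a minimal sketch of that idea. `ackOnSuccess`, `AckContext`, and the `onError` callback are hypothetical names, and the context shape is reduced to the fields the sketch needs. What the server does with a message that is never acked (for example, whether it is redelivered) is not covered by the source.

```typescript
// Hypothetical context shape, reduced to what this sketch needs.
interface AckContext<T> {
  message: { offset: number; value: T };
  ack(): Promise<void>;
}

// Wraps a processing function so a message is acked only if processing succeeds.
function ackOnSuccess<T>(
  processFn: (value: T) => Promise<void>,
  onError: (err: unknown, offset: number) => void = () => {},
): (ctx: AckContext<T>) => Promise<void> {
  return async (ctx) => {
    try {
      await processFn(ctx.message.value);
    } catch (err) {
      // Report the failure and leave the message unacked.
      onError(err, ctx.message.offset);
      return;
    }
    await ctx.ack();
  };
}
```

A handler built this way could be passed to handle.run(...) in place of the inline function, for example `ackOnSuccess(processOrder)`.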

One-shot batch consume

const batch = await client.consumeBatch({
  topic: 'orders',
  group_id: 'billing',
  batch_size: 20,
  start: 'earliest',
});

console.log(batch.messages, batch.next_offset, batch.has_more);
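
Because a batch reports has_more, one-shot calls can be chained to drain a backlog. The helper below is a sketch under two assumptions that the source does not confirm: that the group's position advances between calls (for example because each batch is acknowledged in `onBatch`), and that next_offset is a number. `drain`, `fetchBatch`, `onBatch`, and `maxBatches` are hypothetical names.

```typescript
// Hypothetical batch shape mirroring the fields logged above.
interface Batch<M> { messages: M[]; next_offset: number; has_more: boolean; }

// Fetches batches until has_more is false or maxBatches is reached.
// Returns the number of messages handed to onBatch.
async function drain<M>(
  fetchBatch: () => Promise<Batch<M>>,
  onBatch: (batch: Batch<M>) => Promise<void>,
  maxBatches = 100,
): Promise<number> {
  let total = 0;
  for (let i = 0; i < maxBatches; i++) {
    const batch = await fetchBatch();
    if (batch.messages.length > 0) {
      await onBatch(batch); // process and acknowledge here
      total += batch.messages.length;
    }
    if (!batch.has_more) break;
  }
  return total;
}
```

Here `fetchBatch` would wrap a call such as client.consumeBatch({ ... }). The `maxBatches` cap guards against looping forever if the position never advances.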

Low-level ack API

await client.ack('orders', 'billing', 0, 42);

This acknowledges all offsets up to and including 42. The arguments appear to be, in order, topic, group, partition, and offset.
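
Since an ack covers everything up to and including the given offset, a processed batch needs at most one ack per partition. The sketch below computes the highest offset seen per partition. `highestOffsets` and the `Positioned` shape are hypothetical names, not part of the SDK.

```typescript
// Hypothetical minimal message shape; only partition and offset are needed here.
interface Positioned { partition: number; offset: number; }

// Returns the highest offset seen for each partition.
function highestOffsets(messages: Positioned[]): Map<number, number> {
  const highest = new Map<number, number>();
  for (const { partition, offset } of messages) {
    const current = highest.get(partition);
    if (current === undefined || offset > current) {
      highest.set(partition, offset);
    }
  }
  return highest;
}
```

Each entry of the returned map could then be passed to client.ack with the topic and group. This is only safe once every message at or below that offset in the partition has been processed successfully.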

ConsumeRequest options

{
  topic: string;
  group_id: string;
  start?: 'earliest' | 'latest' | 'offset';
  partition_id?: number;
  batch_size?: number;
  timeout_seconds?: number;
  auto_ack?: boolean;
  concurrency_per_partition?: number;
}
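
Since most fields are optional, it can help to sanity-check a request before sending it. The helper below is a hypothetical client-side check, not SDK behavior: `checkConsumeRequest` and its rules (non-empty names, non-negative partition, positive sizes) are assumptions that seem reasonable, and the server's actual validation rules are not documented here.

```typescript
// Field list copied from the options above.
interface ConsumeRequest {
  topic: string;
  group_id: string;
  start?: 'earliest' | 'latest' | 'offset';
  partition_id?: number;
  batch_size?: number;
  timeout_seconds?: number;
  auto_ack?: boolean;
  concurrency_per_partition?: number;
}

// Hypothetical helper: returns a list of problems, empty when the request looks sane.
function checkConsumeRequest(req: ConsumeRequest): string[] {
  const problems: string[] = [];
  if (req.topic.trim() === '') problems.push('topic must not be empty');
  if (req.group_id.trim() === '') problems.push('group_id must not be empty');
  if (
    req.partition_id !== undefined &&
    (!Number.isInteger(req.partition_id) || req.partition_id < 0)
  ) {
    problems.push('partition_id must be a non-negative integer');
  }
  const counts: Array<'batch_size' | 'concurrency_per_partition'> = [
    'batch_size',
    'concurrency_per_partition',
  ];
  for (const key of counts) {
    const value = req[key];
    if (value !== undefined && (!Number.isInteger(value) || value <= 0)) {
      problems.push(`${key} must be a positive integer`);
    }
  }
  if (req.timeout_seconds !== undefined && !(req.timeout_seconds > 0)) {
    problems.push('timeout_seconds must be positive');
  }
  return problems;
}
```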

When to use which API

  • SQL subscriptions: row-level table/query reactivity.
  • consumer(): explicit queue/partition/group processing with ack semantics.