Agent Worker Queue
Use this pattern for asynchronous processing tasks such as moderation, enrichment, classification, and indexing.
Core pattern
- topic routes capture relevant source events
- worker group consumes in batches
- each worker acknowledges offsets after successful processing
- retries/idempotency handled in application logic
Example schema and topic route
CREATE TOPIC agent.tasks PARTITIONS 4;
ALTER TOPIC agent.tasks
ADD SOURCE app.jobs
ON INSERT
WITH (payload = 'full');Example implementation
const worker = client.consumer({
topic: 'agent.tasks',
group_id: 'agent-workers',
auto_ack: false,
batch_size: 20,
concurrency_per_partition: 2,
});
await worker.run(async (ctx) => {
await processTask(ctx.message.value);
await ctx.ack();
});Code references
- Starter worker scaffold: https://github.com/jamals86/KalamDB/tree/main/examples/simple-typescript
- Topic API contracts: https://github.com/jamals86/KalamDB/tree/main/docs/api
Last updated on