Skip to Content
🚀 KalamDB v0.3.0-alpha2 is out — Learn more
Use Cases & ExamplesAgent Worker Queue

Agent Worker Queue

Use this pattern for asynchronous processing tasks such as moderation, enrichment, classification, and indexing.

Core pattern

  • topic routes capture relevant source events
  • worker group consumes in batches
  • each worker acknowledges offsets after successful processing
  • retries/idempotency handled in application logic

Example schema and topic route

CREATE TOPIC agent.tasks PARTITIONS 4; ALTER TOPIC agent.tasks ADD SOURCE app.jobs ON INSERT WITH (payload = 'full');

Example implementation

const worker = client.consumer({ topic: 'agent.tasks', group_id: 'agent-workers', auto_ack: false, batch_size: 20, concurrency_per_partition: 2, }); await worker.run(async (ctx) => { await processTask(ctx.message.value); await ctx.ack(); });

Code references

Last updated on