Job Queue
Build a standalone durable job processing system with `jobQueue()` — concurrency, progress tracking, priority ordering, scheduled execution, rate limiting, batch operations, introspection, persistence, and distributed processing.
The Problem
Background job processing typically requires external infrastructure (Redis + BullMQ, SQS, Celery). This recipe shows how to build a fully-featured job queue in-process using callbag-recharge's messaging primitives — no external dependencies.
The Solution
```ts
/**
 * Job Queue — standalone durable job processing
 *
 * Demonstrates the jobQueue as a self-contained processing engine:
 * concurrency, progress, priority, scheduling, rate limiting,
 * introspection, batch add, persistence, and distributed bridging.
 *
 * Run: pnpm exec tsx --tsconfig tsconfig.examples.json examples/job-queue.ts
 */
import { subscribe } from "callbag-recharge/extra";
import { jobQueue, topic } from "callbag-recharge/messaging";
import { memoryAdapter } from "callbag-recharge/utils";

// ---------------------------------------------------------------------------
// 1. Basic queue with concurrency and progress
// ---------------------------------------------------------------------------
const emailQueue = jobQueue<string, { sent: boolean }>(
  "emails",
  async (signal, _address, progress) => {
    progress(0.1);
    // Simulate sending an email
    await new Promise((r) => setTimeout(r, 100));
    if (signal.aborted) throw new Error("cancelled");
    progress(0.9);
    return { sent: true };
  },
  { concurrency: 5 },
);

// React to companion stores
subscribe(emailQueue.active, (n) => console.log(`Active: ${n}`));
subscribe(emailQueue.progress, (p) => console.log(`Progress: ${(p * 100).toFixed(0)}%`));

// Add jobs
emailQueue.add("alice@example.com");
emailQueue.add("bob@example.com");

// Listen for events
emailQueue.on("completed", (job) => {
  console.log(`Job ${job.seq} completed in ${job.duration}ms`, job.result);
});

// ---------------------------------------------------------------------------
// 2. Priority ordering
// ---------------------------------------------------------------------------
const priorityQueue = jobQueue<{ task: string }, void>(
  "tasks",
  async (_signal, data) => {
    console.log(`Processing: ${data.task}`);
  },
  { concurrency: 3 },
);

// Lower priority number = processed first (within a pull batch)
priorityQueue.add({ task: "low-priority" }, { priority: 10 });
priorityQueue.add({ task: "high-priority" }, { priority: 1 });
priorityQueue.add({ task: "medium-priority" }, { priority: 5 });

// ---------------------------------------------------------------------------
// 3. Scheduled jobs
// ---------------------------------------------------------------------------
const scheduledQueue = jobQueue<string, void>("scheduled", async (_signal, data) => {
  console.log(`Running scheduled: ${data}`);
});

// Run 30 seconds from now; capture the sequence number for introspection
const reportSeq = scheduledQueue.add("report-generation", {
  runAt: new Date(Date.now() + 30_000),
});

// Introspect the scheduled job
const info = scheduledQueue.getJob(reportSeq);
console.log(`Job status: ${info?.status}`); // "scheduled"

// ---------------------------------------------------------------------------
// 4. Batch add + retry + dead letter queue
// ---------------------------------------------------------------------------
const dlq = topic<string>("failed-jobs");
const batchQueue = jobQueue<string, void>(
  "batch-work",
  async (_signal, data) => {
    if (data === "poison") throw new Error("bad data");
  },
  {
    retry: { maxRetries: 2, backoff: () => 100 },
    deadLetterTopic: dlq,
  },
);

// Add multiple jobs atomically
batchQueue.addBatch(["good-1", "poison", "good-2"]);

// Failed jobs route to the dead letter topic after retries are exhausted
batchQueue.on("failed", (job) => {
  console.log(`Job ${job.seq} failed after ${job.attempts} attempts`);
});

// ---------------------------------------------------------------------------
// 5. Rate limiting
// ---------------------------------------------------------------------------
const apiQueue = jobQueue<string, Response>("api-calls", async (_signal, url) => fetch(url), {
  concurrency: 10,
  rateLimit: { max: 5, windowMs: 1000 }, // max 5 job starts per second
});

// ---------------------------------------------------------------------------
// 6. Job introspection and removal
// ---------------------------------------------------------------------------
const longQueue = jobQueue<string, void>("long-jobs", async (signal) => {
  await new Promise((resolve, reject) => {
    const timer = setTimeout(resolve, 60_000);
    signal.addEventListener("abort", () => {
      clearTimeout(timer);
      reject(new Error("aborted"));
    });
  });
});

const seq = longQueue.add("long-task");

// Check status
const job = longQueue.getJob(seq);
console.log(`Job ${seq}: status=${job?.status}, attempts=${job?.attempts}`);

// Cancel it
longQueue.remove(seq);

// ---------------------------------------------------------------------------
// 7. Persistence — survive restarts
// ---------------------------------------------------------------------------
const adapter = memoryAdapter(); // swap with a file/db adapter in production
const persistentQueue = jobQueue<string, string>(
  "persistent",
  async (_signal, data) => `done:${data}`,
  { persistence: adapter },
);

persistentQueue.add("important-work");
// After a restart, create a new queue with the same name + adapter.
// Completed jobs are recovered for introspection via getJob().

// ---------------------------------------------------------------------------
// 8. Distributed jobs via topicBridge
// ---------------------------------------------------------------------------
// Expose the internal topic for wiring into a topicBridge:
//
// import { topicBridge, wsMessageTransport } from 'callbag-recharge/messaging';
//
// const bridge = topicBridge(
//   wsMessageTransport({ url: 'ws://worker-node:8080' }),
//   { 'emails:jobs': { topic: emailQueue.inner.topic } },
// );
//
// Remote workers consume from the same topic, enabling distributed processing.

// ---------------------------------------------------------------------------
// Cleanup
// ---------------------------------------------------------------------------
// Pause/resume lifecycle
emailQueue.pause();
console.log(`Paused: ${emailQueue.isPaused}`); // true
emailQueue.resume();

// Tear down
emailQueue.destroy();
priorityQueue.destroy();
scheduledQueue.destroy();
batchQueue.destroy();
apiQueue.destroy();
longQueue.destroy();
persistentQueue.destroy();
dlq.destroy();
```
API Overview
Creating a queue
```ts
const q = jobQueue<InputType, ResultType>(
  "queue-name",
  async (signal, data, progress) => {
    progress(0.5); // report progress (0–1)
    if (signal.aborted) throw new Error("cancelled");
    return result; // returned to "completed" event
  },
  {
    concurrency: 5, // parallel workers
    rateLimit: { max: 10, windowMs: 1000 }, // throttle starts
    retry: { maxRetries: 3, backoff: exponential() },
    persistence: memoryAdapter(), // survive restarts
    deadLetterTopic: dlq, // terminal failures
  },
);
```
Adding jobs
| Method | Description |
|---|---|
| `q.add(data)` | Add a single job. Returns the sequence number. |
| `q.add(data, { priority: 1 })` | Priority ordering (lower = first). |
| `q.add(data, { runAt: new Date(...) })` | Scheduled execution. |
| `q.addBatch(items)` | Atomic batch add. Returns sequence numbers. |
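The interplay of `priority` and `runAt` can be sketched as a plain filter-and-sort: lower priority numbers dispatch first, and a job whose `runAt` lies in the future is held back. This is an illustrative model only; the `dispatchOrder` function and the FIFO tie-break on sequence number are assumptions, not the library's actual dispatch code.

```typescript
// Illustrative model of dispatch ordering (not the library's actual code):
// lower priority number wins, and a job with a future runAt is held back.
interface PendingJob {
  seq: number;
  priority: number; // lower = dispatched first
  runAt?: Date; // absent = eligible immediately
}

function dispatchOrder(jobs: PendingJob[], now: Date): number[] {
  return jobs
    .filter((j) => !j.runAt || j.runAt.getTime() <= now.getTime())
    .sort((a, b) => a.priority - b.priority || a.seq - b.seq) // FIFO tie-break
    .map((j) => j.seq);
}

const now = new Date();
console.log(
  dispatchOrder(
    [
      { seq: 1, priority: 10 },
      { seq: 2, priority: 1 },
      { seq: 3, priority: 5 },
      { seq: 4, priority: 1, runAt: new Date(now.getTime() + 60_000) },
    ],
    now,
  ),
); // [2, 3, 1] (seq 4 is still scheduled)
```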
Reactive companion stores
| Store | Type | Description |
|---|---|---|
| `q.active` | `Store<number>` | Currently processing jobs |
| `q.completed` | `Store<number>` | Total completed |
| `q.failed` | `Store<number>` | Total failed |
| `q.waiting` | `Store<number>` | Backlog size |
| `q.progress` | `Store<number>` | Aggregate progress (0–1) across active jobs |
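The `active` store enables a common graceful-shutdown pattern: pause intake, wait for the in-flight count to reach zero, then destroy. The sketch below is hypothetical: the `drain` helper and `DrainableQueue` shape are not part of the library, and the store is modeled here with a `subscribe` method, whereas the recipe above uses `subscribe(store, cb)` from `callbag-recharge/extra`.

```typescript
// Hypothetical graceful-shutdown helper: pause intake, wait for the
// in-flight count to reach zero, then tear the queue down.
type Unsubscribe = () => void;

interface DrainableQueue {
  pause(): void;
  destroy(): void;
  active: { subscribe(cb: (n: number) => void): Unsubscribe };
}

function drain(q: DrainableQueue): Promise<void> {
  q.pause(); // stop pulling new jobs; in-flight jobs continue
  return new Promise((resolve) => {
    let settled = false;
    const unsub = q.active.subscribe((n) => {
      if (n === 0 && !settled) {
        settled = true;
        // Defer so `unsub` is assigned even if the store emits synchronously.
        queueMicrotask(() => {
          unsub();
          q.destroy();
          resolve();
        });
      }
    });
  });
}
```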
Events
```ts
q.on("completed", (job) => { /* job.result, job.duration */ });
q.on("failed", (job) => { /* job.error, job.attempts */ });
q.on("stalled", (job) => { /* ackTimeout exceeded */ });
q.on("progress", (job) => { /* job.progress (0–1) */ });
```
Introspection
```ts
q.getJob(seq); // → JobInfo | undefined (status, attempts, result, error, progress)
q.remove(seq); // → boolean — cancel and remove a job
```
Lifecycle
```ts
q.pause(); // stop pulling new jobs (in-flight continue)
q.resume(); // resume pulling
q.isPaused; // boolean
q.destroy(); // tear down all resources
```
Distributed processing
Expose the internal topic for bridging to remote workers:
```ts
import { topicBridge, wsMessageTransport } from 'callbag-recharge/messaging';

const bridge = topicBridge(
  wsMessageTransport({ url: 'ws://worker:8080' }),
  { 'emails:jobs': { topic: q.inner.topic } },
);
```
Primitives Used
| Primitive | From | Role |
|---|---|---|
| `jobQueue()` | `callbag-recharge/messaging` | Core job processing engine |
| `topic()` | `callbag-recharge/messaging` | Dead letter queue for failed jobs |
| `topicBridge()` | `callbag-recharge/messaging` | Distributed job processing |
| `subscribe()` | `callbag-recharge/extra` | React to companion stores |
| `memoryAdapter()` | `callbag-recharge/utils` | In-memory persistence |
| `exponential()` | `callbag-recharge/utils` | Retry backoff strategy |
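Backoff strategies like `exponential()` plug into the `retry` option as a function from attempt number to delay. The factory below is a hypothetical sketch of that general shape, not the library's helper; its `baseMs`/`maxMs` parameters and doubling behavior are assumptions.

```typescript
// Hypothetical exponential backoff with a cap: the delay doubles with each
// retry attempt and never exceeds maxMs. The real exponential() helper's
// parameters and defaults may differ.
function exponentialBackoff(baseMs = 100, maxMs = 30_000) {
  return (attempt: number): number => Math.min(baseMs * 2 ** attempt, maxMs);
}

const backoff = exponentialBackoff();
console.log(backoff(0), backoff(1), backoff(4)); // 100 200 1600
```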
Design Notes
- Signal-first: Processor receives `(signal, data, progress)` — signal is always first, matching the orchestrate convention.
- No polling: Job dispatch is push-based via reactive subscription on topic depth changes.
- No raw Promises: Retry delays use `fromTimer()`, rate limiting uses `slidingWindow.acquire()` — all callbag-native.
- Priority is batch-scoped: Jobs are sorted by priority within each pull batch. For full backlog ordering, a future `priorityOrder` flag on subscription `pull()` is planned.
- Persistence is sync-only: The `CheckpointAdapter` must emit values synchronously (e.g., `memoryAdapter()`). Async adapters are not supported.
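To make the sync-only persistence note concrete, here is a minimal Map-backed checkpoint store in the spirit of `memoryAdapter()`. The `SyncAdapter` interface and `mapAdapter` factory are assumed for illustration; the library's actual `CheckpointAdapter` shape may differ. The key point is that `load` returns a value directly rather than a Promise.

```typescript
// Illustrative sync checkpoint store: load and save both complete
// synchronously, as the sync-only persistence note requires.
// SyncAdapter is an assumed interface, not the real CheckpointAdapter.
interface SyncAdapter<T> {
  load(key: string): T | undefined; // must return synchronously, no Promise
  save(key: string, value: T): void;
}

function mapAdapter<T>(): SyncAdapter<T> {
  const store = new Map<string, T>();
  return {
    load: (key) => store.get(key),
    save: (key, value) => {
      store.set(key, value);
    },
  };
}

const checkpoints = mapAdapter<{ status: string }>();
checkpoints.save("persistent:1", { status: "completed" });
console.log(checkpoints.load("persistent:1")); // { status: 'completed' }
```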