This project has been succeeded by GraphReFly. New development happens at graphrefly-ts. npm install @graphrefly/graphrefly
Skip to content

Job Queue

Build a standalone durable job processing system with jobQueue() — concurrency, progress tracking, priority ordering, scheduled execution, rate limiting, batch operations, introspection, persistence, and distributed processing.

The Problem

Background job processing typically requires external infrastructure (Redis + BullMQ, SQS, Celery). This recipe shows how to build a fully-featured job queue in-process using callbag-recharge's messaging primitives — no external dependencies.

The Solution

ts
/**
 * Job Queue — standalone durable job processing
 *
 * Demonstrates the jobQueue as a self-contained processing engine:
 * concurrency, progress, priority, scheduling, rate limiting,
 * introspection, batch add, persistence, and distributed bridging.
 *
 * Run: pnpm exec tsx --tsconfig tsconfig.examples.json examples/job-queue.ts
 */
import { subscribe } from "callbag-recharge/extra";
import { jobQueue, topic } from "callbag-recharge/messaging";
import { memoryAdapter } from "callbag-recharge/utils";

// ---------------------------------------------------------------------------
// 1. Basic queue with concurrency and progress
// ---------------------------------------------------------------------------

const emailQueue = jobQueue<string, { sent: boolean }>(
	"emails",
	async (signal, _address, progress) => {
		progress(0.1);
		// Simulate sending an email
		await new Promise((r) => setTimeout(r, 100));
		if (signal.aborted) throw new Error("cancelled");
		progress(0.9);
		return { sent: true };
	},
	{ concurrency: 5 },
);

// React to companion stores
subscribe(emailQueue.active, (n) => console.log(`Active: ${n}`));
subscribe(emailQueue.progress, (p) => console.log(`Progress: ${(p * 100).toFixed(0)}%`));

// Add jobs
emailQueue.add("alice@example.com");
emailQueue.add("bob@example.com");

// Listen for events
emailQueue.on("completed", (job) => {
	console.log(`Job ${job.seq} completed in ${job.duration}ms`, job.result);
});

// ---------------------------------------------------------------------------
// 2. Priority ordering
// ---------------------------------------------------------------------------

const priorityQueue = jobQueue<{ task: string }, void>(
	"tasks",
	async (_signal, data) => {
		console.log(`Processing: ${data.task}`);
	},
	{ concurrency: 3 },
);

// Lower priority number = processed first (within a pull batch)
priorityQueue.add({ task: "low-priority" }, { priority: 10 });
priorityQueue.add({ task: "high-priority" }, { priority: 1 });
priorityQueue.add({ task: "medium-priority" }, { priority: 5 });

// ---------------------------------------------------------------------------
// 3. Scheduled jobs
// ---------------------------------------------------------------------------

const scheduledQueue = jobQueue<string, void>("scheduled", async (_signal, data) => {
	console.log(`Running scheduled: ${data}`);
});

// Run 30 seconds from now
scheduledQueue.add("report-generation", {
	runAt: new Date(Date.now() + 30_000),
});

// Introspect the scheduled job
const info = scheduledQueue.getJob(1);
console.log(`Job status: ${info?.status}`); // "scheduled"

// ---------------------------------------------------------------------------
// 4. Batch add + retry + dead letter queue
// ---------------------------------------------------------------------------

const dlq = topic<string>("failed-jobs");

const batchQueue = jobQueue<string, void>(
	"batch-work",
	async (_signal, data) => {
		if (data === "poison") throw new Error("bad data");
	},
	{
		retry: { maxRetries: 2, backoff: () => 100 },
		deadLetterTopic: dlq,
	},
);

// Add multiple jobs atomically
batchQueue.addBatch(["good-1", "poison", "good-2"]);

// Failed jobs route to the dead letter topic after retries exhausted
batchQueue.on("failed", (job) => {
	console.log(`Job ${job.seq} failed after ${job.attempts} attempts`);
});

// ---------------------------------------------------------------------------
// 5. Rate limiting
// ---------------------------------------------------------------------------

const apiQueue = jobQueue<string, Response>("api-calls", async (_signal, url) => fetch(url), {
	concurrency: 10,
	rateLimit: { max: 5, windowMs: 1000 }, // max 5 job starts per second
});

// ---------------------------------------------------------------------------
// 6. Job introspection and removal
// ---------------------------------------------------------------------------

const longQueue = jobQueue<string, void>("long-jobs", async (signal) => {
	await new Promise((resolve, reject) => {
		const timer = setTimeout(resolve, 60_000);
		signal.addEventListener("abort", () => {
			clearTimeout(timer);
			reject(new Error("aborted"));
		});
	});
});

const seq = longQueue.add("long-task");

// Check status
const job = longQueue.getJob(seq);
console.log(`Job ${seq}: status=${job?.status}, attempts=${job?.attempts}`);

// Cancel it
longQueue.remove(seq);

// ---------------------------------------------------------------------------
// 7. Persistence — survive restarts
// ---------------------------------------------------------------------------

const adapter = memoryAdapter(); // swap with a file/db adapter in production

const persistentQueue = jobQueue<string, string>(
	"persistent",
	async (_signal, data) => `done:${data}`,
	{ persistence: adapter },
);

persistentQueue.add("important-work");

// After restart, create a new queue with the same name + adapter.
// Completed jobs are recovered for introspection via getJob().

// ---------------------------------------------------------------------------
// 8. Distributed jobs via topicBridge
// ---------------------------------------------------------------------------

// Expose the internal topic for wiring into a topicBridge:
//
//   import { topicBridge, wsMessageTransport } from 'callbag-recharge/messaging';
//
//   const bridge = topicBridge(
//     wsMessageTransport({ url: 'ws://worker-node:8080' }),
//     { 'emails:jobs': { topic: emailQueue.inner.topic } },
//   );
//
// Remote workers consume from the same topic, enabling distributed processing.

// ---------------------------------------------------------------------------
// Cleanup
// ---------------------------------------------------------------------------

// Pause/resume lifecycle
emailQueue.pause();
console.log(`Paused: ${emailQueue.isPaused}`); // true
emailQueue.resume();

// Tear down
emailQueue.destroy();
priorityQueue.destroy();
scheduledQueue.destroy();
batchQueue.destroy();
apiQueue.destroy();
longQueue.destroy();
persistentQueue.destroy();
dlq.destroy();

API Overview

Creating a queue

ts
const q = jobQueue<InputType, ResultType>(
  "queue-name",
  async (signal, data, progress) => {
    progress(0.5);        // report progress (0–1)
    if (signal.aborted) throw new Error("cancelled");
    return result;        // returned to "completed" event
  },
  {
    concurrency: 5,                          // parallel workers
    rateLimit: { max: 10, windowMs: 1000 },  // throttle starts
    retry: { maxRetries: 3, backoff: exponential() },
    persistence: memoryAdapter(),             // survive restarts
    deadLetterTopic: dlq,                    // terminal failures
  },
);

Adding jobs

MethodDescription
q.add(data)Add a single job. Returns sequence number.
q.add(data, { priority: 1 })Priority ordering (lower = first).
q.add(data, { runAt: new Date(...) })Scheduled execution.
q.addBatch(items)Atomic batch add. Returns sequence numbers.

Reactive companion stores

StoreTypeDescription
q.activeStore<number>Currently processing jobs
q.completedStore<number>Total completed
q.failedStore<number>Total failed
q.waitingStore<number>Backlog size
q.progressStore<number>Aggregate progress (0–1) across active jobs

Events

ts
q.on("completed", (job) => { /* job.result, job.duration */ });
q.on("failed",    (job) => { /* job.error, job.attempts  */ });
q.on("stalled",   (job) => { /* ackTimeout exceeded      */ });
q.on("progress",  (job) => { /* job.progress (0–1)       */ });

Introspection

ts
q.getJob(seq);   // → JobInfo | undefined (status, attempts, result, error, progress)
q.remove(seq);   // → boolean — cancel and remove a job

Lifecycle

ts
q.pause();       // stop pulling new jobs (in-flight continue)
q.resume();      // resume pulling
q.isPaused;      // boolean
q.destroy();     // tear down all resources

Distributed processing

Expose the internal topic for bridging to remote workers:

ts
import { topicBridge, wsMessageTransport } from 'callbag-recharge/messaging';

const bridge = topicBridge(
  wsMessageTransport({ url: 'ws://worker:8080' }),
  { 'emails:jobs': { topic: q.inner.topic } },
);

Primitives Used

PrimitiveFromRole
jobQueue()callbag-recharge/messagingCore job processing engine
topic()callbag-recharge/messagingDead letter queue for failed jobs
topicBridge()callbag-recharge/messagingDistributed job processing
subscribe()callbag-recharge/extraReact to companion stores
memoryAdapter()callbag-recharge/utilsIn-memory persistence
exponential()callbag-recharge/utilsRetry backoff strategy

Design Notes

  • Signal-first: Processor receives (signal, data, progress) — signal is always first, matching the orchestrate convention.
  • No polling: Job dispatch is push-based via reactive subscription on topic depth changes.
  • No raw Promises: Retry delays use fromTimer(), rate limiting uses slidingWindow.acquire() — all callbag-native.
  • Priority is batch-scoped: Jobs are sorted by priority within each pull batch. For full backlog ordering, a future priorityOrder flag on subscription pull() is planned.
  • Persistence is sync-only: The CheckpointAdapter must emit values synchronously (e.g., memoryAdapter()). Async adapters are not supported.

Released under the MIT License.