How to Build a Reactive Data Pipeline in TypeScript

Transform, filter, batch, and aggregate streaming data using composable operators — with backpressure and type safety.

The Problem

ETL pipelines typically use batch frameworks (Airflow, Spark) or imperative loops. Both lack:

  • Composability — steps are wired with glue code, not a type-safe API
  • Backpressure — producers overwhelm consumers without flow control
  • Reactivity — no way to observe intermediate state or derived metrics
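The backpressure point is worth making concrete: pull-based streams give you flow control for free, because nothing is produced until the consumer asks for it. A minimal sketch using only a plain generator, independent of any library:

```ts
// A generator is a pull-based source: nothing is produced
// until the consumer calls next(), so the consumer's pace
// IS the flow control.
function* produce(): Generator<number> {
	for (let i = 1; i <= 1_000_000; i++) yield i;
}

const numbers = produce();
const consumed: number[] = [];

// The consumer pulls exactly three items; the remaining
// 999,997 are never materialized.
for (const n of numbers) {
	consumed.push(n);
	if (consumed.length === 3) break;
}

console.log(consumed); // [1, 2, 3]
```

Push-based producers (callbacks, event emitters) need explicit buffering or dropping to get the same guarantee.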

The Solution

callbag-recharge treats every pipeline step as a composable store. pipe() chains operators that filter, transform, and batch — each step is inspectable and type-safe.

ts
/**
 * Reactive Data Pipeline (ETL)
 *
 * Demonstrates: fromAsyncIter + pipe operators for streaming ETL.
 * Filter, transform, batch, and write — fully reactive with backpressure.
 */

import { pipe } from "callbag-recharge";
import {
	bufferCount,
	filter,
	forEach,
	fromIter,
	map,
	scan,
	subscribe,
} from "callbag-recharge/extra";

// ── Simulated database rows ──────────────────────────────────

interface Event {
	id: number;
	type: string;
	amount: number;
	timestamp: number;
}

const rawEvents: Event[] = [
	{ id: 1, type: "purchase", amount: 42, timestamp: Date.now() },
	{ id: 2, type: "pageview", amount: 0, timestamp: Date.now() },
	{ id: 3, type: "purchase", amount: 99, timestamp: Date.now() },
	{ id: 4, type: "signup", amount: 0, timestamp: Date.now() },
	{ id: 5, type: "purchase", amount: 15, timestamp: Date.now() },
	{ id: 6, type: "purchase", amount: 200, timestamp: Date.now() },
];

// ── Pipeline: filter → transform → batch → write ────────────

const source = fromIter(rawEvents);

const pipeline = pipe(
	source,
	// Keep only purchase events
	filter((row: Event) => row.type === "purchase"),
	// Transform: convert to cents
	map((row: Event) => ({ ...row, amount: row.amount * 100 })),
	// Batch into groups of 2 for bulk insert
	bufferCount(2),
);
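For reference, the pipeline above computes the same result as this eager, array-based version — the reactive form just does it incrementally, per emission. Plain TypeScript, no library (a local `Row` type stands in for the `Event` interface above):

```ts
interface Row {
	id: number;
	type: string;
	amount: number;
	timestamp: number;
}

// Eager equivalent of filter → map → bufferCount(2).
function batchPurchases(rows: Row[], size: number): Row[][] {
	const cents = rows
		.filter((row) => row.type === "purchase")
		.map((row) => ({ ...row, amount: row.amount * 100 }));
	const batches: Row[][] = [];
	for (let i = 0; i < cents.length; i += size) {
		batches.push(cents.slice(i, i + size));
	}
	return batches;
}
```

With the six sample events this yields two batches of two purchases each, amounts in cents: `[[4200, 9900], [1500, 20000]]`.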

// ── Running total via scan ───────────────────────────────────

const purchases = pipe(
	source,
	filter((row: Event) => row.type === "purchase"),
	map((row: Event) => row.amount),
	scan((total, amount) => total + amount, 0),
);
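scan() is the streaming counterpart of reduce: it emits every intermediate accumulator rather than only the final value. A plain-TypeScript sketch of that behavior:

```ts
// scan emits each intermediate total, unlike reduce,
// which yields only the final value.
function scanSum(amounts: number[]): number[] {
	const totals: number[] = [];
	let total = 0;
	for (const amount of amounts) {
		total += amount;
		totals.push(total);
	}
	return totals;
}

console.log(scanSum([42, 99, 15, 200])); // [42, 141, 156, 356]
```

That emit-per-step behavior is what makes the running total observable as the pipeline runs, not just after it finishes.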

// ── Execute ──────────────────────────────────────────────────

console.log("=== Batched writes ===");
const _unsub1 = forEach(pipeline, (batch) => {
	console.log(
		"Bulk insert:",
		batch.map((r) => `$${r.amount / 100}`),
	);
});

console.log("\n=== Running total ===");
const _unsub2 = subscribe(purchases, (total) => {
	console.log(`Running total: $${total}`);
});

Why This Works

  1. pipe() composition — each operator is a pure function. The chain is type-checked end-to-end.
  2. filter() + map() — declarative transforms that read like the spec: "keep purchases, convert to cents."
  3. bufferCount(n) — batches emissions into groups of n for efficient bulk writes. No manual array management.
  4. scan() — running aggregation alongside the main pipeline. Both operate on the same source.
  5. Inspectability — dumpGraph() shows every step, its current value, and the edges between them.
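The "no manual array management" claim in point 3 holds because the buffering lives inside the operator. A generator-based sketch of what a bufferCount-style operator does internally (an illustration, not the library's actual implementation):

```ts
// Collects items into fixed-size chunks, flushing any
// partial chunk when the source completes.
function* bufferCount<T>(source: Iterable<T>, size: number): Generator<T[]> {
	let buffer: T[] = [];
	for (const item of source) {
		buffer.push(item);
		if (buffer.length === size) {
			yield buffer;
			buffer = [];
		}
	}
	if (buffer.length > 0) yield buffer; // flush the remainder
}

console.log([...bufferCount([1, 2, 3, 4, 5], 2)]); // [[1, 2], [3, 4], [5]]
```

Note the trailing partial batch: with an odd number of items, the last chunk is smaller than `size`, which is exactly what you want before a final bulk write.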

Async Sources

Replace fromIter with fromAsyncIter for real async sources:

ts
import { fromAsyncIter } from 'callbag-recharge/extra'

// Database cursor
const rows = fromAsyncIter(db.query('SELECT * FROM events'))

// SSE stream
const events = fromAsyncIter(sseStream)

// File lines
const lines = fromAsyncIter(readline.createInterface({ input: fs.createReadStream('data.csv') }))
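Anything implementing the async-iterator protocol works as a source. A self-contained sketch of draining an async iterable with for await...of, independent of the library — the key property is that the next item is only requested after the previous one is processed, preserving pull-based backpressure:

```ts
// Stands in for a real async source (DB cursor, SSE stream,
// file lines). Each yield could await I/O in real code.
async function* eventStream(): AsyncGenerator<number> {
	for (const amount of [42, 99, 15, 200]) {
		yield amount;
	}
}

// Consumes the stream at the consumer's own pace.
async function sumStream(stream: AsyncIterable<number>): Promise<number> {
	let total = 0;
	for await (const amount of stream) {
		total += amount;
	}
	return total;
}
```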

Adding Error Handling

Wrap the pipeline with retry and rescue for resilient processing:

ts
import { retry, rescue } from 'callbag-recharge/extra'

const resilientPipeline = pipe(
  source,
  filter(row => row.type === 'purchase'),
  map(row => transform(row)),
  retry(3),                            // retry on transient errors
  rescue(() => fromIter([])),          // fallback to empty on permanent failure
  bufferCount(100),
)
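retry and rescue come from the library, but the underlying pattern is simple to state on its own. A standalone sketch of retry-with-fallback around one async step (a hypothetical helper, not the library's API):

```ts
// Retries an async operation up to `attempts` times; if every
// attempt fails, returns a recovery value instead of throwing
// (mirroring the retry(n) + rescue(fallback) combination).
async function withRetry<T>(
	op: () => Promise<T>,
	attempts: number,
	fallback: () => T,
): Promise<T> {
	let lastError: unknown;
	for (let i = 0; i < attempts; i++) {
		try {
			return await op();
		} catch (err) {
			lastError = err; // transient failure: try again
		}
	}
	console.warn("permanent failure, using fallback:", lastError);
	return fallback();
}
```

The stream operators apply the same policy per emission, so one poisoned row falls back without tearing down the whole pipeline.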

Released under the MIT License.