# How to Build a Reactive Data Pipeline in TypeScript
Transform, filter, batch, and aggregate streaming data using composable operators — with backpressure and type safety.
## The Problem
ETL pipelines typically use batch frameworks (Airflow, Spark) or imperative loops. Both lack:
- Composability — steps are wired with glue code, not a type-safe API
- Backpressure — producers overwhelm consumers without flow control
- Reactivity — no way to observe intermediate state or derived metrics
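To make the composability gap concrete, here is the same "keep purchases, convert to cents" step written both ways in plain TypeScript, with no library and invented sample data:

```ts
interface Row {
  type: string;
  amount: number;
}

const rows: Row[] = [
  { type: "purchase", amount: 42 },
  { type: "pageview", amount: 0 },
  { type: "purchase", amount: 99 },
];

// Imperative glue code: filtering and transforming are tangled in one loop.
const centsImperative: number[] = [];
for (const row of rows) {
  if (row.type === "purchase") {
    centsImperative.push(row.amount * 100);
  }
}

// Composable: each step is a named, type-checked function you can reuse.
const cents = rows
  .filter((r) => r.type === "purchase")
  .map((r) => r.amount * 100);

console.log(cents); // [ 4200, 9900 ]
```

Array methods only get you composability; stream operators add backpressure and reactivity on top of the same shape.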
## The Solution
`callbag-recharge` treats every pipeline step as a composable store. `pipe()` chains operators that filter, transform, and batch — each step is inspectable and type-safe.
```ts
/**
 * Reactive Data Pipeline (ETL)
 *
 * Demonstrates: fromIter + pipe operators for streaming ETL.
 * Filter, transform, batch, and write — fully reactive with backpressure.
 */
import { pipe } from "callbag-recharge";
import {
  bufferCount,
  filter,
  forEach,
  fromIter,
  map,
  scan,
  subscribe,
} from "callbag-recharge/extra";

// ── Simulated database rows ──────────────────────────────────
interface Event {
  id: number;
  type: string;
  amount: number;
  timestamp: number;
}

const rawEvents: Event[] = [
  { id: 1, type: "purchase", amount: 42, timestamp: Date.now() },
  { id: 2, type: "pageview", amount: 0, timestamp: Date.now() },
  { id: 3, type: "purchase", amount: 99, timestamp: Date.now() },
  { id: 4, type: "signup", amount: 0, timestamp: Date.now() },
  { id: 5, type: "purchase", amount: 15, timestamp: Date.now() },
  { id: 6, type: "purchase", amount: 200, timestamp: Date.now() },
];

// ── Pipeline: filter → transform → batch → write ────────────
const source = fromIter(rawEvents);

const pipeline = pipe(
  source,
  // Keep only purchase events
  filter((row: Event) => row.type === "purchase"),
  // Transform: convert to cents
  map((row: Event) => ({ ...row, amount: row.amount * 100 })),
  // Batch into groups of 2 for bulk insert
  bufferCount(2),
);

// ── Running total via scan ───────────────────────────────────
const purchases = pipe(
  source,
  filter((row: Event) => row.type === "purchase"),
  map((row: Event) => row.amount),
  scan((total, amount) => total + amount, 0),
);

// ── Execute ──────────────────────────────────────────────────
console.log("=== Batched writes ===");
const _unsub1 = forEach(pipeline, (batch) => {
  console.log(
    "Bulk insert:",
    batch.map((r) => `$${r.amount / 100}`),
  );
});

console.log("\n=== Running total ===");
const _unsub2 = subscribe(purchases, (total) => {
  console.log(`Running total: $${total}`);
});
```

## Why This Works
- `pipe()` composition — each operator is a pure function. The chain is type-checked end-to-end.
- `filter()` + `map()` — declarative transforms that read like the spec: "keep purchases, convert to cents."
- `bufferCount(n)` — batches emissions into groups of `n` for efficient bulk writes. No manual array management.
- `scan()` — running aggregation alongside the main pipeline. Both operate on the same source.
- Inspectable — `Inspector.dumpGraph()` shows every step, its current value, and edges.
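If the batching and aggregation semantics feel abstract, they can be pinned down with plain array helpers. This is a sketch of what `bufferCount(n)` and `scan(fn, seed)` emit, not the library's implementation:

```ts
// Stand-in for the bufferCount(n) operator: chop a sequence into
// groups of n (the last group may be shorter).
function bufferCount<T>(items: T[], n: number): T[][] {
  const batches: T[][] = [];
  for (let i = 0; i < items.length; i += n) {
    batches.push(items.slice(i, i + n));
  }
  return batches;
}

// Stand-in for the scan(fn, seed) operator: like reduce, but it
// emits every intermediate accumulator, not just the final one.
function scan<T, A>(items: T[], fn: (acc: A, x: T) => A, seed: A): A[] {
  const out: A[] = [];
  let acc = seed;
  for (const x of items) {
    acc = fn(acc, x);
    out.push(acc);
  }
  return out;
}

const amounts = [42, 99, 15, 200];
console.log(bufferCount(amounts, 2)); // [ [ 42, 99 ], [ 15, 200 ] ]
console.log(scan(amounts, (t, a) => t + a, 0)); // [ 42, 141, 156, 356 ]
```

The stream versions do the same thing over time: `bufferCount` holds values until a batch fills, and `scan` pushes a fresh running total on every emission.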
## Async Sources
Replace `fromIter` with `fromAsyncIter` for real async sources:
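Under the hood, any value implementing the async-iterator protocol (async generators, database cursors, line readers) can feed such a source. A plain-TypeScript sketch with an invented async generator shows the pull model that gives these pipelines natural backpressure:

```ts
// Invented stand-in source: an async generator yielding two rows.
async function* queryEvents(): AsyncGenerator<{ id: number; amount: number }> {
  yield { id: 1, amount: 42 };
  yield { id: 2, amount: 99 };
}

// The consumer pulls one value at a time; the producer cannot run
// ahead of the consumer. That pull model is the backpressure.
const amounts: number[] = [];
for await (const row of queryEvents()) {
  amounts.push(row.amount);
}
console.log(amounts); // [ 42, 99 ]
```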
```ts
import { fromAsyncIter } from "callbag-recharge/extra";

// Database cursor
const rows = fromAsyncIter(db.query("SELECT * FROM events"));

// SSE stream
const events = fromAsyncIter(sseStream);

// File lines
const lines = fromAsyncIter(
  readline.createInterface({ input: fs.createReadStream("data.csv") }),
);
```

## Adding Error Handling
Wrap the pipeline with `retry` and `rescue` for resilient processing:
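As a mental model, the retry-then-fallback behavior can be sketched for a plain async function; `withRetry` and the flaky fetch below are invented for illustration, not the library's operators:

```ts
// Retry fn up to `attempts` times; if every attempt throws,
// rescue with a fallback value instead of propagating the error.
async function withRetry<T>(
  fn: () => Promise<T>,
  attempts: number,
  fallback: () => T,
): Promise<T> {
  for (let i = 0; i < attempts; i++) {
    try {
      return await fn();
    } catch {
      // transient failure: try again
    }
  }
  return fallback(); // permanent failure: fall back
}

// A fetch that fails twice before succeeding.
let calls = 0;
const flaky = async () => {
  calls++;
  if (calls < 3) throw new Error("transient");
  return [{ id: 1, amount: 4200 }];
};

const fetched = await withRetry(flaky, 3, () => []);
console.log(fetched.length); // 1
```

The stream operators apply the same policy per emission, so one bad row retries or falls back without tearing down the whole pipeline.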
```ts
import { retry, rescue } from "callbag-recharge/extra";

const resilientPipeline = pipe(
  source,
  filter((row: Event) => row.type === "purchase"),
  map((row: Event) => transform(row)),
  retry(3), // retry on transient errors
  rescue(() => fromIter([])), // fall back to an empty stream on permanent failure
  bufferCount(100),
);
```

## See Also
- Cron Pipeline — schedule this pipeline on a cron trigger
- AI Chat with Streaming — streaming with auto-cancellation