How to Build a Cron Pipeline in TypeScript (Airflow Alternative)

Schedule data pipelines with cron triggers, automatic retry, and diamond-safe aggregation, with no separate scheduler service to run.

The Problem

Airflow, Prefect, and Temporal require:

  • A separate server process or managed service
  • Python (or a complex SDK)
  • DAG definitions in a different language/format than your app code

Many pipelines only need trigger on schedule → fetch data → aggregate → report, written in the same TypeScript codebase as the rest of the app.

The Solution

callbag-recharge's fromCron() is a reactive source that emits on schedule. Compose it with exhaustMap (ignore overlapping triggers), retry, and derived (diamond-safe aggregation).

```ts
/**
 * Cron-triggered Pipeline (Airflow-in-TypeScript)
 *
 * Demonstrates: fromCron + exhaustMap + retry + derived for
 * a scheduled data pipeline with automatic retry and aggregation.
 */

import { derived, effect, pipe } from "callbag-recharge";
import { exhaustMap, firstValueFrom, fromPromise, fromTimer, retry } from "callbag-recharge/extra";
import { fromCron } from "callbag-recharge/orchestrate";

// ── Simulated data fetchers ──────────────────────────────────

function fetchBankTransactions(): Promise<number[]> {
	return Promise.resolve([100, 250, 75]);
}

function fetchCardCharges(): Promise<number[]> {
	return Promise.resolve([50, 120, 300]);
}

// ── Cron-triggered pipeline ──────────────────────────────────

// Trigger every minute (in production: '0 9 * * *' for 9am daily)
const trigger = fromCron("* * * * *");

// Each trigger runs the fetch (exhaustMap ignores triggers while a fetch is in flight)
const bankData = pipe(
	trigger,
	exhaustMap(() => fromPromise(fetchBankTransactions())),
	retry(3), // re-subscribe up to 3 times on failure
);

const cardData = pipe(
	trigger,
	exhaustMap(() => fromPromise(fetchCardCharges())),
	retry(3),
);

// ── Diamond-safe aggregation ─────────────────────────────────

// derived() is diamond-safe: inputs that update in the same propagation
// cause one recompute, not two. The ?? [] fallbacks cover an input
// that has not emitted yet.
const aggregate = derived([bankData, cardData], () => {
	const bank = bankData.get() ?? [];
	const cards = cardData.get() ?? [];
	const all = [...bank, ...cards];
	return {
		total: all.reduce((a, b) => a + b, 0),
		count: all.length,
		sources: { bank: bank.length, cards: cards.length },
	};
});

// ── Report ───────────────────────────────────────────────────

const dispose = effect([aggregate], () => {
	const report = aggregate.get();
	if (report) {
		console.log(
			`Report: $${report.total} from ${report.count} transactions (${report.sources.bank} bank, ${report.sources.cards} card)`,
		);
	}
});

// Run for just over a minute so the cron fires at least once, then clean up
firstValueFrom(fromTimer(62_000)).then(() => {
	dispose();
	console.log("Pipeline stopped.");
	process.exit(0);
});

console.log("Cron pipeline running (triggers every minute)...");
```
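The schedule strings in this example use standard five-field cron syntax: minute, hour, day of month, month, day of week. The sketch below is a hypothetical, deliberately simplified matcher; cronMatches is not part of callbag-recharge and is not how fromCron parses expressions. It handles only *, plain numbers, comma lists, and */n steps, and it ANDs day-of-month with day-of-week, whereas standard cron ORs those two fields when both are restricted.

```typescript
// Hypothetical helper, not part of callbag-recharge: a simplified
// five-field cron matcher supporting "*", "n", "a,b,c" and "*/n".
function fieldMatches(field: string, value: number): boolean {
	return field.split(",").some((part) => {
		if (part === "*") return true;
		if (part.startsWith("*/")) {
			const step = Number(part.slice(2));
			return step > 0 && value % step === 0;
		}
		return Number(part) === value;
	});
}

// Fields: minute hour day-of-month month day-of-week (0 = Sunday).
function cronMatches(expr: string, date: Date): boolean {
	const fields = expr.trim().split(/\s+/);
	if (fields.length !== 5) throw new Error(`expected 5 fields, got ${fields.length}`);
	const [min, hour, dom, mon, dow] = fields;
	// Simplification: every field must match (real cron ORs dom/dow when both are set).
	return (
		fieldMatches(min, date.getMinutes()) &&
		fieldMatches(hour, date.getHours()) &&
		fieldMatches(dom, date.getDate()) &&
		fieldMatches(mon, date.getMonth() + 1) &&
		fieldMatches(dow, date.getDay())
	);
}

// '0 9 * * *' matches 09:00 local time every day.
console.log(cronMatches("0 9 * * *", new Date(2024, 0, 15, 9, 0))); // true
console.log(cronMatches("0 9 * * *", new Date(2024, 0, 15, 9, 1))); // false
console.log(cronMatches("* * * * *", new Date(2024, 0, 15, 13, 37))); // true
```

Because a scheduler only asks "does this minute match?", checking once per minute boundary is enough to drive a trigger like the one in the example.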

Why This Works

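  1. fromCron('0 9 * * *'): emits on schedule using a zero-dependency cron parser. No external scheduler needed.

  2. exhaustMap(): if the previous fetch is still running when the next cron tick fires, the new trigger is ignored, so runs never overlap.

  3. retry(3): re-subscribes on error, up to 3 times, so a transient failure does not permanently stop bankData or cardData. Because retry sits after exhaustMap, the re-subscription goes back through the trigger; unless fromCron emits on subscribe, the fetch runs again at the next scheduled tick rather than immediately.

  4. derived([bankData, cardData], fn): diamond-safe aggregation. When both sources update in the same propagation, the aggregate recomputes once rather than once per source. Each fetch here resolves its own promise, so the two results may arrive in separate updates; the ?? [] fallbacks cover a source that has not emitted yet.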
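To make the exhaustMap and retry behaviour concrete without the library, here is a plain-Promise sketch. exhaust and withRetry are hypothetical helpers, not callbag-recharge APIs: the first drops calls while a run is still in flight, the second calls a promise-creating function again on failure. The retry has to call the function again, because re-awaiting an already rejected promise would only replay the same failure.

```typescript
// Hypothetical helpers (not callbag-recharge APIs) mirroring exhaustMap + retry.

// exhaust: while a run is in flight, further calls are ignored (return undefined).
// Assumes task is async, i.e. reports failure by rejecting rather than throwing.
function exhaust<T>(task: () => Promise<T>): () => Promise<T> | undefined {
	let running = false;
	return () => {
		if (running) return undefined; // overlapping trigger dropped
		running = true;
		return task().finally(() => {
			running = false;
		});
	};
}

// withRetry: calls makeTask again on failure, up to `retries` extra attempts.
async function withRetry<T>(makeTask: () => Promise<T>, retries: number): Promise<T> {
	let lastError: unknown;
	for (let attempt = 0; attempt <= retries; attempt++) {
		try {
			return await makeTask(); // a fresh promise on every attempt
		} catch (err) {
			lastError = err;
		}
	}
	throw lastError;
}

// Simulated flaky fetch: fails twice, then succeeds.
let calls = 0;
const flakyFetch = async (): Promise<number[]> => {
	calls++;
	if (calls < 3) throw new Error(`transient failure #${calls}`);
	return [100, 250, 75];
};

let starts = 0;
const runOnTick = exhaust(() => {
	starts++;
	return withRetry(flakyFetch, 3);
});

const first = runOnTick(); // starts a run
const second = runOnTick(); // ignored: the previous run is still in flight
console.log(starts, second === undefined); // 1 true
first?.then((data) => console.log(calls, data)); // 3 [ 100, 250, 75 ]
```

Here the retry wraps the single fetch, so it re-runs immediately; in the pipeline example retry sits outside exhaustMap and re-subscribes to the whole chain instead.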
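The diamond problem is easiest to see without any library. In the sketch below (hypothetical code, not how callbag-recharge is implemented internally), one source feeds two derived values that both feed an aggregate. A naive push recomputes the aggregate once per path, and the first of those recomputations mixes a new input with a stale one; recomputing in dependency order after each source change runs the aggregate once, with both inputs current.

```typescript
// Hypothetical, minimal illustration of diamond-safe recomputation.
type Cell = { compute: () => number; value: number };

let aggregateRuns = 0;
let source = 1;

// Two values derived from the same source: the two sides of the diamond.
const bank: Cell = { compute: () => source * 10, value: 0 };
const cards: Cell = { compute: () => source * 5, value: 0 };
const total: Cell = {
	compute: () => {
		aggregateRuns++;
		return bank.value + cards.value;
	},
	value: 0,
};

// Cells listed in dependency (topological) order: inputs before the aggregate.
const order: Cell[] = [bank, cards, total];

// Diamond-safe: one pass in order, so total runs once and sees both new inputs.
function setSource(next: number): void {
	source = next;
	for (const cell of order) cell.value = cell.compute();
}

// Naive push for contrast: the aggregate recomputes as soon as each input changes.
function setSourceNaive(next: number): number[] {
	source = next;
	const seen: number[] = [];
	bank.value = bank.compute();
	total.value = total.compute(); // cards is still stale here (a "glitch")
	seen.push(total.value);
	cards.value = cards.compute();
	total.value = total.compute();
	seen.push(total.value);
	return seen;
}

setSource(1);
aggregateRuns = 0;
setSource(2);
console.log(total.value, aggregateRuns); // 30 1

aggregateRuns = 0;
console.log(setSourceNaive(3), aggregateRuns); // [ 40, 45 ] 2
```

The naive version briefly reports 40, a total that mixes the new bank value with the old card value; the ordered pass never exposes that intermediate state.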

Adding Persistence

Make the pipeline survive restarts with checkpoint():

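```ts
// Continues the example above: reuses trigger and fetchBankTransactions,
// replacing the earlier bankData definition.
import { pipe } from 'callbag-recharge'
import { exhaustMap, fromPromise, retry } from 'callbag-recharge/extra'
import { checkpoint, sqliteAdapter } from 'callbag-recharge/utils'

const adapter = sqliteAdapter({ path: './pipeline.db' })

const bankData = pipe(
  trigger,
  exhaustMap(() => fromPromise(fetchBankTransactions())), // swap in your real client call
  retry(3),
  checkpoint('bank-fetch', adapter), // persists last successful value
)
```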

On restart, checkpoint() replays the last persisted value, so downstream steps can work from the last successful result instead of waiting for the fetch to run again.
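The replay-on-restart idea can be sketched with a plain key-value store. CheckpointStore, memoryStore, and checkpointed below are hypothetical (checkpoint() and sqliteAdapter() have their own APIs, which are not reproduced here), and an in-memory Map stands in for SQLite: each successful value is saved under a step id, and a fresh start replays the saved value before any new one arrives.

```typescript
// Hypothetical persistence interface; a real adapter would write to disk.
interface CheckpointStore {
	load(id: string): string | undefined;
	save(id: string, json: string): void;
}

// In-memory stand-in for a durable adapter such as SQLite.
function memoryStore(): CheckpointStore {
	const data = new Map<string, string>();
	return {
		load: (id) => data.get(id),
		save: (id, json) => void data.set(id, json),
	};
}

// On start, replay the last persisted value (if any) to the listener,
// then persist each new successful value before forwarding it.
function checkpointed<T>(id: string, store: CheckpointStore, onValue: (v: T) => void) {
	const saved = store.load(id);
	if (saved !== undefined) onValue(JSON.parse(saved) as T);
	return (value: T): void => {
		store.save(id, JSON.stringify(value));
		onValue(value);
	};
}

const store = memoryStore(); // survives "restarts" only within this process
const seen: number[][] = [];

// First run: nothing persisted yet, so nothing is replayed.
const emit1 = checkpointed<number[]>("bank-fetch", store, (v) => seen.push(v));
emit1([100, 250, 75]);

// Simulated restart: the last successful value is replayed immediately.
checkpointed<number[]>("bank-fetch", store, (v) => seen.push(v));
console.log(seen); // [ [ 100, 250, 75 ], [ 100, 250, 75 ] ]
```

Serializing to JSON keeps the store interface string-based, which is the shape a database-backed adapter would most naturally offer.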

Adding Execution Logging

Record step lifecycle events with executionLog():

```ts
import { executionLog, memoryLogAdapter } from 'callbag-recharge/orchestrate'

const log = executionLog({ adapter: memoryLogAdapter() })
// Auto-logs every step event: started, completed, failed, retried
```
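The kind of record such a log keeps can be sketched independently of the library. The StepEvent shape and the runLogged helper below are hypothetical, not executionLog()'s API or storage format: each attempt of a step appends "started", then "completed" or "failed", and a "retried" entry is written before the next attempt.

```typescript
// Hypothetical event shape; executionLog()'s real format may differ.
type StepEvent = {
	step: string;
	kind: "started" | "completed" | "failed" | "retried";
	attempt: number;
};

const events: StepEvent[] = []; // in-memory log, in the spirit of a memory adapter

// Run a step with up to `retries` extra attempts, logging every lifecycle event.
async function runLogged<T>(step: string, retries: number, run: () => Promise<T>): Promise<T> {
	for (let attempt = 1; ; attempt++) {
		events.push({ step, kind: "started", attempt });
		try {
			const result = await run();
			events.push({ step, kind: "completed", attempt });
			return result;
		} catch (err) {
			events.push({ step, kind: "failed", attempt });
			if (attempt > retries) throw err; // out of retries: surface the error
			events.push({ step, kind: "retried", attempt });
		}
	}
}

let attemptsSoFar = 0;
const logged = runLogged("fetch-bank", 3, async () => {
	attemptsSoFar++;
	// Simulated transient failure on the first attempt only.
	if (attemptsSoFar === 1) throw new Error("timeout");
	return [100, 250, 75];
});

logged.then(() => {
	const trail = events.filter((e) => e.step === "fetch-bank").map((e) => `${e.kind}#${e.attempt}`);
	console.log(trail.join(" ")); // started#1 failed#1 retried#1 started#2 completed#2
});
```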

Full Pipeline Builder

For larger DAGs, declare each step with a name, a factory, and the names of the steps it depends on:

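```ts
// Continues the example above: reuses trigger, fetchBankTransactions, and fetchCardCharges.
import { derived, effect, pipe } from 'callbag-recharge'
import { exhaustMap, fromPromise } from 'callbag-recharge/extra'
import { pipeline, step } from 'callbag-recharge/orchestrate'

const workflow = pipeline([
  step('fetch-bank', () => pipe(trigger, exhaustMap(() => fromPromise(fetchBankTransactions())))),
  step('fetch-cards', () => pipe(trigger, exhaustMap(() => fromPromise(fetchCardCharges())))),
  // aggregate receives the outputs of 'fetch-bank' and 'fetch-cards'
  step('aggregate', (bank, cards) =>
    derived([bank, cards], () => [...(bank.get() ?? []), ...(cards.get() ?? [])]),
    ['fetch-bank', 'fetch-cards']),
  // report runs whenever the aggregate changes
  step('report', (data) => effect([data], () => console.log('Report:', data.get())), ['aggregate']),
])
```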
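In the pipeline above, the last argument to step() lists the steps it depends on, which makes the workflow a DAG. A standard way to get a valid build order for such a graph is a topological sort; the sketch below applies Kahn's algorithm to the same four step names. StepSpec and topoOrder are hypothetical and do not describe how pipeline() works internally; they only show why 'aggregate' can be wired only after both fetch steps, and how a dependency cycle would be caught.

```typescript
// Hypothetical step description: a name plus the names it depends on.
type StepSpec = { name: string; deps: string[] };

// Kahn's algorithm: repeatedly take steps whose dependencies are all placed.
function topoOrder(steps: StepSpec[]): string[] {
	const remaining = new Map<string, number>(); // name -> unmet dependency count
	const dependents = new Map<string, string[]>(); // dep -> steps that need it
	for (const s of steps) {
		remaining.set(s.name, s.deps.length);
		for (const d of s.deps) {
			if (!steps.some((x) => x.name === d)) throw new Error(`unknown dependency: ${d}`);
			dependents.set(d, [...(dependents.get(d) ?? []), s.name]);
		}
	}
	const ready = steps.filter((s) => s.deps.length === 0).map((s) => s.name);
	const order: string[] = [];
	while (ready.length > 0) {
		const name = ready.shift()!;
		order.push(name);
		for (const next of dependents.get(name) ?? []) {
			const left = remaining.get(next)! - 1;
			remaining.set(next, left);
			if (left === 0) ready.push(next); // all inputs placed: step can be wired
		}
	}
	// Any step never reaching zero unmet deps sits on a cycle.
	if (order.length !== steps.length) throw new Error("dependency cycle detected");
	return order;
}

const plan = topoOrder([
	{ name: "fetch-bank", deps: [] },
	{ name: "fetch-cards", deps: [] },
	{ name: "aggregate", deps: ["fetch-bank", "fetch-cards"] },
	{ name: "report", deps: ["aggregate"] },
]);
console.log(plan.join(" -> ")); // fetch-bank -> fetch-cards -> aggregate -> report
```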


Released under the MIT License.