How to Build a Cron Pipeline in TypeScript (Airflow Alternative)
Schedule data pipelines with cron triggers, automatic retry, and diamond-safe aggregation — no infrastructure required.
The Problem
Traditional orchestrators like Airflow, Prefect, and Temporal typically require:
- A separate server process or managed service
- Python (or a complex SDK)
- DAG definitions in a different language/format than your app code
For many pipelines, you just need: trigger on schedule → fetch data → aggregate → report. In the same TypeScript codebase.
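For scale, here is that shape hand-rolled in plain TypeScript — a rough sketch with placeholder fetchers and ad-hoc retry, before any library is involved. (`fetchBank`, `fetchCards`, and `runOnce` are illustrative names, not part of any API.)

```typescript
// Naive hand-rolled pipeline: fetch → retry → aggregate → report.
// Placeholder fetchers standing in for real API calls.
const fetchBank = async (): Promise<number[]> => [100, 250, 75];
const fetchCards = async (): Promise<number[]> => [50, 120, 300];

async function runOnce(): Promise<string> {
  // Ad-hoc retry: try each fetch up to 3 times before giving up.
  const withRetry = async (fn: () => Promise<number[]>) => {
    for (let attempt = 1; ; attempt++) {
      try { return await fn(); }
      catch (e) { if (attempt === 3) throw e; }
    }
  };
  const [bank, cards] = await Promise.all([withRetry(fetchBank), withRetry(fetchCards)]);
  const all = [...bank, ...cards];
  return `Report: $${all.reduce((a, b) => a + b, 0)} from ${all.length} transactions`;
}

// In production this would sit inside setInterval(runOnce, 60_000),
// plus guards against overlapping runs — exactly the glue the operators below absorb.
runOnce().then(console.log); // Report: $895 from 6 transactions
```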
The Solution
callbag-recharge's fromCron() is a reactive source that emits on schedule. Compose it with exhaustMap (ignore overlapping triggers), retry, and derived (diamond-safe aggregation).
```ts
/**
 * Cron-triggered Pipeline (Airflow-in-TypeScript)
 *
 * Demonstrates: fromCron + exhaustMap + retry + derived for
 * a scheduled data pipeline with automatic retry and aggregation.
 */
import { derived, effect, pipe } from "callbag-recharge";
import { exhaustMap, firstValueFrom, fromPromise, fromTimer, retry } from "callbag-recharge/extra";
import { fromCron } from "callbag-recharge/orchestrate";

// ── Simulated data fetchers ──────────────────────────────────
function fetchBankTransactions(): Promise<number[]> {
  return Promise.resolve([100, 250, 75]);
}

function fetchCardCharges(): Promise<number[]> {
  return Promise.resolve([50, 120, 300]);
}

// ── Cron-triggered pipeline ──────────────────────────────────
// Trigger every minute (in production: '0 9 * * *' for 9am daily)
const trigger = fromCron("* * * * *");

// Each trigger runs the fetch (exhaustMap ignores overlapping triggers)
const bankData = pipe(
  trigger,
  exhaustMap(() => fromPromise(fetchBankTransactions())),
  retry(3), // retry up to 3 times on failure
);

const cardData = pipe(
  trigger,
  exhaustMap(() => fromPromise(fetchCardCharges())),
  retry(3),
);

// ── Diamond-safe aggregation ─────────────────────────────────
// derived() waits for both sources to resolve before computing — once, not twice
const aggregate = derived([bankData, cardData], () => {
  const bank = bankData.get() ?? [];
  const cards = cardData.get() ?? [];
  const all = [...bank, ...cards];
  return {
    total: all.reduce((a, b) => a + b, 0),
    count: all.length,
    sources: { bank: bank.length, cards: cards.length },
  };
});

// ── Report ───────────────────────────────────────────────────
const dispose = effect([aggregate], () => {
  const report = aggregate.get();
  if (report) {
    console.log(
      `Report: $${report.total} from ${report.count} transactions (${report.sources.bank} bank, ${report.sources.cards} card)`,
    );
  }
});

// Let the cron tick once, then clean up
firstValueFrom(fromTimer(62_000)).then(() => {
  dispose();
  console.log("Pipeline stopped.");
  process.exit(0);
});

console.log("Cron pipeline running (triggers every minute)...");
```

Why This Works
- `fromCron('0 9 * * *')` — emits on schedule with a zero-dependency cron parser. No external scheduler needed.
- `exhaustMap()` — if the previous fetch is still running when the next cron tick fires, the new trigger is ignored. No duplicate runs.
- `retry(3)` — automatically re-subscribes on error, up to 3 times. Transient failures self-heal.
- `derived([bankData, cardData], fn)` — diamond-safe aggregation. When both sources complete from the same trigger, the aggregate computes exactly once.
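The exhaust guarantee is the easiest of these to get wrong by hand. A minimal sketch of the semantics using a plain in-flight flag — not callbag-recharge itself; `makeExhaust`, `slowFetch`, and `demo` are illustrative names:

```typescript
// Exhaust semantics: while one run is in flight, later triggers are dropped.
function makeExhaust(work: () => Promise<void>) {
  let inFlight = false;
  const stats = { runs: 0, dropped: 0 };
  async function trigger(): Promise<void> {
    if (inFlight) { stats.dropped++; return; } // overlapping trigger: ignored
    inFlight = true;
    stats.runs++;
    try { await work(); } finally { inFlight = false; }
  }
  return { trigger, stats };
}

async function demo() {
  const slowFetch = () => new Promise<void>(r => setTimeout(r, 50)); // simulated slow job
  const { trigger, stats } = makeExhaust(slowFetch);
  // Three rapid "cron ticks" while the first fetch is still running:
  await Promise.all([trigger(), trigger(), trigger()]);
  console.log(stats); // { runs: 1, dropped: 2 }
  return stats;
}

demo();
```

The alternative strategies (queueing the new trigger, or cancelling the in-flight run) are what `concatMap` and `switchMap` give you in most reactive libraries; exhaust is the right default for cron jobs because a tick carries no unique payload worth preserving.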
Adding Persistence
Make the pipeline survive restarts with checkpoint():
```ts
import { checkpoint, sqliteAdapter } from 'callbag-recharge/utils'

const adapter = sqliteAdapter({ path: './pipeline.db' })

const bankData = pipe(
  trigger,
  exhaustMap(() => fromPromise(plaid.sync())),
  retry(3),
  checkpoint('bank-fetch', adapter), // persists last successful value
)
```

On restart, checkpoint() replays the last persisted value — downstream steps skip redundant computation.
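Conceptually, a checkpoint is just "persist the last good value under a key; replay it before recomputing." An illustrative sketch of that contract with an in-memory adapter — not the library's actual implementation; `CheckpointAdapter`, `memoryAdapter`, and `checkpointed` are names invented here:

```typescript
// Minimal checkpoint contract: save the last successful value under a key,
// and replay it on the next run instead of recomputing.
interface CheckpointAdapter {
  load(key: string): unknown | undefined;
  save(key: string, value: unknown): void;
}

// In-memory stand-in; a real adapter would write to SQLite or a file.
function memoryAdapter(): CheckpointAdapter {
  const store = new Map<string, unknown>();
  return {
    load: (key) => store.get(key),
    save: (key, value) => store.set(key, value),
  };
}

function checkpointed<T>(key: string, adapter: CheckpointAdapter, compute: () => T): T {
  const cached = adapter.load(key);
  if (cached !== undefined) return cached as T; // replay: skip recomputation
  const value = compute();
  adapter.save(key, value); // persist for the next run
  return value;
}

const adapter = memoryAdapter();
let calls = 0;
const first = checkpointed("bank-fetch", adapter, () => { calls++; return [100, 250]; });
const second = checkpointed("bank-fetch", adapter, () => { calls++; return [999]; });
console.log(first, second, calls); // [ 100, 250 ] [ 100, 250 ] 1
```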
Adding Execution Logging
```ts
import { executionLog, memoryLogAdapter } from 'callbag-recharge/orchestrate'

const log = executionLog({ adapter: memoryLogAdapter() })
// Auto-logs every step event: started, completed, failed, retried
```

Full Pipeline Builder
For complex DAGs with multiple steps and dependencies:
```ts
import { pipeline, step } from 'callbag-recharge/orchestrate'

const workflow = pipeline([
  step('fetch-bank', () => pipe(trigger, exhaustMap(() => fromPromise(plaid.sync())))),
  step('fetch-cards', () => pipe(trigger, exhaustMap(() => fromPromise(stripe.charges())))),
  step('aggregate', (bank, cards) => derived([bank, cards], () => merge(bank.get(), cards.get())), ['fetch-bank', 'fetch-cards']),
  step('report', (data) => effect([data], () => sendReport(data.get())), ['aggregate']),
])
```

See Also
- Data Pipeline — ETL without scheduling
- Real-Time Dashboard — reactive metrics from pipeline output