pipeline()
Declarative workflow builder. Wire steps into a DAG with automatic status tracking.
Use task() for work steps and step() only for source steps (triggers, cron). Status is automatically derived from task() steps — no manual wiring needed.
Signature
ts
function pipeline<S extends Record<string, StepDef>>(
steps: S,
opts?: { name?: string; tasks?: Record<string, TaskState<any>> },
): PipelineResult<S>Parameters
| Parameter | Type | Description |
|---|---|---|
steps | S | Record of step name → StepDef. Use task() for work, step() for sources. |
opts | { name?: string; tasks?: Record<string, TaskState<any>> } | Optional configuration: name (Inspector prefix), tasks (extra TaskState instances to fold into aggregate status when they are not attached to a task() step). |
Returns
PipelineResult<S> — step stores, status, reset/destroy, and inner callbag details.
| Method | Signature | Description |
|---|---|---|
steps | Record | Access step stores by name. |
status | Store\<PipelineStatus\> | Pipeline status: idle → active → completed/errored. |
reset(opts?) | (opts?: \{ resetExternalTasks?: boolean \}) => void | Reset all steps and tasks to idle. External tasks reset by default; pass { resetExternalTasks: false } to skip. |
destroy() | () => void | Dispose subscriptions and destroy auto-detected task states. |
inner | PipelineInner | Expert-level stream internals (streamStatus, stepMeta, order). |
Basic Usage
ts
import { pipeline, step, task, fromTrigger } from 'callbag-recharge/orchestrate';
const wf = pipeline({
trigger: step(fromTrigger<string>()),
fetch: task(["trigger"], async (signal, [v]) => fetchData(v), { retry: 3 }),
process: task(["fetch"], async (signal, [data]) => transform(data)),
});
wf.steps.trigger.fire("go");
wf.status.get(); // "idle" → "active" → "completed"Options / Behavior Details
- Auto-wiring: Step deps are resolved by name. Factory functions receive dep stores in declared order.
- Topological sort: Steps are wired in dependency order. Cycles are detected and throw.
- Auto status: When using
task()steps,statusautomatically tracks work execution (idle → active → completed/errored). Falls back to stream lifecycle tracking when no tasks are detected. - Skip propagation: When a task's upstream deps all reach terminal states (success/error/skipped) with at least one non-success, the pipeline automatically marks the idle downstream task as "skipped". This cascades transitively through the DAG.
- opts.tasks: Pass additional
TaskStatestores sostatusreflects work outsidetask()-wrapped steps (e.g. UI demos that runtaskStatemanually). Duplicates are deduped with auto-detected task states. Note:destroy()does NOT destroy externally providedopts.tasks— the caller owns their lifecycle. - Destroy ownership:
destroy()tears down subscriptions, destroys auto-detectedtask()states, and invalidates approval controls. Externally providedopts.tasksare left alive since the caller owns them. - Branch support: Use
branch()steps with compound deps like"validate.fail"for conditional routing.