AI Chat with Streaming
Build a streaming AI chat with auto-cancellation, chunk accumulation, and retry — using only Level 1-2 primitives.
The Problem
Every AI chat app needs:
- Streaming chunks from the LLM API accumulated into a full message
- Auto-cancellation — when the user sends a new prompt, cancel the in-flight response
- Retry — resilient API calls that recover from transient failures
- Reactive UI state — conversation history, loading indicator, token count
Most solutions reach for an ad-hoc mix of useState, useEffect, AbortController, and refs. This recipe shows how callbag-recharge handles all of it in a single declarative reactive graph.
The Solution
/**
* AI chat streaming — switchMap + scan (3 operators, no footguns)
*
* switchMap auto-cancels the previous stream when a new prompt arrives.
* scan accumulates chunks into a full response.
*
* With Option D3, switchMap is purely reactive — no eager evaluation,
* no undefined leak, no extra filter needed.
*
* Run: npx tsx examples/streaming.ts
*/
import { pipe, producer, state } from "callbag-recharge";
import {
filter,
firstValueFrom,
fromTimer,
scan,
subscribe,
switchMap,
} from "callbag-recharge/extra";
// Simulate a streaming LLM response
function fakeLLMStream(prompt: string, signal: AbortSignal): AsyncIterable<string> {
const words = `Response to "${prompt}": Hello world from the AI`.split(" ");
return {
async *[Symbol.asyncIterator]() {
for (const word of words) {
if (signal.aborted) return;
await firstValueFrom(fromTimer(50, signal));
if (signal.aborted) return;
yield `${word} `;
}
},
};
}
// User prompt — setting this auto-cancels any in-flight stream
const prompt = state("");
// Stream response with auto-cancellation via switchMap + scan
// Before D3: needed 5 operators (filter + switchMap + filter(undefined) + scan)
// After D3: 3 operators — switchMap is lazy, no undefined leak
const fullResponse = pipe(
prompt,
filter((p): p is string => p.length > 0),
switchMap((p) =>
producer<string>(({ emit, complete }) => {
const ctrl = new AbortController();
(async () => {
for await (const chunk of fakeLLMStream(p, ctrl.signal)) {
emit(chunk);
}
complete();
})();
return () => ctrl.abort(); // cleanup cancels the stream
}),
),
scan((acc, chunk) => acc + (chunk ?? ""), ""),
);
const unsub = subscribe(fullResponse, (text) => {
process.stdout.write(`\r${text}`);
});
// Start streaming
prompt.set("Tell me a joke");
// After the stream completes, clean up
firstValueFrom(fromTimer(500)).then(() => {
console.log("\n--- done ---");
unsub();
});

Why This Works
- pipe + switchMap — Each new prompt cancels the previous fetch via AbortController. No manual cleanup: the cleanup function in the producer fires automatically.
- pipe + scan — Accumulates chunks into a growing string. Each chunk emission updates currentResponse, which updates tokenEstimate and displayHistory via the reactive graph.
- filter — Skips empty prompts and undefined initial values from switchMap.
- Diamond resolution — displayHistory depends on both history and isStreaming. When streaming ends and both update, the derived store recomputes exactly once with consistent values.
- Inspectable — Every store in this graph has a name and can be observed via Inspector. You can see the full reactive graph, current values, and dirty/resolved phases at any time.
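The auto-cancellation that switchMap provides can be sketched without the library, using only AbortController. The following makeSwitchLatest helper is a hypothetical illustration of the pattern (the name and shape are ours, not callbag-recharge's API):

```typescript
// Illustrative sketch of switchMap-style cancellation with AbortController.
// Each new call aborts the previous in-flight task and drops its stale result.
function makeSwitchLatest<T, R>(
  task: (input: T, signal: AbortSignal) => Promise<R>,
) {
  let ctrl: AbortController | null = null;
  return async (input: T): Promise<R | undefined> => {
    ctrl?.abort(); // cancel the previous task, like switchMap does
    ctrl = new AbortController();
    const signal = ctrl.signal;
    const result = await task(input, signal);
    return signal.aborted ? undefined : result; // stale results are dropped
  };
}

// Usage: a slow "LLM call" that checks the signal after its delay
const delay = (ms: number) => new Promise((r) => setTimeout(r, ms));
const ask = makeSwitchLatest(async (prompt: string, signal) => {
  await delay(50);
  if (signal.aborted) return "(cancelled)";
  return `Response to "${prompt}"`;
});

ask("first prompt"); // superseded below; its result resolves to undefined
ask("second prompt").then(console.log); // → Response to "second prompt"
```

The key property is the same as in the recipe: the consumer only ever observes output from the most recent input, with no manual bookkeeping at the call site.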
Adding Retry
Wrap the streaming pipe with retry for resilient API calls:
import { retry } from 'callbag-recharge/extra'
const resilientChunks = pipe(chunks, retry(3))
const currentResponse = pipe(resilientChunks, scan((acc, chunk) => acc + chunk, ''))

The retry operator will re-subscribe to the producer up to 3 times on error, automatically re-triggering the fetch.
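The semantics of retry can be approximated in plain TypeScript. This retryAsync helper is a hypothetical sketch of the idea (re-run a failing async task a bounded number of times), not the library's implementation:

```typescript
// Illustrative sketch of retry semantics: re-run a failing async task
// up to `attempts` extra times before rethrowing the last error.
async function retryAsync<T>(
  task: () => Promise<T>,
  attempts: number,
): Promise<T> {
  let lastError: unknown;
  for (let i = 0; i <= attempts; i++) {
    try {
      return await task();
    } catch (err) {
      lastError = err; // transient failure: try again
    }
  }
  throw lastError; // all attempts exhausted
}

// Usage: a flaky "fetch" that fails twice, then succeeds
let calls = 0;
const flaky = async () => {
  calls++;
  if (calls < 3) throw new Error("transient network error");
  return "chunk";
};

retryAsync(flaky, 3).then((v) => console.log(v, `after ${calls} calls`));
// → chunk after 3 calls
```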
Framework Integration
This recipe is framework-agnostic. To connect to React:
// Minimal React hook (no adapter package needed)
import { useEffect, useState } from 'react'

function useStore<T>(store: Store<T>): T {
  const [value, setValue] = useState(store.get())
  // subscribe returns an unsubscribe function, which useEffect uses as cleanup
  useEffect(() => subscribe(store, setValue), [store])
  return value
}
function ChatUI() {
const messages = useStore(displayHistory)
const streaming = useStore(isStreaming)
const tokens = useStore(tokenEstimate)
// ... render
}

The same stores work with Vue (watchEffect), Svelte ($: blocks), Solid (createEffect), or no framework at all.
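The get/subscribe shape the hook above relies on can be satisfied by a minimal hand-rolled store. This createStore is an illustrative sketch of that contract, not callbag-recharge's internals (and it exposes subscribe as a method, whereas the library uses a free subscribe function):

```typescript
// Minimal get/set/subscribe store — a sketch of the contract any
// framework adapter needs, not the library's implementation.
type Listener<T> = (value: T) => void;

interface Store<T> {
  get(): T;
  set(value: T): void;
  subscribe(listener: Listener<T>): () => void; // returns unsubscribe
}

function createStore<T>(initial: T): Store<T> {
  let value = initial;
  const listeners = new Set<Listener<T>>();
  return {
    get: () => value,
    set(next) {
      value = next;
      listeners.forEach((l) => l(value)); // notify all subscribers
    },
    subscribe(listener) {
      listeners.add(listener);
      return () => {
        listeners.delete(listener); // cleanup, e.g. from useEffect
      };
    },
  };
}

// Framework-free usage: log every update
const message = createStore("hello");
const unsubscribe = message.subscribe((v) => console.log("update:", v));
message.set("world"); // → update: world
unsubscribe();
```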