How to Manage On-Device LLM Streaming State
Stream tokens from a local LLM (Ollama, WebLLM, ExecuTorch) into reactive state with auto-cancellation, chunk accumulation, and derived metrics.
The Problem
On-device / edge LLM inference is production-ready (WebGPU in all major browsers, Ollama as the local standard, ExecuTorch on mobile). But managing the streaming state is still manual:
- Token-by-token accumulation into a response string
- Cancelling in-flight inference when the user sends a new prompt
- Tracking metrics (token count, latency, streaming status)
- Handling errors from model loading failures or OOM
Every framework reimplements this with useState + useEffect + AbortController + refs.
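Stripped of any framework, that manual bookkeeping looks roughly like this (a sketch; `createManualStream` and the fake token source are illustrative names, not library code):

```typescript
// Manual streaming state: the boilerplate a reactive library replaces.
type TokenSource = (prompt: string, signal: AbortSignal) => AsyncGenerator<string>;

function createManualStream(source: TokenSource) {
  let controller: AbortController | null = null;
  let response = "";
  let streaming = false;

  return {
    get response() { return response; },
    get streaming() { return streaming; },
    async send(promptText: string) {
      controller?.abort(); // cancel in-flight inference by hand
      controller = new AbortController();
      const { signal } = controller;
      response = "";
      streaming = true;
      try {
        for await (const token of source(promptText, signal)) {
          if (signal.aborted) break; // stale stream: drop its tokens
          response += token;         // accumulate chunk by chunk
        }
      } finally {
        if (!signal.aborted) streaming = false;
      }
    },
  };
}
```

Every one of these lines has a failure mode (forgotten abort, stale-closure token appended after cancellation); the reactive version below collapses them into operators.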
The Solution
callbag-recharge treats the LLM token stream as a reactive producer. switchMap auto-cancels previous inference. scan accumulates tokens. derived computes metrics. All observable via Inspector.
```typescript
/**
 * On-Device LLM Streaming State
 *
 * Demonstrates: Managing token streams from a local LLM (Ollama, WebLLM)
 * as reactive sources with auto-cancellation and chunk accumulation.
 * Works with any OpenAI-compatible streaming endpoint.
 */
import { derived, pipe, state } from "callbag-recharge";
import { filter, scan, subscribe, switchMap } from "callbag-recharge/extra";
import { fromAbortable } from "callbag-recharge/utils/cancellableStream";

// ── State ────────────────────────────────────────────────────
const prompt = state("", { name: "prompt" });
const isStreaming = state(false, { name: "streaming" });

// ── Token stream from local model ───────────────────────────
// switchMap auto-cancels the previous inference when a new prompt arrives
const tokens = pipe(
  prompt,
  filter((p: string) => p.length > 0),
  switchMap((p: string) => {
    isStreaming.set(true);
    return fromAbortable<string>(
      async function* (signal) {
        // Works with Ollama (localhost:11434) or any OpenAI-compatible endpoint
        const res = await fetch("http://localhost:11434/api/generate", {
          method: "POST",
          headers: { "Content-Type": "application/json" },
          body: JSON.stringify({ model: "llama4", prompt: p, stream: true }),
          signal,
        });
        if (!res.ok || !res.body) {
          throw new Error(`HTTP ${res.status}: ${res.statusText}`);
        }
        const reader = res.body.getReader();
        const decoder = new TextDecoder();
        let buffer = ""; // carries a partial NDJSON line across chunk boundaries
        while (true) {
          const { done, value } = await reader.read();
          if (done) break;
          buffer += decoder.decode(value, { stream: true });
          const lines = buffer.split("\n");
          buffer = lines.pop() ?? ""; // last piece may be an incomplete line
          for (const line of lines.filter(Boolean)) {
            try {
              const chunk = JSON.parse(line);
              if (chunk.response) yield chunk.response;
              if (chunk.done) return;
            } catch {
              /* skip malformed lines */
            }
          }
        }
      },
      {
        name: "llm-tokens",
        onComplete: () => isStreaming.set(false),
        onError: () => isStreaming.set(false),
        onAbort: () => isStreaming.set(false),
      },
    );
  }),
);

// ── Accumulated response ─────────────────────────────────────
const response = pipe(
  tokens,
  filter((t): t is string => t !== undefined),
  scan((acc: string, token: string) => acc + token, ""),
);

// ── Derived metrics ──────────────────────────────────────────
// Approximate: counts whitespace-delimited words, not model tokens
const tokenCount = derived(
  [response],
  () => (response.get() ?? "").split(/\s+/).filter(Boolean).length,
  { name: "tokenCount" },
);
const _charCount = derived([response], () => (response.get() ?? "").length, { name: "charCount" });

// ── Usage ────────────────────────────────────────────────────
subscribe(response, (text) => {
  process.stdout.write(`\r${text ?? ""}`);
});
subscribe(tokenCount, (_count) => {
  // Token count updates reactively as the response grows
});

// Send a prompt — previous inference auto-cancels
prompt.set("Explain reactive programming in one paragraph");

// Send another prompt — the previous one is automatically cancelled
// prompt.set('What is the callbag protocol?')
```

Why This Works
- fromAbortable() and producer() wrap any streaming source: Ollama's HTTP streaming, WebLLM's ChatModule.generate(), or any OpenAI-compatible endpoint. The cleanup function aborts in-flight inference.
- switchMap auto-cancels: when the user sends a new prompt, the previous inference is cancelled via AbortController. No manual cleanup, no race conditions.
- scan accumulates tokens: each token emission grows the response string, so the accumulated value is always current.
- derived computes metrics: token count, character count, and any other derived state update reactively as the response grows.
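One subtlety in the Ollama reader: fetch delivers arbitrary byte chunks, not lines, so an NDJSON record can be split across two reads. Framework-free, the buffering that handles this looks roughly like the following (`createNdjsonParser` is an illustrative name, not a library export):

```typescript
// NDJSON chunk assembler: a JSON line may arrive split across reads,
// so carry the trailing partial line until its newline shows up.
function createNdjsonParser() {
  let buffer = "";
  return function parse(chunk: string): unknown[] {
    buffer += chunk;
    const lines = buffer.split("\n");
    buffer = lines.pop() ?? ""; // last piece may be incomplete
    const out: unknown[] = [];
    for (const line of lines) {
      if (!line.trim()) continue;
      try {
        out.push(JSON.parse(line));
      } catch {
        /* skip malformed lines */
      }
    }
    return out;
  };
}
```

Without the carry buffer, a record cut mid-string would be dropped as "malformed" and its tokens silently lost.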
WebLLM Integration
For browser-based inference via WebLLM:
```typescript
import { producer } from 'callbag-recharge'

const webllmTokens = producer<string>(({ emit, complete, error }) => {
  let engine: any
  ;(async () => {
    engine = await webllm.CreateMLCEngine('Llama-4-Scout-17B-16E-Instruct-q4f16_1-MLC')
    const stream = await engine.chat.completions.create({
      messages: [{ role: 'user', content: prompt.get() }],
      stream: true,
    })
    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta?.content
      if (delta) emit(delta)
    }
    complete()
  })().catch(e => error(e))
  return () => engine?.interruptGenerate()
})
```

Context Window as Derived Computation
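The budget computation below calls an estimateTokens helper that the example leaves undefined. A minimal sketch, assuming roughly four characters per token for English text (a crude heuristic, not a real tokenizer):

```typescript
// Rough token estimate: ~4 characters per token for English text.
// Good enough for budget checks with a safety margin; a real
// application would use the model's own tokenizer.
function estimateTokens(text: string): number {
  return Math.ceil(text.length / 4);
}
```

For accurate budgets, swap in the model's tokenizer and keep a generous margin, since the heuristic undercounts for code and non-English text.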
Track token budget reactively:
```typescript
const contextLimit = 4096
const systemPromptTokens = 200
const history = state<Message[]>([])

const historyTokens = derived([history], () =>
  history.get().reduce((sum, msg) => sum + estimateTokens(msg.content), 0)
)

const availableTokens = derived(
  [historyTokens],
  () => contextLimit - systemPromptTokens - historyTokens.get()
)

const shouldTruncate = derived(
  [availableTokens],
  () => availableTokens.get() < 100
)
```

Error Handling
Wrap with retry and rescue for resilient inference:
```typescript
import { retry, rescue } from 'callbag-recharge/extra'

const resilientTokens = pipe(
  tokens,
  retry(2), // retry model loading failures
  rescue(() => cloudFallbackTokens), // fall back to cloud API
)
```

See Also
- Hybrid Cloud+Edge Routing — route between local and cloud models
- Tool Calls for Local LLMs — reactive tool call lifecycle
- AI Chat with Streaming — cloud LLM streaming pattern