
How to Manage On-Device LLM Streaming State

Stream tokens from a local LLM (Ollama, WebLLM, ExecuTorch) into reactive state with auto-cancellation, chunk accumulation, and derived metrics.

The Problem

On-device and edge LLM inference is increasingly production-ready: WebGPU ships in all major browsers, Ollama has become the de facto local standard, and ExecuTorch targets mobile. But the streaming state is still managed by hand:

  • Token-by-token accumulation into a response string
  • Cancelling in-flight inference when the user sends a new prompt
  • Tracking metrics (token count, latency, streaming status)
  • Handling errors from model loading failures or OOM

Every app ends up reimplementing this by hand; in React, typically with useState + useEffect + AbortController + refs.
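Stripped of any framework, the bookkeeping in that list looks roughly like this (a sketch with illustrative names; `StreamState` and `createLLMStreamState` are not library API):

```ts
// Manual streaming-state machinery: abort previous inference, accumulate
// chunks, track metrics and status. Illustrative sketch, not library code.
type StreamState = {
	response: string;
	tokenCount: number;
	isStreaming: boolean;
	error?: unknown;
};

function createLLMStreamState(
	runInference: (prompt: string, signal: AbortSignal) => AsyncIterable<string>,
	onChange: (s: StreamState) => void,
) {
	let controller: AbortController | null = null;
	let state: StreamState = { response: "", tokenCount: 0, isStreaming: false };

	const set = (patch: Partial<StreamState>) => {
		state = { ...state, ...patch };
		onChange(state);
	};

	return {
		async send(prompt: string) {
			controller?.abort(); // cancel in-flight inference
			controller = new AbortController();
			const { signal } = controller;
			set({ response: "", tokenCount: 0, isStreaming: true, error: undefined });
			try {
				for await (const token of runInference(prompt, signal)) {
					if (signal.aborted) return; // a newer prompt took over
					set({
						response: state.response + token, // chunk accumulation
						tokenCount: state.tokenCount + 1, // derived metric
					});
				}
				if (!signal.aborted) set({ isStreaming: false });
			} catch (e) {
				if (!signal.aborted) set({ isStreaming: false, error: e });
			}
		},
	};
}
```

The operators in the next section automate each of these concerns.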

The Solution

callbag-recharge treats the LLM token stream as a reactive producer. switchMap auto-cancels previous inference. scan accumulates tokens. derived computes metrics. All observable via Inspector.

ts
/**
 * On-Device LLM Streaming State
 *
 * Demonstrates: Managing token streams from a local LLM (Ollama, WebLLM)
 * as reactive sources with auto-cancellation and chunk accumulation.
 * Works with any OpenAI-compatible streaming endpoint.
 */

import { derived, pipe, state } from "callbag-recharge";
import { filter, scan, subscribe, switchMap } from "callbag-recharge/extra";
import { fromAbortable } from "callbag-recharge/utils/cancellableStream";

// ── State ────────────────────────────────────────────────────

const prompt = state("", { name: "prompt" });
const isStreaming = state(false, { name: "streaming" });

// ── Token stream from local model ───────────────────────────

// switchMap auto-cancels previous inference when a new prompt arrives
const tokens = pipe(
	prompt,
	filter((p: string) => p.length > 0),
	switchMap((p: string) => {
		isStreaming.set(true);
		return fromAbortable<string>(
			async function* (signal) {
				// Works with Ollama (localhost:11434) or any OpenAI-compatible endpoint
				const res = await fetch("http://localhost:11434/api/generate", {
					method: "POST",
					headers: { "Content-Type": "application/json" },
					body: JSON.stringify({ model: "llama4", prompt: p, stream: true }),
					signal,
				});
				if (!res.ok || !res.body) {
					throw new Error(`HTTP ${res.status}: ${res.statusText}`);
				}
				const reader = res.body.getReader();
				const decoder = new TextDecoder();
				let buffered = "";
				while (true) {
					const { done, value } = await reader.read();
					if (done) break;
					buffered += decoder.decode(value, { stream: true });
					const lines = buffered.split("\n");
					// Keep the last (possibly partial) line for the next read,
					// so NDJSON objects split across network chunks aren't dropped
					buffered = lines.pop() ?? "";
					for (const line of lines) {
						if (!line.trim()) continue;
						try {
							const chunk = JSON.parse(line);
							if (chunk.response) yield chunk.response;
							if (chunk.done) return;
						} catch {
							/* skip malformed lines */
						}
					}
				}
			},
			{
				name: "llm-tokens",
				onComplete: () => isStreaming.set(false),
				onError: () => isStreaming.set(false),
				onAbort: () => isStreaming.set(false),
			},
		);
	}),
);

// ── Accumulated response ─────────────────────────────────────

const response = pipe(
	tokens,
	filter((t): t is string => t !== undefined),
	scan((acc: string, token: string) => acc + token, ""),
);

// ── Derived metrics ──────────────────────────────────────────

// Whitespace-split word count: a cheap proxy, not the model tokenizer's count
const tokenCount = derived(
	[response],
	() => (response.get() ?? "").split(/\s+/).filter(Boolean).length,
	{ name: "tokenCount" },
);

const _charCount = derived([response], () => (response.get() ?? "").length, { name: "charCount" });
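The bullet list in The Problem also mentioned latency. A small framework-free meter covers tokens per second (a sketch; `createThroughputMeter` is an illustrative name, and the clock is injectable so it can be tested deterministically):

```ts
// Track tokens/sec across a streaming run
function createThroughputMeter(now: () => number = () => Date.now()) {
	let start = 0;
	let count = 0;
	return {
		reset() {
			start = now();
			count = 0;
		},
		record() {
			count++;
		},
		tokensPerSecond() {
			const elapsed = (now() - start) / 1000;
			return elapsed > 0 ? count / elapsed : 0;
		},
	};
}
```

Call `reset()` when a prompt is sent and `record()` per token; the same value could equally live in a `derived` over a token-timestamp state.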

// ── Usage ────────────────────────────────────────────────────

subscribe(response, (text) => {
	process.stdout.write(`\r${text ?? ""}`);
});

subscribe(tokenCount, (_count) => {
	// Token count updates reactively as response grows
});

// Send a prompt — previous inference auto-cancels
prompt.set("Explain reactive programming in one paragraph");

// Send another prompt — previous one is automatically cancelled
// prompt.set('What is the callbag protocol?')

Why This Works

  1. fromAbortable() (or producer()) wraps any streaming API — Ollama's HTTP streaming here, WebLLM's chat.completions.create(), or any OpenAI-compatible endpoint. Cancellation propagates through the AbortSignal, aborting in-flight inference.

  2. switchMap auto-cancels — when the user sends a new prompt, the previous inference is cancelled via AbortController. No manual cleanup, no race conditions.

  3. scan accumulates tokens — each token emission grows the response string. The accumulated value is always current.

  4. derived computes metrics — token count, character count, and any other derived state update reactively as the response grows.

WebLLM Integration

For browser-based inference via WebLLM:

ts
import { producer } from 'callbag-recharge'
import * as webllm from '@mlc-ai/web-llm'

const webllmTokens = producer<string>(({ emit, complete, error }) => {
  let engine: webllm.MLCEngine | undefined
  ;(async () => {
    engine = await webllm.CreateMLCEngine('Llama-4-Scout-17B-16E-Instruct-q4f16_1-MLC')
    const stream = await engine.chat.completions.create({
      messages: [{ role: 'user', content: prompt.get() }],
      stream: true,
    })
    for await (const chunk of stream) {
      const delta = chunk.choices[0]?.delta?.content
      if (delta) emit(delta)
    }
    complete()
  })().catch((e) => error(e))
  // Cleanup: interrupt any in-flight generation when cancelled
  return () => engine?.interruptGenerate()
})
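WebLLM emits OpenAI-style chunk objects rather than raw strings. Normalizing them to plain tokens lets the WebLLM source feed the same `response` pipeline as the Ollama example (a sketch; `ChatChunk` mirrors the OpenAI streaming delta shape):

```ts
// Normalize OpenAI-style streaming chunks into plain token strings
type ChatChunk = { choices: Array<{ delta?: { content?: string } }> };

async function* deltas(stream: AsyncIterable<ChatChunk>): AsyncGenerator<string> {
	for await (const chunk of stream) {
		const delta = chunk.choices[0]?.delta?.content;
		if (delta) yield delta;
	}
}
```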

Context Window as Derived Computation

Track token budget reactively:

ts
const contextLimit = 4096
const systemPromptTokens = 200

type Message = { role: 'user' | 'assistant'; content: string }
// Rough heuristic (~4 chars/token); swap in a real tokenizer for accuracy
const estimateTokens = (s: string) => Math.ceil(s.length / 4)

const history = state<Message[]>([])
const historyTokens = derived([history], () =>
  history.get().reduce((sum, msg) => sum + estimateTokens(msg.content), 0)
)
const availableTokens = derived(
  [historyTokens],
  () => contextLimit - systemPromptTokens - historyTokens.get()
)
const shouldTruncate = derived(
  [availableTokens],
  () => availableTokens.get() < 100
)
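When shouldTruncate fires, the usual move is to drop the oldest messages until the budget fits. A plain-function sketch (`truncateHistory` is an illustrative name; the default cost function is the common chars/4 heuristic, not a real tokenizer):

```ts
type Msg = { content: string };

// Drop oldest messages until the remaining history fits the token budget
function truncateHistory<M extends Msg>(
	history: M[],
	budget: number,
	cost: (content: string) => number = (s) => Math.ceil(s.length / 4),
): M[] {
	const kept: M[] = [];
	let used = 0;
	// Walk newest-first so recent context survives
	for (let i = history.length - 1; i >= 0; i--) {
		const c = cost(history[i].content);
		if (used + c > budget) break;
		kept.unshift(history[i]);
		used += c;
	}
	return kept;
}
```

Walking newest-first keeps the most recent turns, which usually matter most for coherence.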

Error Handling

Wrap with retry and rescue for resilient inference:

ts
import { retry, rescue } from 'callbag-recharge/extra'

const resilientTokens = pipe(
  tokens,
  retry(2),                              // retry model loading failures
  rescue(() => cloudFallbackTokens),     // cloudFallbackTokens: a cloud-API source (not shown)
)
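Under the hood, that policy amounts to retrying the primary source and then switching to a fallback. A framework-free sketch for async-iterable sources (`withFallback` is an illustrative name, not library API):

```ts
// Retry a streaming source up to `retries` extra times, then fall back
async function* withFallback<T>(
	primary: () => AsyncIterable<T>,
	fallback: () => AsyncIterable<T>,
	retries = 2,
): AsyncGenerator<T> {
	for (let attempt = 0; attempt <= retries; attempt++) {
		try {
			yield* primary();
			return; // primary completed cleanly
		} catch {
			// model load failure / OOM: try again or fall through
		}
	}
	yield* fallback(); // e.g. a cloud endpoint
}
```

One caveat: if the primary stream fails mid-generation, a retry re-emits tokens from the start, so downstream accumulation should reset per attempt.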


Released under the MIT License.