How I test AI agent frontends without calling the API once
Testing AI agent applications is broken.
Not the model calls — those you can mock. What nobody knows how to test is the streaming layer: the event sequence your frontend actually receives, the state transitions that happen across a multi-turn agent loop, the subtle timing between a tool_use and its tool_result. Most teams either skip this entirely or write flaky integration tests that hit the real API on every CI run.
There's a better way, and it comes from a realization that took us longer to reach than it should have.
A .jsonl recording is just a test fixture in disguise.
Once you see it that way, your production streams become a regression test suite you're building automatically, whether you meant to or not.
The problem with testing streaming frontends
Consider what you're actually testing when you write a test for an AI agent UI. You care about:
- Does isStreaming flip to false after done?
- Does activeTools clear when tool_result arrives?
- Does a progress event at 60% actually update the progress bar?
- If the server crashes mid-stream, does the UI recover?
- If two tools run in parallel and the faster one resolves first, does the state machine handle out-of-order events correctly?
None of these are about the model's output. They're about your event handling logic — the state machine that consumes the stream. And yet, to test it, you either call the real API (slow, expensive, non-deterministic) or you mock fetch at the HTTP level (brittle, doesn't test real event sequences) or you just don't test it (most common).
The core insight: what you actually need is a deterministic source of real event sequences. Not mocked ones you invented — real ones that came from production, captured exactly as they happened, down to the millisecond timing between events.
That's what AgentStreamRecorder gives you.
Part 1: The Python side
Recording a stream
You're already recording every production stream with AgentStreamRecorder. Each session in production.jsonl looks like this:
{"session": "f3a2c1b0-...", "started_at": "2026-04-01T02:14:00+00:00", "t": 0}
{"t": 0.0, "event": "token", "data": {"text": "Here is what I found"}}
{"t": 0.052, "event": "tool_use", "data": {"tool_name": "web_search", "tool_use_id": "tu_1", "status": "running"}}
{"t": 0.891, "event": "tool_result","data": {"tool_name": "web_search", "tool_use_id": "tu_1", "duration_ms": 839, "status": "done"}}
{"t": 1.204, "event": "done", "data": {"num_turns": 1, "tool_count": 1, "duration_ms": 1204}}
Turning a recording into a pytest fixture
The load_sessions function from the CLI module reads any .jsonl file into a list of session dicts. From there, you can replay it as an async generator:
# tests/utils.py
import asyncio
import json
from pathlib import Path
from agent_stream.cli import load_sessions
async def replay_as_stream(path: Path, speed: float = 10_000.0):
    """Replay a .jsonl recording as an async SSE generator.

    At speed=10_000 the 1.2s recording replays in ~0.12ms.
    Use speed=1.0 to replay at real timing (rare in tests).
    """
    sessions = load_sessions(path)
    session = sessions[-1]
    prev_t = 0.0
    for record in session["events"]:
        gap = (record["t"] - prev_t) / speed
        if gap > 0:
            await asyncio.sleep(gap)
        prev_t = record["t"]
        yield f"event: {record['event']}\ndata: {json.dumps(record['data'])}\n\n"
Now your test fixture is just a file path:
# tests/conftest.py
import pytest
from pathlib import Path
@pytest.fixture
def tool_stream():
    return Path("tests/fixtures/tool_stream.jsonl")

@pytest.fixture
def multi_turn_stream():
    return Path("tests/fixtures/multi_turn.jsonl")

@pytest.fixture
def error_stream():
    return Path("tests/fixtures/connection_drop.jsonl")
Testing your agent endpoint
Here's what this looks like against a real FastAPI endpoint:
# tests/test_chat_endpoint.py
import json
import pytest
from httpx import AsyncClient
from agent_stream.cli import load_sessions
from tests.utils import replay_as_stream
async def test_tool_stream_returns_all_events(app, tool_stream):
    """Every event in the recording should appear in the response."""
    sessions = load_sessions(tool_stream)
    expected_events = [e["event"] for e in sessions[-1]["events"]]

    # Patch the agent to replay the recording instead of calling Anthropic
    async def mock_agent(message: str):
        async for sse in replay_as_stream(tool_stream):
            yield sse

    app.state.agent = mock_agent

    async with AsyncClient(app=app, base_url="http://test") as client:
        response_events = []
        async with client.stream("POST", "/chat", json={"message": "test"}) as resp:
            async for line in resp.aiter_lines():
                if line.startswith("event: "):
                    response_events.append(line.removeprefix("event: "))

    assert response_events == expected_events
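If you want to assert on payloads as well as event names, a small frame parser helps. parse_sse below is our own helper, not part of the library; it assumes each chunk is one complete frame, which is what replay_as_stream yields:

```python
import json

def parse_sse(chunks):
    """Parse complete SSE frames into (event_name, payload) pairs.

    Each chunk is expected to look like 'event: NAME\ndata: JSON\n\n',
    i.e. exactly one frame per chunk.
    """
    parsed = []
    for chunk in chunks:
        lines = chunk.strip().splitlines()
        name = lines[0].removeprefix("event: ")
        payload = json.loads(lines[1].removeprefix("data: "))
        parsed.append((name, payload))
    return parsed
```

With this in tests/utils.py, the assertion above can compare full (event, data) pairs instead of just event names.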
The regression test workflow
Here's where it gets powerful. We had a bug in our activeTools tracking: when two tools ran in parallel, the faster one's tool_result could arrive before the slower one's tool_use, and the hook would fail to remove the right name from the array.
The bug was reported by a user. We couldn't reproduce it locally. We didn't know when it had first appeared.
With AgentStreamRecorder already running in production, we had the exact session. We extracted it:
# Find the session
agent-stream replay production.jsonl --list
# SESSION STARTED EVENTS DURATION TYPES
# f3a2c1b0-... 2026-04-01T02:14:11 14 6.2s token tool_use tool_use tool_result tool_result done
# Pull it to a fixture file
grep -m1 -A14 "f3a2c1b0" production.jsonl > tests/fixtures/parallel_tools_bug.jsonl
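grep with a fixed -A count works, but it silently truncates longer sessions and can bleed into the next one. A stdlib alternative (extract_session is our own sketch, not an agent-stream command) cuts exactly one session by keying off the header lines:

```python
import json
from pathlib import Path

def extract_session(src: Path, dest: Path, session_prefix: str) -> int:
    """Copy one recorded session (header line plus its events) from a
    multi-session .jsonl file into a fixture file.

    Relies on the format shown earlier: header lines carry a "session"
    key, event lines don't, so each header starts (and ends) a block.
    """
    kept: list[str] = []
    capturing = False
    for line in src.read_text().splitlines():
        record = json.loads(line)
        if "session" in record:
            capturing = record["session"].startswith(session_prefix)
        if capturing:
            kept.append(line)
    dest.write_text("\n".join(kept) + "\n")
    return len(kept)
```

The return value (number of lines written) is a cheap sanity check that you grabbed the session you meant to.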
Then we wrote the regression test before touching the code:
async def test_parallel_tools_clear_correctly(parallel_tools_bug):
    """Regression: tool_result for faster tool arriving before slower tool's
    tool_use should not corrupt activeTools state."""
    sessions = load_sessions(parallel_tools_bug)
    events = sessions[-1]["events"]

    # Verify the bug condition exists in our fixture
    tool_uses = [e for e in events if e["event"] == "tool_use"]
    tool_results = [e for e in events if e["event"] == "tool_result"]
    assert len(tool_uses) == 2
    assert len(tool_results) == 2

    # The fixture should have a result arriving before the second use
    first_result_t = tool_results[0]["t"]
    second_use_t = tool_uses[1]["t"]
    assert first_result_t < second_use_t, "fixture doesn't reproduce the bug condition"

    # After full replay, activeTools should be empty
    active = []
    for record in events:
        if record["event"] == "tool_use" and record["data"].get("status") == "running":
            active.append(record["data"]["tool_name"])
        elif record["event"] == "tool_result":
            name = record["data"]["tool_name"]
            if name in active:
                active.remove(name)
    assert active == [], f"activeTools not cleared: {active}"
Test failed. We fixed the bug. Test passed. The .jsonl file stays in tests/fixtures/ forever as a regression guard.
Part 2: The React side
The Python side is straightforward because async generators are easy to swap. The React side is harder: useAgentStream calls fetch internally and reads a ReadableStream. You need to give it a fake fetch that produces a real SSE stream from your recording.
Building a mock SSE stream
// tests/utils/mockStream.ts
import { vi } from "vitest";

interface EventRecord {
  t: number;
  event: string;
  data: Record<string, unknown>;
}

interface RecordedSession {
  session: string;
  started_at: string;
  events: EventRecord[];
}

export function parseRecording(jsonl: string): RecordedSession {
  const lines = jsonl.trim().split("\n").map(l => JSON.parse(l));
  const header = lines[0];
  const events = lines.slice(1);
  return { ...header, events };
}
export function createMockSSEStream(
  events: EventRecord[],
  speed = 10_000
): ReadableStream<Uint8Array> {
  const encoder = new TextEncoder();
  return new ReadableStream({
    async start(controller) {
      let prevT = 0;
      for (const record of events) {
        const gapMs = ((record.t - prevT) / speed) * 1000;
        if (gapMs > 0) {
          await new Promise(resolve => setTimeout(resolve, gapMs));
        }
        prevT = record.t;
        const sse = `event: ${record.event}\ndata: ${JSON.stringify(record.data)}\n\n`;
        controller.enqueue(encoder.encode(sse));
      }
      controller.close();
    },
  });
}

export function mockFetchWithRecording(jsonl: string): void {
  const session = parseRecording(jsonl);
  vi.stubGlobal("fetch", vi.fn().mockResolvedValue({
    ok: true,
    status: 200,
    body: createMockSSEStream(session.events),
  } as unknown as Response));
}
Writing hook tests against recordings
// tests/useAgentStream.recording.test.ts
import { renderHook, act, waitFor } from "@testing-library/react";
import { readFileSync } from "fs";
import { describe, it, expect, beforeEach, vi } from "vitest";
import { useAgentStream } from "../src/useAgentStream";
import { mockFetchWithRecording } from "./utils/mockStream";

// Load fixture files once
const TOOL_STREAM = readFileSync("tests/fixtures/tool_stream.jsonl", "utf-8");
const MULTI_TURN = readFileSync("tests/fixtures/multi_turn.jsonl", "utf-8");
const ERROR_STREAM = readFileSync("tests/fixtures/connection_drop.jsonl", "utf-8");
const PARALLEL_TOOLS = readFileSync("tests/fixtures/parallel_tools_bug.jsonl", "utf-8");

describe("useAgentStream — recording-based tests", () => {
  // vi.stubGlobal requires unstubAllGlobals (restoreAllMocks won't undo it)
  beforeEach(() => vi.unstubAllGlobals());
  it("sets isDone after done event", async () => {
    mockFetchWithRecording(TOOL_STREAM);
    const { result } = renderHook(() => useAgentStream());

    await act(async () => {
      result.current.startStream("/chat", { message: "test" });
    });

    await waitFor(() => expect(result.current.isDone).toBe(true));
    expect(result.current.isStreaming).toBe(false);
  });

  it("accumulates text from token events", async () => {
    mockFetchWithRecording(TOOL_STREAM);
    const { result } = renderHook(() => useAgentStream());

    await act(async () => {
      result.current.startStream("/chat", { message: "test" });
    });

    await waitFor(() => expect(result.current.isDone).toBe(true));
    // Text should be non-empty and match the tokens in the recording
    expect(result.current.text.length).toBeGreaterThan(0);
  });

  it("clears activeTools when tool_result arrives", async () => {
    mockFetchWithRecording(TOOL_STREAM);
    const { result } = renderHook(() => useAgentStream());
    const toolSnapshots: string[][] = [];

    await act(async () => {
      result.current.startStream("/chat", { message: "test" }, {
        onToolUse: () => {
          toolSnapshots.push([...result.current.activeTools]);
        },
      });
    });

    await waitFor(() => expect(result.current.isDone).toBe(true));
    // After the stream, no tools should be active
    expect(result.current.activeTools).toEqual([]);
  });

  it("regression: parallel tools clear correctly", async () => {
    // This fixture contains tool_result arriving before the second tool_use —
    // the exact sequence that caused the activeTools corruption bug
    mockFetchWithRecording(PARALLEL_TOOLS);
    const { result } = renderHook(() => useAgentStream());

    await act(async () => {
      result.current.startStream("/chat", { message: "test" });
    });

    await waitFor(() => expect(result.current.isDone).toBe(true));
    expect(result.current.activeTools).toEqual([]);
  });

  it("recovers from connection drop without done event", async () => {
    // This fixture was recorded from a session where the server crashed —
    // no done event, just connection close
    mockFetchWithRecording(ERROR_STREAM);
    const { result } = renderHook(() => useAgentStream());

    await act(async () => {
      result.current.startStream("/chat", { message: "test" });
    });

    // Should still resolve, not hang
    await waitFor(() => expect(result.current.isStreaming).toBe(false), {
      timeout: 2000,
    });
    // isDone should be true via synthetic done
    expect(result.current.isDone).toBe(true);
  });

  it("progress updates to correct percentage", async () => {
    mockFetchWithRecording(MULTI_TURN);
    const { result } = renderHook(() => useAgentStream());
    const progressValues: number[] = [];

    await act(async () => {
      result.current.startStream("/chat", { message: "test" }, {
        onProgress: (e) => progressValues.push(e.percentage),
      });
    });

    await waitFor(() => expect(result.current.isDone).toBe(true));
    // Progress should be monotonically increasing
    for (let i = 1; i < progressValues.length; i++) {
      expect(progressValues[i]).toBeGreaterThanOrEqual(progressValues[i - 1]);
    }
    // Final progress should be 100
    expect(progressValues[progressValues.length - 1]).toBe(100);
  });
});
Creating fixtures when you don't have production recordings yet
If you're starting fresh and don't have recordings yet, build inline fixtures manually — they're just JSONL strings:
// tests/fixtures/inline.ts
export const SIMPLE_TOOL_SESSION = `
{"session":"test-001","started_at":"2026-04-01T00:00:00+00:00","t":0}
{"t":0.0,"event":"token","data":{"text":"Searching for that"}}
{"t":0.05,"event":"tool_use","data":{"tool_name":"web_search","tool_use_id":"tu_1","input_summary":"query=test","status":"running"}}
{"t":0.89,"event":"tool_result","data":{"tool_name":"web_search","tool_use_id":"tu_1","output_summary":"3 results","duration_ms":839,"status":"done"}}
{"t":1.02,"event":"token","data":{"text":" — here are the results"}}
{"t":1.20,"event":"done","data":{"num_turns":1,"tool_count":1,"duration_ms":1200,"model":"claude-sonnet-4-6","total_cost_usd":0.004}}
`.trim();
export const CONNECTION_DROP_SESSION = `
{"session":"test-002","started_at":"2026-04-01T00:00:00+00:00","t":0}
{"t":0.0,"event":"token","data":{"text":"Let me check that"}}
{"t":0.05,"event":"tool_use","data":{"tool_name":"web_search","tool_use_id":"tu_1","input_summary":"query=test","status":"running"}}
`.trim();
// Note: no done event — simulates server crash
The CONNECTION_DROP_SESSION fixture tests your synthetic-done fallback every single CI run, for free, deterministically.
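The same trick works from Python. A tiny builder (make_session is our own sketch, not a library API) keeps synthetic fixtures readable and guarantees valid JSONL:

```python
import json

def make_session(events, session_id="test-synth"):
    """Build a JSONL fixture string from (t, event_name, data) tuples,
    matching the header-then-events shape the recorder writes."""
    lines = [json.dumps({"session": session_id,
                         "started_at": "2026-04-01T00:00:00+00:00", "t": 0})]
    for t, name, data in events:
        lines.append(json.dumps({"t": t, "event": name, "data": data}))
    return "\n".join(lines)

# A crash fixture is just a session with no done event:
DROP = make_session([
    (0.0, "token", {"text": "Let me check that"}),
    (0.05, "tool_use", {"tool_name": "web_search", "tool_use_id": "tu_1",
                        "status": "running"}),
])
```

Write the result to a file under tests/fixtures/ and it's indistinguishable from a production recording.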
What this unlocks
Once you have recordings as test fixtures, a few things change:
CI requires zero API keys. Your entire test suite runs without touching any LLM provider. No rate limits, no cost, no flakiness from model behavior changes. The recordings are checked into git and run identically on every developer's machine and in every CI job.
Bugs arrive pre-packaged as regression tests. When a user reports something broke, you ask for the session ID (or pull it from your recording file by timestamp). The .jsonl is the reproduction case. You don't need to reproduce it — it's already reproduced. You write the failing assertion, fix the code, and the recording stays in tests/fixtures/ as a permanent guard.
You can test timing-sensitive behavior. Real recordings capture the actual millisecond intervals between events. If you need to test what happens when a tool call takes 4 seconds (timeout behavior, spinner state, etc.), you don't need to sleep(4) in your test — you have a real recording where it took 4 seconds, and you replay it at 1x speed.
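To make that concrete, here's the replay loop restated in isolation (replay_records is our own sketch, not a library function): at speed=1.0 a recorded 4-second gap really takes 4 seconds, while a large speed collapses the same fixture to microseconds.

```python
import asyncio
import time

async def replay_records(events, speed: float = 1.0):
    """Yield recorded events, preserving inter-event gaps scaled by speed."""
    prev_t = 0.0
    for record in events:
        gap = (record["t"] - prev_t) / speed
        if gap > 0:
            await asyncio.sleep(gap)
        prev_t = record["t"]
        yield record

async def timed_replay(events, speed):
    """Replay and measure wall-clock time, to show the speed knob's effect."""
    start = time.monotonic()
    names = [r["event"] async for r in replay_records(events, speed=speed)]
    return names, time.monotonic() - start
```

The same fixture file serves both a fast unit test (high speed) and a realistic timeout/spinner test (speed=1.0); only the knob changes.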
Behavioral coverage grows automatically. Every unusual production session — rare event sequences, edge case tool combinations, partial streams — becomes a potential test fixture. The set of behaviors you're testing grows as your product is used, without anyone writing new tests.
The production → fixture workflow
To make this routine:
# Morning after a bug report
agent-stream replay production.jsonl --list
# Find the timestamp that matches the report
# SESSION STARTED EVENTS DURATION TYPES
# f3a2c1b0-... 2026-04-01T02:14:11 14 6.2s token tool_use tool_use tool_result tool_result done
# Extract to a named fixture
grep -m1 -A100 "f3a2c1b0" production.jsonl | head -15 > tests/fixtures/parallel_tools_bug.jsonl
# Replay it locally to confirm you have the right session
agent-stream replay tests/fixtures/parallel_tools_bug.jsonl --list
# Write the test (red), fix the code (green), commit both
The fixture file goes in version control alongside the test. Anyone who clones the repo gets the reproduction case. The CI run that first passes on this test is the proof of fix.
Getting started
pip install agent-event-stream
npm install @agent-stream/react
Add two lines to your FastAPI endpoint to start recording:
from agent_stream.recorder import AgentStreamRecorder

recorder = AgentStreamRecorder("production.jsonl")

@app.post("/chat")
async def chat(req: ChatRequest):
    async def generate():
        async for sse_str in recorder.record(run_agent(req.message)):
            yield sse_str
    return agent_stream_response(generate())
After a day in production, you'll have enough sessions to build your first fixture set. After a month, you'll have coverage for behaviors you never thought to test manually.