APIを一度も呼び出さずにAIエージェントのフロントエンドをテストする方法
AIエージェントアプリのテストは、めちゃくちゃです。
モデルの呼び出しのことではありません――それらはモックできます。誰もが分からないのは、ストリーミング層のテスト方法です。つまり、フロントエンドが実際に受け取るイベントのシーケンス、複数ターンのエージェントループをまたいで起きる状態遷移、そして tool_use とその tool_result の間の微妙なタイミングです。ほとんどのチームはこれを完全に飛ばすか、毎回のCI実行で実際のAPIを叩く、不安定になりがちな統合テストを書いています。
もっと良い方法があり、それは、私たちが本来より長く時間をかけてたどり着いた気づきから来ています。
.jsonl の記録は、見た目はテストフィクスチャそのものです。
そうと気づいた瞬間に、あなたの本番ストリームは、意図したかどうかに関係なく、 自動的に構築していく回帰テストスイートになります。
ストリーミングフロントエンドをテストする上での問題点
AIエージェントUIのテストを書くとき、あなたが実際にテストしているのは何でしょう。あなたが気にしているのは:
doneの後にisStreamingがfalseに切り替わるか?tool_resultが届いたときactiveToolsはクリアされるか?- 60%の
progressイベントは、実際にプログレスバーを更新するか? - サーバがストリーム途中でクラッシュしたら、UIは復旧できるか?
- 2つのツールが並列に動き、より速い方が先に解決した場合、状態機械は順不同のイベントを正しく処理できるか?
これらはモデルの出力に関するものではありません。ストリームを消費する、あなたのイベント処理ロジック――ストレートに言えば、その状態機械のことです。にもかかわらず、それをテストするために、あなたは実際のAPIを呼び出す(遅い・高価・非決定的)、HTTPレベルで fetch をモックする(脆い・実際のイベントシーケンスをテストできない)、あるいはそもそもテストしない(最もよくある)という選択をしています。
核心となる洞察: 本当に必要なのは、決定的なソースとなる“実際のイベントシーケンス”です。 あなたがでっち上げたモックではありません――本番から生まれ、出来事が起きたそのままの順序で、イベント間のミリ秒単位のタイミングまで含めて記録された“実際のもの”が必要です。
それを提供してくれるのが AgentStreamRecorder です。
パート1:Python側
ストリームを記録する
あなたはすでに、AgentStreamRecorder で本番のすべてのストリームを記録しています。production.jsonl の各セッションは次のようになっています:
{"session": "f3a2c1b0-...", "started_at": "2026-04-01T02:14:00+00:00", "t": 0}
{"t": 0.0, "event": "token", "data": {"text": "Here is what I found"}}
{"t": 0.052, "event": "tool_use", "data": {"tool_name": "web_search", "tool_use_id": "tu_1", "status": "running"}}
{"t": 0.891, "event": "tool_result","data": {"tool_name": "web_search", "tool_use_id": "tu_1", "duration_ms": 839, "status": "done"}}
{"t": 1.204, "event": "done", "data": {"num_turns": 1, "tool_count": 1, "duration_ms": 1204}}
記録をpytestフィクスチャに変換する
CLIモジュールの load_sessions 関数は、どんな .jsonl ファイルでも読み込み、セッションのdictのリストにします。そこから、それを非同期ジェネレータとして再生できます:
# tests/utils.py
import asyncio
import json
from pathlib import Path
from agent_stream.cli import load_sessions
async def replay_as_stream(path: Path, speed: float = 10_000.0):
""".jsonl の記録を非同期 SSE ジェネレータとして再生します。
speed=10_000 のとき、1.2s の記録は約 0.12ms で再生されます。
テストでは、実際のタイミングで再生するために speed=1.0 を使います(まれです)。
"""
sessions = load_sessions(path)
session = sessions[-1]
prev_t = 0.0
for record in session["events"]:
gap = (record["t"] - prev_t) / speed
if gap > 0:
await asyncio.sleep(gap)
prev_t = record["t"]
yield f"event: {record['event']}
data: {json.dumps(record['data'])}
"
これで、テストフィクスチャは単なるファイルパスになります:
# tests/conftest.py
import pytest
from pathlib import Path
@pytest.fixture
def tool_stream():
return Path("tests/fixtures/tool_stream.jsonl")
@pytest.fixture
def multi_turn_stream():
return Path("tests/fixtures/multi_turn.jsonl")
返却形式: {"translated": "翻訳されたHTML"}@pytest.fixture
def error_stream():
return Path("tests/fixtures/connection_drop.jsonl")
エージェントのエンドポイントをテストする
これが、実際の FastAPI エンドポイントに対して動かすとどう見えるかです:
# tests/test_chat_endpoint.py
import json
import pytest
from httpx import AsyncClient
from agent_stream.cli import load_sessions
from tests.utils import replay_as_stream
async def test_tool_stream_returns_all_events(app, tool_stream):
"""記録されたすべてのイベントは、レスポンスに現れるはずです。"""
sessions = load_sessions(tool_stream)
expected_events = [e["event"] for e in sessions[-1]["events"]]
# Anthropic を呼び出す代わりに、記録をリプレイするようにエージェントをパッチする
async def mock_agent(message: str):
async for sse in replay_as_stream(tool_stream):
yield sse
app.state.agent = mock_agent
async with AsyncClient(app=app, base_url="http://test") as client:
response_events = []
async with client.stream("POST", "/chat", json={"message": "test"}) as resp:
async for line in resp.aiter_lines():
if line.startswith("event: "):
response_events.append(line.removeprefix("event: "))
assert response_events == expected_events
リグレッションテストのワークフロー
ここが強力なところです。私たちは activeTools の追跡にバグがありました。つまり、2 つのツールが並行して実行されたとき、速い側の tool_result が、遅い側の tool_use より先に到着し得て、その結果フックが配列から正しい名前を削除できなくなっていました。
そのバグはユーザーから報告されました。私たちはローカルでは再現できませんでした。また、それが最初にいつ現れたのかも分かっていませんでした。
AgentStreamRecorder がすでに本番で動作していたため、完全に同じセッションが手元にありました。それを抽出しました:
# セッションを探す
agent-stream replay production.jsonl --list
# SESSION STARTED EVENTS DURATION TYPES
# f3a2c1b0-... 2026-03-28T14:22:11 14 6.2s token tool_use tool_use tool_result tool_result done
# フィクスチャファイルに取り出す
grep -B0 -A15 "f3a2c1b0" production.jsonl > tests/fixtures/parallel_tools_bug.jsonl
そして、コードに触る前にリグレッションテストを書きました:
async def test_parallel_tools_clear_correctly(parallel_tools_bug):
"""リグレッション:速いツールの tool_result が遅いツールの tool_use より先に到着したとき"
tool_use が activeTools の状態を壊してはなりません。"""
sessions = load_sessions(parallel_tools_bug)
events = sessions[-1]["events"]
# バグの条件がフィクスチャ内に存在することを確認する tool_uses = [e for e in events if e["event"] == "tool_use"]
tool_results = [e for e in events if e["event"] == "tool_result"]
assert len(tool_uses) == 2
assert len(tool_results) == 2
# フィクスチャでは、2回目の使用より前に結果が到着しているはず
first_result_t = tool_results[0]["t"]
second_use_t = tool_uses[1]["t"]
assert first_result_t < second_use_t, "fixture doesn't reproduce the bug condition"
# 完全なリプレイの後、activeTools は空であるべき
active = []
for record in events:
if record["event"] == "tool_use"and record["data"].get("status") == "running":
active.append(record["data"]["tool_name"])
elif record["event"] == "tool_result":
name = record["data"]["tool_name"]
if name in active:
active.remove(name)
assert active == [], f"activeTools not cleared: {active}"
テストに失敗しました。バグを修正しました。テストは成功しました。.jsonl ファイルは回帰(リグレッション)を防ぐため、tests/fixtures/ に永遠に置かれます。
Part 2: React側
Python側はシンプルです。非同期ジェネレータは簡単に差し替えられるからです。React側は難しいです。useAgentStream は内部で fetch を呼び出し、ReadableStream を読み取ります。記録から本物のSSEストリームを生成する、偽の fetch を用意する必要があります。
モックSSEストリームを構築する
// tests/utils/mockStream.ts
interface EventRecord {
t: number;
event: string;
data: Record<string, unknown>;
}
interface RecordedSession {
session: string;
started_at: string;
events: EventRecord[];
}
返却形式: {"translated": "翻訳されたHTML"}export function parseRecording(jsonl: string): RecordedSession {
const lines = jsonl.trim().split("
").map((l => JSON.parse(l));
const header = lines[0];
const events = lines.slice(1);
return { ...header, events };
}
export function createMockSSEStream(
events: EventRecord[],
speed = 10_000
): ReadableStream<Uint8Array> {
const encoder = new TextEncoder();
return new ReadableStream({
async start(controller) {
let prevT = 0;
for (const record of events) {
const gapMs = ((record.t - prevT) / speed) * 1000;
if (gapMs > 0) {
await new Promise(resolve => setTimeout(resolve, gapMs));
}
prevT = record.t;
const sse = `event: ${record.event}
data: ${JSON.stringify(record.data)}
`;
controller.enqueue(encoder.encode(sse));
}
controller.close();
},
});
}
export function mockFetchWithRecording(jsonl: string): void {
const session = parseRecording(jsonl);
vi.stubGlobal("fetch", vi.fn().mockResolvedValue({
ok: true,
status: 200,
body: createMockSSEStream(session.events),
} as unknown as Response));
}
記録に対するフックテストの作成
返却形式: {"translated": "翻訳されたHTML"}// tests/useAgentStream.recording.test.ts
import { renderHook, act, waitFor } from "@testing-library/react";
import { readFileSync } from "fs";
import { useAgentStream } from "../src/useAgentStream";
import { mockFetchWithRecording } from "./utils/mockStream";
// フィクスチャファイルを一度だけ読み込む
const TOOL_STREAM = readFileSync("fixtures/tool_stream.jsonl", "utf-8");
const MULTI_TURN = readFileSync("fixtures/multi_turn.jsonl", "utf-8");
const ERROR_STREAM = readFileSync("fixtures/connection_drop.jsonl", "utf-8");
const PARALLEL_TOOLS = readFileSync("fixtures/parallel_tools_bug.jsonl", "utf-8");
describe("useAgentStream — 記録ベースのテスト", () => {
beforeEach(() => vi.restoreAllMocks());
it("done イベントの後に isDone を設定する", async () => {
mockFetchWithRecording(TOOL_STREAM);
const { result } = renderHook(() => useAgentStream());
await act(async () => {
result.current.startStream("/chat", { message: "test" });
});
await waitFor(() => expect(result.current.isDone).toBe(true));
expect(result.current.isStreaming).toBe(false);
});
it("トークンイベントからテキストを蓄積する", async () => {
mockFetchWithRecording(TOOL_STREAM);
const { result } = renderHook(() => useAgentStream());
await act(async () => {
result.current.startStream("/chat", { message: "test" });
});
await waitFor(() => expect(result.current.isDone).toBe(true));
// テキストは空でなく、記録内のトークンと一致している必要がある
expect(result.current.text.length).toBeGreaterThan(0);
});it("tool_resultが到着するとactiveToolsをクリアする", async () => {
mockFetchWithRecording(TOOL_STREAM);
const { result } = renderHook(() => useAgentStream());
const toolSnapshots: string[][] = [];
await act(async () => {
result.current.startStream("/chat", { message: "test" }, {
onToolUse: () => {
toolSnapshots.push([...result.current.activeTools]);
},
});
});
await waitFor(() => expect(result.current.isDone).toBe(true));
// ストリーム後、ツールはどれもアクティブでないはず
expect(result.current.activeTools).toEqual([]);
});
it("回帰: 並列ツールが正しくクリアされる", async () => {
// このフィクスチャには、2つ目のtool_useの前にtool_resultが到着する —
// activeToolsの破損バグが発生した原因となる、問題の正確なシーケンスが含まれています
mockFetchWithRecording(PARALLEL_TOOLS);
const { result } = renderHook(() => useAgentStream());
await act(async () => {
result.current.startStream("/chat", { message: "test" });
});
await waitFor(() => expect(result.current.isDone).toBe(true));
expect(result.current.activeTools).toEqual([]);
});
it("doneイベントなしでの接続ドロップから復旧する", async () => {
// このフィクスチャは、サーバーがクラッシュしたセッションから記録されました —
// doneイベントはなく、接続クローズだけが発生していました
mockFetchWithRecording(ERROR_STREAM);
const { result } = renderHook(() => useAgentStream());
await act(async () => {
result.current.startStream("/chat", { message: "test" });
});
// まだ解決されるはずで、ハングしない
await waitFor(() => expect(result.current.isStreaming).toBe(false), {
timeout: 2000,
});// isDone は合成 done によって true になるはず
expect(result.current.isDone).toBe(true);
});
it("正しい割合への進捗更新", async () => {
mockFetchWithRecording(MULTI_TURN);
const { result } = renderHook(() => useAgentStream());
const progressValues: number[] = [];
await act(async () => {
result.current.startStream("/chat", { message: "test" }, {
onProgress: (e) => progressValues.push(e.percentage),
});
});
await waitFor(() => expect(result.current.isDone).toBe(true));
// 進捗は単調増加であるべき
for (let i = 1; i < progressValues.length; i++) {
expect(progressValues[i]).toBeGreaterThanOrEqual(progressValues[i - 1]);
}
// 最終進捗は 100 であるべき
expect(progressValues[progressValues.length - 1]).toBe(100);
});
});
本番の記録がまだない場合にフィクスチャを作成する
新しく始めたばかりで、まだ記録がない場合は、インラインのフィクスチャを手動で作成します — それらは単なる JSONL 文字列です:
// tests/fixtures/inline.ts
export const SIMPLE_TOOL_SESSION = `
{"session":"test-001","started_at":"2026-04-01T00:00:00+00:00","t":0}
{"t":0.0,"event":"token","data":{"text":"Searching for that"}}
{"t":0.05,"event":"tool_use","data":{"tool_name":"web_search","tool_use_id":"tu_1","input_summary":"query=test","status":"running"}}
{"t":0.89,"event":"tool_result","data":{"tool_name":"web_search","tool_use_id":"tu_1","output_summary":"3 results","duration_ms":839,"status":"done"}}
{"t":1.02,"event":"token","data":{"text":" — here are the results"}}
{"t":1.20,"event":"done","data":{"num_turns":1,"tool_count":1,"duration_ms":1200,"model":"claude-sonnet-4-6","total_cost_usd":0.004}}
`.trim();
export const CONNECTION_DROP_SESSION = `
{"session":"test-002","started_at":"2026-04-01T00:00:00+00:00","t":0}
{"t":0.0,"event":"token","data":{"text":"Let me check that"}}
{"t":0.05,"event":"tool_use","data":{"tool_name":"web_search","tool_use_id":"tu_1","input_summary":"query=test","status":"running"}}
`.trim();
// 注: done イベントがない — サーバークラッシュをシミュレートする
CONNECTION_DROP_SESSION のフィクスチャは、合成 done へのフォールバックを、毎回の CI 実行で、毎回無料で、決定論的にテストします。
これで何ができるようになるか
テストフィクスチャとして記録が揃うと、いくつかのことが変わります:
CI は API キーを一切要求しません。 テストスイート全体が LLM プロバイダに一切触れずに動作します。レート制限も、コストも、モデル挙動の変更による不安定さもありません。記録は git にコミットされ、すべての開発者のマシンと、すべての CI ジョブで同じように再生されます。
バグが回帰テストとして事前にパッケージ化されて届きます。 ユーザーから「何かが壊れた」という報告を受けたとき、セッション ID を要求します(またはタイムスタンプから記録ファイルを引きます)。.jsonl は再現ケースです。再現をあなたがやる必要はありません — すでに再現されています。失敗するアサーションを書き、コードを修正し、記録は tests/fixtures/ に恒久的なガードとして残ります。
タイミングに敏感な挙動をテストできます。 実際の記録は、イベント間のミリ秒間隔をそのまま捉えます。ツール呼び出しが 4 秒かかったときに何が起きるか(タイムアウト挙動、スピナーの状態など)をテストする必要があるなら、テストで sleep(4) する必要はありません。4 秒かかった実際の記録があり、それを 1x スピードでリプレイするだけです。
挙動のカバレッジは自動的に増えていきます。 すべての「変わった」本番セッション — 稀なイベントの並び、エッジケースのツール組み合わせ、部分的なストリームなど — が、潜在的なテストフィクスチャになります。新しいテストを書かなくても、プロダクトが使われるにつれて、あなたがテストしている挙動の集合は成長します。
本番 → フィクスチャ のワークフロー
この手順を実現するために:
# バグ報告の翌朝
agent-stream replay production.jsonl --list
# レポートに一致するタイムスタンプを見つける
# SESSION STARTED EVENTS DURATION TYPES
# f3a2c1b0-... 2026-04-01T02:14:11 14 6.2s token tool_use tool_use tool_result tool_result done
# 名前付きフィクスチャに抽出する
grep -m1 -A100 "f3a2c1b0" production.jsonl | head -15 > tests/fixtures/parallel_tools_bug.jsonl
# ローカルでリプレイして、正しいセッションか確認する
agent-stream replay tests/fixtures/parallel_tools_bug.jsonl --list
# テスト(赤)を書く、コード(緑)を直す、両方をコミットする
フィクスチャファイルは、テストと一緒にバージョン管理に入れます。リポジトリをクローンした誰でも再現ケースを得られます。このテストに対して最初にパスしたCI実行は、修正が正しいことの証明です。
はじめに
pip install agent-event-stream
npm install @agent-stream/react
記録を開始するために、FastAPIのエンドポイントに2行追加してください。
from agent_stream.recorder import AgentStreamRecorder
recorder = AgentStreamRecorder("production.jsonl")
@app.post("/chat")
async def chat(req: ChatRequest):
async def generate():
async for sse_str in recorder.record(run_agent(req.message)):
yield sse_str
return agent_stream_response(generate())
本番運用で1日過ごすと、最初のフィクスチャセットを作るのに十分なセッションが集まります。1か月後には、手動でテストしようと思いもよらなかった振る舞いにまでカバーできるようになります。



