中枢神経系:FastAPIとWebhookでエージェント型レーダーを24時間365日稼働させる

Dev.to / 2026/5/9

💬 オピニオンDeveloper Stack & InfrastructureTools & Practical UsageModels & Research

要点

  • この記事は、ローカルのターミナル実行から、世界各地で継続的に届くPDNに対応できるプロダクション向けイベント駆動アーキテクチャ(EDA)へ「エージェント型レーダー」を進化させる提案です。
  • アラートの取り込み元として、構造化JSONを送るB2B SaaSの「モダンAPI」と、メール/PDFでEOLや工場停止を通知する「レガシー」を整理し、HTTPS POSTで扱える共通フォーマットに正規化します。
  • レガシーのメールについては、SendGridやMailgunのようなインバウンド解析ゲートウェイを使い、メッセージを受け取って主要項目を抽出し、JSON化して中央のエンドポイントへ転送する方針を示しています。
  • ルーティング層はPythonの非同期マイクロサービスとしてFastAPIで構築し、Webhookエンドポイントで受信した標準化アラートをCrewAIへ渡す設計を説明しています。

このシリーズの前のセクションで、人工知能(AI)がコンポーネントの陳腐化(obsolescence)による損益(P&L)への影響を計算できることを実証しました。これは、決定論的なSQLツールを用いて、実行(execution)からセマンティック推論モジュールを切り離すことで実現しました。

しかし、ターミナルでローカルにPythonスクリプトを実行することは本番環境には適していません。製品の終息通知(Product Discontinuance Notices: PDN)は、世界中のタイムゾーンをまたいで継続的に届きます。サプライチェーンの運用には、中央集権的で、継続稼働でき、かつ拡張可能なシステムが必要です。

本番稼働に向けるためには、アーキテクチャを イベント駆動型アーキテクチャ(Event-Driven Architecture: EDA) へ移行する必要があります。

ブループリントのトポロジ:レガシー・ベクタ vs. 現代的なAPIフレームワーク

アーキテクチャレベルでは、サプライチェーンの物流システムは、2つの異なるアラート取り込み手法に対応する必要があります。

  1. 現代的なAPIベクタ(B2B SaaS): SiliconExpert や Accuris のような、商用のコンポーネントライフサイクル管理ツールは、構造化データを提供します。これらのプラットフォームは、市場ライフサイクルの遷移を詳述する標準化されたJSONペイロードを送信します。
  2. レガシー・ベクタ(メール): 多くのメーカーやTier 2のサプライヤは、製造施設の停止やEOL(End of Life)状態を知らせるために、プレーンテキストのメール、またはPDF添付を今も使い続けています。

ソフトウェア工学上の意思決定: PythonプロセスでIMAPのメールボックスを継続的にポーリングするのは、リソースを大きく消費し、レイテンシにもつながりやすい方法です。そこで、Inbound Parse のゲートウェイ(SendGridやMailgunのようなもの)を使用します。これらのサービスはメールを受け止め、関連するプロパティ(Subject、Body)を抽出して、標準化されたJSONペイロードにパッケージ化し、統合用のエンドポイントへ転送します。このアプローチにより、両方の通信チャネルは標準のHTTPS POSTリクエストへ正規化されます。

ルーティングロジック:FastAPIでバックボーンを構造化する

Pythonで非同期マイクロサービスを構築するために FastAPI を利用します。目的は、受信したアラートを CrewAI フレームワークへ振り分けるルーティング層をデプロイすることです。

以下の簡略化したコードは、デュアルWebhookの実装例を示しています。

@app.post("/api/v1/webhooks/commercial-radar")
async def commercial_radar_webhook(alert: CommercialAlert, background_tasks: BackgroundTasks):
    # ベクタ1プロトコル:商用APIの利用
    synthetic_pdn = f"メーカー: {alert.manufacturer}. MPN: {alert.mpn}. ステータス: EOL。"

    background_tasks.add_task(process_obsolescence_background, synthetic_pdn)
    return {"status": "accepted"}

@app.post("/api/v1/webhooks/inbound-email")
async def inbound_email_webhook(email: InboundEmail, background_tasks: BackgroundTasks):
    # ベクタ2プロトコル:インバウンドで解析済みのメールペイロード
    pdn_text = f"Subject: {email.subject}
Body: {email.text}"

    background_tasks.add_task(process_obsolescence_background, pdn_text)
    return {"status": "accepted"}

非同期実行:LLMレイテンシの取り扱い

上記のスニペットは、LLMを統合する際のWebレジリエンスに必須のパターンを示しています。CrewAIにオーケストレーションされた推論サイクルは、通常5〜15秒かかって完了します。この処理には、入力のパース、部品番号の抽出、Supabaseのリレーショナル・グラフへの問い合わせ、財務への影響の計算、そして応答の整形が含まれます。

この実行を待っている間HTTPソケットを開いたままにすると、送信側API(例:SendGrid)がタイムアウトエラー(通常は10秒で上限)に遭遇し、その結果として冗長な内部リトライが発生します。標準的な解決策は、Background Tasks を使って実行を疎結合にすることです。

サーバはただちに「HTTP 202 Accepted」のステータスコードを返し、クライアントとの接続を閉じます。一方で内部ワーカーは、ネットワークリソースをブロックすることなく、バックグラウンドでLLMの処理をインスタンス化します。

閉ループ制御:システム通知

自律エージェントが停止(downtime)のリスクを正常に評価できたとしても、出力をローカルにログとして残すだけでは、システムは運用上の目的を満たしません。出力データは、関係するステークホルダーへプッシュされる必要があります。

アーキテクチャの最終段階では、生成された軽減(mitigation)ブリーフを、アウトバウンドWebhookを介して調達チームのコミュニケーションチャネル(Microsoft TeamsやSlackなど)へ送信します。

def process_obsolescence_background(pdn_text: str):
    # ... マルチエージェント推論の反復 ...
    assessment = execute_obsolescence_analysis(pdn_text)

返却形式: {"translated": "翻訳されたHTML"}# MS Teams/Slack 調達アラート
    header = f" エージェントによる P&L アラート(処理済み)
"
    notify_teams(header + str(assessment))

完全なシステムアーキテクチャ

これらのモジュールを統合することで、SQL テーブルへのクエリと LLM によるテキスト処理が、非同期に連携して動作するイベント駆動型パイプラインが構成されます。

architecture

次のステップ

データ取り込みエンジン(ブロック 2)を設定し、セマンティック推論のフレームワーク(ブロック 3)を確立し、グローバルなコンポーネントの異常を 24/7 で処理する API 中心のサービス(ブロック 4)をデプロイしました。

このエンジニアリング連載の最終パートでは、データ可視化に焦点を当てます。エグゼクティブ ダッシュボードを構築することで、これらの非同期アラートをどのように公開するかを記録し、管理者によるレビューのためにエージェントの運用を利用できるようにします。