生のVPCフローログファイルを開いたことがあるなら、数千行のスペースで区切られたフィールド、IP、ポート、パケット数、タイムスタンプが並んでいるのを見て、質問に対する答えがどこかにあるという感覚を知っているでしょう。あなたはそれを見つける必要があります。
そのSSH接続は拒否されましたか?
どのIPがポート443を叩き続けていますか?
このトラフィックは正常ですか、それとも問題ですか?
手動でVPCフローログを掘り下げるのは遅く、反応的で、正直言って苦痛です。通常はファイルをgrepし、スプレッドシートにエクスポートし、単発のスクリプトを作成して簡単な質問に答えなければなりません。
ログに質問できればどうでしょうか?
この記事では、静的なネットワークテレメトリをインタラクティブなセキュリティアシスタントに変えるRetrieval-Augmented Generation (RAG)駆動のVPCフローログアナライザーを構築します。
マニュアルログ分析の課題
AWS VPCフローログはネットワークトラフィックに関する重要な情報をキャプチャします。しかし、SQLインジェクション攻撃や不正アクセスなどの脅威を検出するためにこれらの生のログを分析することは、重大な課題を伴います:
情報過多:ログの量が膨大で、特定のパターンや異常を見つけるのは、干し草の山から針を探すようなものです。
コンテキストの断片化:生のログにはコンテキストが欠けています。異なるコンポーネントや時間枠で関連するパケットを特定するのは労力がかかり、エラーが発生しやすいです。
RAGベースのVPCフローログアナライザーは次のものを使用します:
- Streamlit (インタラクティブUI)
- LangChain (RAGオーケストレーション)
- Chroma (ベクターデータベース)
- OpenAI GPT-4o (推論エンジン)
最終的に、次のような質問に答えることができる会話型のセキュリティアシスタントが得られます:
- 「どのIPが拒否されましたか?」
- 「ポート22への異常なトラフィックはありましたか?」
- 「どの宛先が最も多くのパケットを受信しましたか?」
機能コンポーネント
データ取り込みと変換(「翻訳者」)
生のVPCフローログはただの数値とIPの文字列です(例:2 123... 443 6 ACCEPT)。
コンポーネント:カスタムPythonパーサー。
これにより、ログは「人間が読み取れる」文に変換されます。「ソース10.0.1.5がポート443に1000バイトを送信し、受け入れられました。」この形式にすることで、AIがデータポイントの関係を「理解」しやすくなります。埋め込みモデル(「エンコーダー」)
テキストを数学的に検索することはできないので、数値(ベクター)に変換します。
コンポーネント:OpenAI text-embedding-3-small。
これにより、各ログ行の数値的な「指紋」が作成されます。類似のイベント(複数のSSHブルートフォース攻撃など)は類似の数値的指紋を持ち、「あいまい」または意味のある検索が可能になります。ベクターデータベース(「メモリ」)
標準のデータベースは正確な単語を検索しますが、ベクターデータベースは意味を検索します。
コンポーネント:ChromaDB。
数千の「指紋」をローカルに保存します。質問すると、特定のクエリに最も関連する上位10または15のログエントリを瞬時に見つけます。RAGオーケストレーションとLLM(「脳」)
ここで実際の「チャット」が行われます。
コンポーネント:LangChain + GPT-4o。
LangChainは質問を受け取り、ChromaDBから関連するログを取得し、それらをGPT-4oに指示セットと共に渡します。「あなたはセキュリティエンジニアです。ここで何が起こったか教えてください。」
- Streamlitフロントエンド(「コックピット」)
コンポーネント:Streamlit Webフレームワーク。
これは.txtファイルのアップロード用のUIを提供し、.env経由でAPIキーを管理し、ネットワークを調査するために端末に触れる必要のないチャットインターフェースを提供します。
実装に関わるステップ:
コードベースはGitHubで確認してください。
ステップ1:仮想環境の作成と依存関係のインストール
git clone https://github.com/Damdev-95/rag_aws_flow_logs
python -m venv venv
source venv/bin/activate
cd rag_aws_flow_logs
pip install -r requirements.txt
ステップ2:設定の取り扱い
環境変数には、openaiキーなどの機密データの取り扱いが含まれます。
ENV_API_KEY = os.getenv("OPENAI_API_KEY")
ステップ3:Streamlitアプリの実行
streamlit run app.py
- 'Browse files'をクリックすると、アプリケーションでログファイルをアップロードできます。ログファイルの形式がtxtであることを確認してください。
- 「Build Knowledge Base」を選択して、ベクトルに変換後の生のログデータをベクターデータベースに保存します。





