PostgreSQLを用いた本番環境向けRAGシステムの完全実装ガイド

Dev.to / 2026/3/21

💬 オピニオンDeveloper Stack & InfrastructureTools & Practical Usage

要点

  • 記事は本番環境のRAGシステムが失敗する三つの核心的理由を挙げている:純粋なベクトル検索だけによる取得品質の低さ、出典属性の欠如、そして単一の検索戦略への依存。
  • PostgreSQLとpgvector拡張機能を用いた、本番環境対応のアーキテクチャを提案し、1つの信頼性の高いデータベースでベクトル検索とキーワード検索の両方を可能にする。
  • 実践的な手順を提供し、完全な文脈を保持して出典を引用できるようにする文書用データベーススキーマから始める。
  • 処理から取得、回答生成へと文書が流れる様子を示すアーキテクチャ図とコード例を含み、セマンティック理解と厳密な用語の一致の両方の必要性を強調している。

PostgreSQLを使った本番運用向けRAGシステムの構築: 完全実装ガイド

ほとんどのRAG(Retrieval-Augmented Generation)システムは、本番環境で予測可能な技術的理由により失敗します:リトリーブ品質の低下、出典情報の欠如、現実のクエリの変動に対処できないこと。 このガイドは、実際に機能する本番運用向けRAGシステムを構築する方法を示します。

なぜほとんどのRAGシステムは失敗するのか

本番環境でのRAGの失敗は、3つの核となる技術的問題に起因します:

1. 純粋なベクター検索の制約
ベクトルの類似性は、必ずしも人間の関連性と一致しません。 「APIのレート制限」というクエリは、ユーザーが「1000リクエスト/時間」に関する正確な情報を求めている場合に「リクエストのスロットリングガイドライン」を返してしまうことがあります。

2. 出典情報の欠如
確認できない回答は信頼されません。 明確な出典がないと、正しくても信頼性が低く感じられます。

3. 単一の検索戦略
ベクター検索だけ、またはキーワード検索だけに頼ると、重要な結果を見逃します。 実際の質問には、意味理解と正確な用語の一致の両方が必要です。

アーキテクチャ概要: PostgreSQL + pgvector

私たちのアーキテクチャは、ベクトル検索と従来の検索の両方を、単一で信頼性の高いデータベース内で処理するために、pgvector拡張を備えたPostgreSQLを使用します:

…コードブロックは省略…

ステップ1: データベーススキーマ

…SQLコードブロックは省略…

必須インデックスの作成:

…SQLコードブロックは省略…

ステップ2: ハイブリッド検索の実装

私たちのシステムの核は、意味理解とキーワード検索を組み合わせた関数です:

…SQLコードブロックは省略…
c.embedding <=> query_embedding))::DOUBLE PRECISION as semantic_score, d.title as document_title, d.url as document_url FROM document_chunks c JOIN documents d ON c.document_id = d.id ORDER BY c.embedding <=> query_embedding LIMIT limit_count * 3 ), keyword_results AS ( SELECT c.id as chunk_id, ts_rank_cd(to_tsvector('english', c.content), plainto_tsquery('english', query_text))::DOUBLE PRECISION as keyword_score FROM document_chunks c WHERE to_tsvector('english', c.content) @@ plainto_tsquery('english', query_text) ) SELECT s.chunk_id, s.content, s.semantic_score, COALESCE(k.keyword_score, 0.0) as keyword_score, (semantic_weight * s.semantic_score + keyword_weight * COALESCE(k.keyword_score, 0.0))::DOUBLE PRECISION as combined_score, s.document_title, s.document_url FROM semantic_results s LEFT JOIN keyword_results k ON s.chunk_id = k.chunk_id WHERE (semantic_weight * s.semantic_score + keyword_weight * COALESCE(k.keyword_score, 0.0)) > 0.3 ORDER BY combined_score DESC LIMIT limit_count; END; $$ LANGUAGE plpgsql;

ステップ3: ドキュメント処理パイプライン

文脈を保持するスマートチャンク化:

import asyncpg
import openai
from typing import List, Dict
import re
from dataclasses import dataclass

@dataclass
class DocumentChunk:
    content: str
    section_title: str
    chunk_index: int
    token_count: int

class DocumentProcessor:
    def __init__(self, db_url: str, openai_api_key: str):
        self.db_url = db_url
        self.client = openai.AsyncOpenAI(api_key=openai_api_key)

    def _create_smart_chunks(self, content: str, max_tokens: int = 512) -> List[DocumentChunk]:
        """Create chunks that respect document structure."""
        chunks = []

        # Split by major sections (headers)
        sections = re.split(r'
\s*#{1,3}\s+', content)
        current_section = "Introduction"

        for section in sections:
            if not section.strip():
                continue


for chunk in chunks:
    chunk_embedding = await self._generate_embedding(chunk.content)

    await conn.execute("""
        INSERT INTO document_chunks 
        (document_id, chunk_index, content, embedding, token_count, section_title)
        VALUES ($1, $2, $3, $4, $5, $6)
    """, doc_id, chunk.chunk_index, chunk.content, 
    chunk_embedding, chunk.token_count, chunk.section_title)

    return True

except Exception as e: print(f"Error processing document: {e}") return False async def _generate_embedding(self, text: str) -> List[float]: response = await self.client.embeddings.create( model="text-embedding-3-large", input=text ) return response.data[0].embedding

Step 4: 出典を付与するクエリパイプライン

import json
from typing import List, Dict

class ProductionRAGSystem:
    def __init__(self, db_url: str, openai_api_key: str):
        self.db_url = db_url
        self.client = openai.AsyncOpenAI(api_key=openai_api_key)

    async def answer_question(self, question: str) -> Dict:
        """ユーザーの質問に回答するための主要エントリポイント。"""

        # クエリの埋め込みを生成
        embedding_response = await self.client.embeddings.create(
            model="text-embedding-3-large",
            input=question
        )
        query_embedding = embedding_response.data[0].embedding

        # 関連するチャンクを取得
        chunks = await self._retrieve_chunks(question, query_embedding)

        # 品質ゲート - 適切な出典を確保
        high_quality_chunks = [c for c in chunks if c['combined_score'] > 0.5]

        if len(high_quality_chunks) < 2:
            return {
                "answer": "この質問に自信を持って答えるのに十分な関連情報が見つかりませんでした。",
                "confidence": 0.0,
                "sources": [],
                "suggestion": "より具体的な用語で質問を言い換えてください。"
            }

        # 出典付きの回答を生成
        return await self._generate_attributed_answer(question, high_quality_chunks[:5])
async def _retrieve_chunks(self, query: str, embedding: List[float]) -> List[Dict]: async with asyncpg.connect(self.db_url) as conn: rows = await conn.fetch(""" SELECT * FROM hybrid_search($1, $2, 15, 0.6, 0.4) """, query, embedding) return [dict(row) for row in rows] async def _generate_attributed_answer(self, question: str, chunks: List[Dict]) -> Dict: """出典をすべて付記した回答を生成します。""" # 出典番号付きのコンテキストを構築する context_parts = [] sources_info = {} for i, chunk in enumerate(chunks, 1): context_parts.append( f"[Source {i: {chunk['document_title']}] " f"URL: {chunk['document_url']} " f"Content: {chunk['content']} " ) sources_info[i] = { "title": chunk['document_title'], "url": chunk['document_url'], "relevance_score": chunk['combined_score'], "preview": chunk['content'][:200] + "..." } context = " --- ".join(context_parts) # 正確で出典付きの回答のためのシステムプロンプト system_prompt = """あなたは、提供されたソースのみに基づいて正確な回答を提供する文書アシスタントです。 要件: 1. 提供されたソースの情報のみを使用して回答してください 2. [Source X] 形式で常に出典を引用してください 3. 出典が質問に十分に答えない場合、欠落している点を明確に述べてください 4. 出典のカバレッジに基づいて信頼度スコアを提供してください(0.0-1.0) 応答形式(JSON): { "answer": "出典 [Source X] の引用を含む完全な回答", "confidence": 0.85, "reasoning": "信頼度スコアの簡潔な説明" }""" response = await self.client.chat.completions.create( model="gpt-4-turbo-preview", messages=[ {"role": "system", "content": system_prompt}, {"role": "user", "content": f"質問: {question} 出典:{context}"} ], response_format={"type": "json_object"}, temperature=0.1 ) result = json.loads(response.choices[0].message.content) return { \"answer\": result.get(\"answer\", \"\"), \"confidence\": float(result.get(\"confidence\", 0.0)), \"reasoning:\ result.get(\"reasoning\", \"\"), \"sources\": sources_info, \"query_processed\": question } # Usage example async def main(): rag_system = ProductionRAGSystem( db_url = \"postgresql://user:pass@localhost/ragdb\", openai_api_key=\"your-openai-key\") result = await rag_system.answer_question( \"How do I update my API rate limits?\" ) print(f(\"Answer: \", result[\'answer\']) print(f(\"Confidence: \", result[\'confidence\']) print(f(\"Sources: \", len(result[\'sources\')) async def run_evaluation(): test_cases = [ TestCase( question="請求先住所をどのように更新しますか?", expected_topics=["請求", "住所", "更新"], expected_sources=["billing-management"], min_confidence=0.8 ) ] for test_case in test_cases: result = await rag_system.answer_question(test_case.question) # 正確性、出典品質、信頼度を評価

実装の要点

本番環境のRAGシステムを構築するには:

  1. ハイブリッド検索: 意味的検索とキーワード検索を組み合わせて関連性を高める
  2. スマートチャンク化: 文書の構造と文脈を保持する
  3. 出典表示: 検証可能な引用を常に提供する
  4. 品質ゲート: 信頼度の閾値を設定して不良な回答を防ぐ
  5. パフォーマンス最適化: キャッシュ、接続プーリング、データベースチューニング
  6. 体系的なテスト: 実際の指標を用いた包括的評価

この実装は、本番環境で実際に機能するRAGシステムの堅固な基盤を提供し、実際のユーザーの問い合わせを高い精度と透明性で処理します。

完全なコードは、特定のユースケースに合わせてデプロイおよびカスタマイズできる実用例として利用可能です。高度な機能の最適化を行う前に、取得品質、出典表示、ユーザーの信頼といった基本を正しく抑えることに集中してください。