本番環境におけるRAGシステム:エンタープライズ向けナレッジ検索の構築
はじめに
検索拡張生成(RAG)は、企業がインテリジェントなナレッジシステムを構築する方法を一変させました。大規模言語モデルの力と、ドメイン固有の知識を組み合わせることで、RAGシステムは、純粋なLLMだけでは単独では実現できない質問への回答、情報の統合、洞察の提供を行えます。
Groovy Webでは、Fortune 500企業向けにRAGシステムを構築し、デプロイしてきました。これにより、組織のナレッジが持つ価値を引き出す支援を行っています。本ガイドでは、月間数百万件のクエリを提供する本番環境のRAGシステムを構築する中で得た学びをすべてまとめています。
目次
- RAGシステムの理解
- システムアーキテクチャ
- ベクターデータベースの選定
- 埋め込み(Embedding)戦略
- チャンク化(Chunking)手法
- 検索(Retrieval)の最適化
- 生成と統合(Synthesis)
- 評価と品質保証
- スケーリングの考慮事項
- 本番デプロイ
- 監視と可観測性
- 実世界での実装
RAGシステムの理解
RAGとは?
RAG(Retrieval-Augmented Generation:検索拡張生成)とは、応答を生成する前にナレッジベースから関連するコンテキストを取得することで、大規模言語モデルを強化する手法です。
RAGなしの場合:
ユーザーの質問 → LLM → 回答(学習データに限定)
RAGありの場合:
ユーザーの質問 → 関連ドキュメントを取得 → LLM + コンテキスト → 回答(ナレッジベースに基づく)
エンタープライズでRAGが必要な理由
1. ドメイン固有の知識
LLMは公開インターネットのデータで学習されていますが、企業には独自の情報があります。
- 社内ドキュメント
- 製品仕様
- 顧客とのやり取り
- 研究論文
- コンプライアンス文書
RAGシステムにより、LLMはこの非公開の知識にアクセスできます。
2. 幻覚(ハルシネーション)の低減
取得したドキュメントに基づいて応答を作ることで、RAGシステムは次のことができます。
- 出典を引用する
- 検証可能な情報を提供する
- 誤った主張を減らす
- ユーザーの信頼を構築する
3. コスト効率が高い
ファインチューニングと比べて:
- モデル学習は不要
- ナレッジベースを簡単に更新できる
- インフラコストが低い
- 本番投入までの時間が短い
4. 透明性とコンプライアンス
RAGシステムは次の機能を提供します:
- 出典の紐付け
- 監査ログ(監査証跡)
- 規制への準拠
- 説明可能なAI
RAGとファインチューニングの比較
| 観点 | RAG | ファインチューニング |
|---|---|---|
| 知識の更新 | 即時(データベースに追加) | 再学習が必要 |
| コスト | 低($ /クエリ) | 高(学習コスト) |
| ドメイン適合性 | 高(ソースデータ) | 中(パターン学習) |
| 幻覚リスク | 低(根拠あり) | 中(モデルに基づく) |
| 透明性 | 高(引用) | 低(ブラックボックス) |
| セットアップ期間 | 数日〜数週間 | 数週間〜数か月 |
| 保守 | 継続的なインデックス更新 | 定期的な再学習 |
RAGの最適なユースケース:
- ナレッジ検索とQ&A
- ドキュメント分析
- 顧客サポートの自動化
- リサーチ支援
- コンプライアンスおよび法務レビュー
ファインチューニングの最適なユースケース:
- スタイルとトーンのカスタマイズ
- フォーマットの標準化
- ドメイン固有の推論
- 特化した指示の追従
システムアーキテクチャ
エンドツーエンドのRAGパイプライン
┌─────────────────────────────────────────────────────────────┐
│ ナレッジベース │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ ドキュメント │ │ ベクトル │ │ メタデータ │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
│ 取り込み(Ingestion)パイプライン
▼
┌─────────────────────────────────────────────────────────────┐
│ 処理レイヤー(PROCESSING LAYER) │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ チャンク化 │→│ 埋め込み │→│ インデックス │→│ 保存 │ │
│ └──────────┘ └──────────┘ └──────────┘ └──────────┘ │
└─────────────────────────────────────────────────────────────┘
│
│ クエリ
▼
┌─────────────────────────────────────────────────────────────┐
│ 検索レイヤー(RETRIEVAL LAYER) │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ クエリ │→│ 意味検索 │→│ ハイブリッド検索 │ │
│ │ 埋め込み │ │ (Semantic)│ │ (Hybrid) │ │
│ └────────────┘ └────────────┘ └────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────┐ │
│ │ 再ランキング& │ │
│ │ フィルタリング │ │
│ └──────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
│ コンテキスト
▼
┌─────────────────────────────────────────────────────────────┐
│ 生成レイヤー(GENERATION LAYER) │
│ ┌────────────┐ ┌────────────┐ ┌────────────┐ │
│ │ プロンプト │→│ LLM │→│ 応答 │ │
│ │ 作成 │ │ 推論(Inference)│ │ 統合(Synthesis)│ │
│ └────────────┘ └────────────┘ └────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
ユーザーへの応答
コンポーネントの内訳
1. 取り込み(Ingestion)パイプライン
# ingestion/pipeline.py
from typing import List, Dict
from pathlib import Path
import hashlib
class DocumentIngestionPipeline:
"""Process and ingest documents into knowledge base"""
返却形式: {"translated": "翻訳されたHTML"}def __init__(self, config: Dict):
self.chunker = DocumentChunker(config['chunking'])
self.embedder = EmbeddingGenerator(config['embeddings'])
self.vector_store = VectorStore(config['vector_db'])
async def ingest_document(self, document: Dict) -> List[str]:
"""
ドキュメントを知識ベースに取り込む
戻り値: チャンクIDのリスト
"""
# 1. テキストとメタデータを抽出
text = document['content']
metadata = {
' title': document[' title'],
' source': document[' source'],
' author': document.get(' author'),
' created_at': document.get(' created_at'),
' doc_type': document.get(' type', ' unknown'),
' language': document.get(' language', ' en')
}
# 2. チャンクに分割
chunks = self.chunker.chunk(text)
# 3. 埋め込み(embeddings)を生成
chunk_texts = [chunk['text'] for chunk in chunks]
embeddings = await self.embedder.generate_batch(chunk_texts)
# 4. 保存用のレコードを準備
records = []
for chunk, embedding in zip(chunks, embeddings):
record = {
'id': self._generate_chunk_id(document[' id'], chunk[' index']),
' document_id': document[' id'],
' text': chunk[' text'],
' embedding': embedding,
' metadata': {
**metadata,
' chunk_index': chunk[' index'],
' chunk_size': len(chunk['text']),
'start_char': chunk['start'],
'end_char': chunk['end']
}
}
records.append(record)
# 5. ベクターデータベースに格納
chunk_ids = await self.vector_store.insert(records)
return chunk_ids
def _generate_chunk_id(self, doc_id: str, chunk_index: int) -> str:
"""一意のチャンクIDを生成"""
hash_input = f"{doc_id}_{chunk_index}"
return hashlib.sha256(hash_input.encode()).hexdigest()[:32]
2. リトリーバルエンジン
# retrieval/engine.py
from typing import List, Dict, Optional
import numpy as np
class RetrievalEngine:
"""クエリに関連するドキュメントを取得する"""
def __init__(self, vector_store, embedder, config: Dict):
self.vector_store = vector_store
self.embedder = embedder
self.config = config
self.reranker = Reranker(config.get('reranking'))
async def retrieve(
self,
query: str,
top_k: int = 10,
filters: Optional[Dict] = None
) -> List[Dict]:
"""
クエリに関連するチャンクを取得する
Args:
query: ユーザーのクエリ
top_k: 返す結果の数
filters: メタデータのフィルター(例:{category: 'technology'})
Returns:
スコア付きで取得したチャンクのリスト
"""
# 1. クエリの埋め込みを生成
query_embedding = await self.embedder.generate(query)
# 2. セマンティック検索
results = await self.vector_store.similarity_search(
query_embedding,
top_k=top_k * 2, # リランキングのためにより多く取得する
filters=filters
)
# 3. 設定されている場合はリランキングする
if self.reranker and len(results) > top_k:
results = await self.reranker.rerank(query, results, top_k)
return results[:top_k]
async def retrieve_with_hybrid_search(
self,
query: str,
top_k: int = 10,
alpha: float = 0.5,
filters: Optional[Dict] = None
) -> List[Dict]:
"""
セマンティック検索とキーワード検索を組み合わせたハイブリッド取得
Args:
query: ユーザーのクエリ
top_k: 結果の件数
alpha: セマンティック検索の重み(0〜1)
filters: メタデータのフィルタ
Returns:
並べ替え(再ランキング)された結合結果
"""
# 1. セマンティック検索
semantic_results = await self.vector_store.similarity_search(
await self.embedder.generate(query),
top_k=top_k * 2,
filters=filters
)
# 2. キーワード検索
keyword_results = await self.vector_store.keyword_search(
query,
top_k=top_k * 2,
filters=filters
)
# 3. 結合して再ランキング
combined = self._combine_results(
semantic_results,
keyword_results,
alpha
)
# 4. 結合結果を再ランキング
if self.reranker:
combined = await self.reranker.rerank(query, combined, top_k)
return combined[:top_k]
def _combine_results(
self,
semantic_results: List[Dict],
keyword_results: List[Dict],
alpha: float
) -> List[Dict]:
"""セマンティック検索とキーワード検索の結果を結合する"""
# スコアの正規化
sem_scores = np.array([r['score'] for r in semantic_results])
key_scores = np.array([r['score'] for r in keyword_results])
sem_normalized = (sem_scores - sem_scores.min()) / (sem_scores.max() - sem_scores.min())
key_normalized = (key_scores - key_scores.min()) / (key_scores.max() - key_scores.min())# スコアを組み合わせる
for i, result in enumerate(semantic_results):
result['combined_score'] = alpha * sem_normalized[i]
for i, result in enumerate(keyword_results):
result['combined_score'] += (1 - alpha) * key_normalized[i]
# 組み合わせたスコアでマージしてソートする
seen = set()
combined = []
for result in semantic_results + keyword_results:
if result['id'] not in seen:
seen.add(result['id'])
combined.append(result)
combined.sort(key=lambda x: x['combined_score'], reverse=True)
return combined
3. レスポンス生成器
# generation/generator.py
from typing import List, Dict
import openai
class ResponseGenerator:
"""取得したコンテキストを使ってレスポンスを生成する"""
def __init__(self, config: Dict):
self.client = openai.AsyncClient(api_key=config['api_key'])
self.model = config['model']
self.temperature = config.get('temperature', 0.3)
self.max_tokens = config.get('max_tokens', 1000)
async def generate_response(
self,
query: str,
context: List[Dict],
conversation_history: Optional[List[Dict]] = None
) -> Dict:
"""
取得したコンテキストを使ってレスポンスを生成する
Args:
query: ユーザーのクエリ
context: 取得したチャンク
conversation_history: 以前のメッセージ(チャット用)
Returns:
引用(シテーション)付きで生成されたレスポンス
"""
# 1. コンテキストを使ってプロンプトを作成する
prompt = self._build_prompt(query, context)
# 2. レスポンスを生成する
messages = self._build_messages(prompt, conversation_history)
返却形式: {"translated": "翻訳されたHTML"}response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=self.temperature,
max_tokens=self.max_tokens
)
# 3. 返信と引用を抽出する
answer = response.choices[0].message.content
citations = self._extract_citations(response, context)
return {
'answer': answer,
'citations': citations,
'sources': self._get_unique_sources(context),
'model': self.model,
'tokens_used': response.usage.total_tokens
}
def _build_prompt(self, query: str, context: List[Dict]) -> str:
"""文脈(コンテキスト)付きでプロンプトを構築する"""
context_str = "
".join([
f"[Source {i+1}]
{chunk['text']}"
for i, chunk in enumerate(context)
])
prompt = f"""あなたは、指定された文脈(コンテキスト)に基づいて質問に答えるための、有能なアシスタントです。
Context:
{context_str}
Question: {query}
Instructions:
1. 提供された文脈(コンテキスト)だけを使って質問に答えてください
2. 文脈(コンテキスト)に十分な情報が含まれていない場合は、その旨を述べてください
3. [Source X] の表記を用いて出典を引用してください
4. 簡潔かつ正確に回答してください
5. 出典が求められた場合は、それらを提示してください
Answer:"""
return prompt
def _build_messages(
self,
prompt: str,
history: Optional[List[Dict]] = None
) -> List[Dict]:
"""API 用のメッセージリストを構築する"""
messages = []
if history:
messages.extend(history)
messages.append({
"role": "user",
"content": prompt
})
return messagesdef _extract_citations(
self,
response: openai.ChatCompletion,
context: List[Dict]
) -> List[Dict]:
"""応答から引用(シテーション)を抽出する"""
answer = response.choices[0].message.content
# [Source 1]、[Source 2] などのソース参照を見つける
import re
citations = re.findall(r'\[Source (\d+)\]', answer)
# 実際のソースチャンクにマッピングする
unique_citations = []
for citation in set(citations):
idx = int(citation) - 1 # 0始まりのインデックスに変換
if idx < len(context):
unique_citations.append({
'index': int(citation),
'chunk_id': context[idx]['id'],
'document_id': context[idx]['metadata']['document_id'],
'title': context[idx]['metadata']['title'],
'source': context[idx]['metadata']['source']
})
return unique_citations
def _get_unique_sources(self, context: List[Dict]) -> List[Dict]:
"""コンテキストから一意なソースを取得する"""
seen = set()
sources = []
for chunk in context:
doc_id = chunk['metadata']['document_id']
if doc_id not in seen:
seen.add(doc_id)
sources.append({
'document_id': doc_id,
'title': chunk['metadata']['title'],
'source': chunk['metadata']['source'],
'author': chunk['metadata'].get('author'),
'created_at': chunk['metadata'].
})
返却形式: {"translated": "翻訳されたHTML"}get('created_at')
})
return sources
ベクターデータベースの選定
比較マトリクス
| データベース | オープンソース | クラウド管理 | パフォーマンス | スケーラビリティ | 機能 | コスト |
|---|---|---|---|---|---|---|
| pgvector | ✅ | ✅(Supabase など) | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | リレーショナルDB + ベクトル | $ |
| Pinecone | ❌ | ✅ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 専用設計、導入が簡単 | $$$ |
| Weaviate | ✅ | ✅ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | GraphQL、多モーダル | $$ |
| Qdrant | ✅ | ✅ | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | フィルタ最適化、ハイブリッド | $$ |
| Milvus | ✅ | ✅(Zilliz) | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 分散型、クラウドネイティブ | $$ |
| Chroma | ✅ | ❌ | ⭐⭐⭐ | ⭐⭐⭐ | シンプル、組み込み | 無料 |
選定基準
pgvectorを選ぶべき場合:
- すでにPostgreSQLを使用している
- ACIDトランザクションが必要
- インフラを最小限にしたい
- 予算を重視している
- ベクトル検索とSQLのJOINが必要
Pineconeを選ぶべき場合:
- 完全にマネージされたソリューションが欲しい
- 自動スケーリングが必要
- セットアップの容易さを優先したい
- マネージドサービスに回せる予算がある
- 最速で本番稼働したい
Qdrantを選ぶべき場合:
- 高度なフィルタリングが必要
- ハイブリッド検索機能が欲しい
- 高いパフォーマンスが必要
- マネージドオプション付きのオープンソースを好む
Weaviateを選ぶべき場合:
- マルチモーダル検索(テキスト + 画像)が必要
- GraphQL APIが欲しい
- モジュラーなアーキテクチャが必要
- ナレッジグラフを構築している
私たちの選択:pgvector
私たちは、ほとんどのエンタープライズRAGシステムに対して pgvector を推奨します。その理由は:
1. 統一されたデータモデル
-- ベクトル + メタデータのための単一クエリ
SELECT
d.title,
d.content,
d.metadata->>'category' as category,
1 - (d.embedding <=> query_embedding) as similarity
FROM documents d
JOIN document_tags dt ON d.id = dt.document_id
WHERE d.status = 'published'
AND dt.tag_id = ANY(SELECT id FROM tags WHERE name IN ('AI', 'ML'))
AND d.created_at > NOW() - INTERVAL '1 year'
ORDER BY d.embedding <=> query_embedding
LIMIT 20;
2. コスト効率が高い
- 別途ベクターデータベースは不要
- 既存のPostgreSQLインフラを活用できる
- セルフホスト型のオプションが利用可能
- マネージド代替案より90%安い
3. 成熟したエコシステム
- バックアップ/リストアツール
- レプリケーションとHA
- モニタリングと可観測性
- ORMサポート(SQLAlchemy、Django ORM)
4. パフォーマンス
-- 適切なインデックスを使用する場合
CREATE INDEX idx_documents_embedding_hnsw ON documents
USING hnsw(embedding vector_cosine_ops)
WITH (m = 16, ef_construction = 64);
-- クエリパフォーマンス:100万ベクトルで15〜30ms
埋め込み(Embedding)戦略
モデル選定
| モデル | 次元数 | コンテキスト長 | 速度 | 品質 | コスト/100万トークン |
|---|---|---|---|---|---|
| text-embedding-3-small | 1536 | 8191 | 高速 | 良い | 0.02ドル |
| text-embedding-3-large | 3072 | 8191 | 中速 | 優秀 | 0.13ドル |
| text-embedding-ada-002 | 1536 | 8191 | 高速 | 良い | 0.10ドル |
| bge-large-en-v1.5 | 1024 | 512 | 高速 | 優秀 | 無料(セルフホスト) |
| e5-large-v2 | 1024 | 512 | 高速 | 非常に良い | 無料(セルフホスト) |
推奨
ほとんどのエンタープライズユースケースでは: text-embedding-3-small
embeddings = OpenAIEmbeddings(
model="text-embedding-3-small",
dimensions=1536 # 512に切り詰めて高速な検索が可能
)
なぜ?
- 価格/性能の最良のバランス
- ほとんどの領域で良好な品質
- 長いコンテキスト(8191トークン)
- 多言語対応
- 保存コストの低さ
専門領域の場合: オープンソースモデル(セルフホスト)
# 法務/医療/技術コンテンツ向け
from sentence_transformers import SentenceTransformer
model = SentenceTransformer('BAAI/bge-large-en-v1.5')
embeddings = model.encode(texts)
埋め込みの最適化
1. 次元数の削減
# 1536から512次元に削減(高速な検索、低い保存)
import numpy as np
from sklearn.decomposition import PCA
def reduce_dimensions(embeddings: np.ndarray, target_dim:int = 512) -> np.ndarray:
"""PCAを使って埋め込みの次元数を削減します"""
pca = PCA(n_components=target_dim)
return pca.fit_transform(embeddings)
# 使用例
full_embeddings = np.array([...]) # (N, 1536)
reduced_embeddings = reduce_dimensions(full_embeddings, 512)
トレードオフ:
- 1536次元:最良の品質、検索が遅い
- 768次元:良いバランス
- 512次元:高速な検索、わずかな品質低下
- 256次元:最速の検索、品質低下が目立つ
2. ハイブリッド埋め込み
# 意味(セマンティック)埋め込みとキーワード埋め込みを組み合わせる
class HybridEmbedding:
def __init__(self):
self.semantic_model = OpenAIEmbeddings(model="text-embedding-3-small")
self.bm25 = BM25Encoder()
def embed_documents(self, texts: List[str]) -> Dict[str, np.ndarray]:
"""意味(セマンティック)埋め込みとキーワード埋め込みの両方を生成します"""
semantic = self.semantic_model.embed_documents(texts)
keyword = self.bm25.encode_documents(texts)
return {
'semantic': np.array(semantic),
'keyword': np.array(keyword)
}
3. クエリ拡張
# 関連する用語でクエリを拡張して、検索精度を高める
async def expand_query(query:str, llm) -> List[str]:
"""クエリのバリエーションを生成します"""
prompt = f"""次のために3〜5個の代替クエリを生成してください:""" "{query}"response = await llm.generate(prompt)
variations = [line.strip() for line in response.split('
') if line.strip()]
return [query] + variations
# 使用方法
query_variations = await expand_query("RAGを実装するにはどうすればいいですか?", llm)
# 返却: [
# "RAGを実装するにはどうすればいいですか?",
# "検索拡張生成(RAG)システムの構築",
# "RAG実装ガイド",
# "RAGアプリケーションの作成",
# "RAGシステムアーキテクチャ"
# ]
チャンク化手法
なぜチャンク化が重要なのか
チャンク化はRAGシステムにおける最も重要な判断です:
- 小さすぎる → コンテキストの喪失
- 大きすぎる → ノイズの多い検索、生成が遅くなる
- 境界が不適切 → 情報が分断される
チャンク化の戦略
1. 固定サイズのチャンク化
# chunking/fixed_size.py
from typing import List, Dict
class FixedSizeChunker:
"""テキストを固定サイズのチャンクに分割する"""
def __init__(self, chunk_size: int = 1000, overlap: int = 200):
self.chunk_size = chunk_size
self.overlap = overlap
def chunk(self, text: str) -> List[Dict]:
"""テキストをチャンクに分割する"""
chunks = []
start = 0
chunk_index = 0
while start < len(text):
end = start + self.chunk_size
chunk_text = text[start:end]
chunks.append({
'text': chunk_text,
'index': chunk_index,
'start': start,
'end': end,
'size': len(chunk_text)
})
chunk_index += 1
start = end - self.overlap
return chunks
# 長所: シンプルで予測しやすい
# 短所: 文を分割する可能性があり、コンテキストを失う
2. 文ベースのチャンク化
# chunking/sentence.py
import re
from typing import List, Dict
class SentenceChunker:
"""テキストを文ベースのチャンクに分割する"""
def __init__(self, sentences_per_chunk: int = 5, overlap: int = 1):
self.sentences_per_chunk = sentences_per_chunk
self.overlap = overlap
返却形式: {"translated": "翻訳されたHTML"}def chunk(self, text: str) -> List[Dict]:
"""文章ベースのチャンクに分割する"""
# 文に分割する
sentences = re.split(r'(?<=[.!?])
\s+'', text)
chunks = []
chunk_index = 0
i = 0
while i < len(sentences):
# このチャンク用の文を取得する
end = min(i + self.sentences_per_chunk, len(sentences))
chunk_sentences = sentences[i:end]
chunk_text = ' '.join(chunk_sentences)
start_char = text.find(chunk_sentences[0])
end_char = start_char + len(chunk_text)
chunks.append({
'text': chunk_text,
'index': chunk_index,
'start': start_char,
'end': end_char,
'size': len(chunk_text),
'sentence_count': len(chunk_sentences)
})
chunk_index += 1
i += self.sentences_per_chunk - self.overlap
return chunks
# 長所: 文の区切りを保持するため文脈がより良い
# 短所: チャンクサイズが可変で、短すぎ/長すぎになる可能性がある
3. セマンティック・チャンク分割(推奨)
# chunking/semantic.py
from typing import List, Dict
import numpy as np
class SemanticChunker:
"""意味的に一貫したチャンクにテキストを分割する"""
def __init__(self, embedder, max_chunk_size: int = 1500, threshold: float = 0.7):
self.embedder = embedder
self.max_chunk_size = max_chunk_size
self.threshold = threshold
async def chunk(self, text: str) -> List[Dict]:
"""意味的なチャンクにテキストを分割する"""
# 1. 文に分割する
sentences = self._split_sentences(text)
# 2. 各文の埋め込み(embedding)を生成する
sentence_embeddings = await self.embedder.embed_documents(sentences)
返却形式: {"translated": "翻訳されたHTML"}# 3. 連続する文同士の類似度を計算する
similarities = [
self._cosine_similarity(sentence_embeddings[i], sentence_embeddings[i+1])
for i in range(len(sentence_embeddings) - 1)
]
# 4. チャンクの境界を特定する(類似度がしきい値を下回る箇所)
boundaries = [0]
for i, sim in enumerate(similarities):
if sim < self.threshold:
boundaries.append(i + 1)
boundaries.append(len(sentences))
# 5. チャンクを作成する
chunks = []
chunk_index = 0
for i in range(len(boundaries) - 1):
start_idx = boundaries[i]
end_idx = boundaries[i+1]
# この区間の文を結合する
chunk_sentences = sentences[start_idx:end_idx]
chunk_text = ' '.join(chunk_sentences)
# チャンクが長すぎる場合はさらに分割する
if len(chunk_text) > self.max_chunk_size:
sub_chunks = self._split_long_chunk(chunk_text, self.max_chunk_size)
for sub_chunk in sub_chunks:
chunks.append({
' text': sub_chunk,
' index': chunk_index,
' type': ' semantic'
})
chunk_index += 1
else:
chunks.append({
' text': chunk_text,
' index': chunk_index,
' sentence_count': len(chunk_sentences),
' type': ' semantic'
})
chunk_index += 1
return chunks
def _split_sentences(self, text:str) -> List[str]:
"""テキストを文に分割する"""
import re
return [s.strip() for s in re.split(r'(?<=[.!?])\s+', text) if s.strip()]def _cosine_similarity(self, vec1: np.ndarray, vec2: np.ndarray) -> float:
"""コサイン類似度を計算します"""
return np.dot(vec1, vec2) / (np.linalg.norm(vec1) * np.linalg.norm(vec2))
def _split_long_chunk(self, text: str, max_size: int) -> List[str]:
"""長いチャンクを小さな部分に分割します"""
# 固定サイズで分割するフォールバック
chunks = []
start = 0
while start < len(text):
end = start + max_size
chunks.append(text[start:end])
start = end - 200 # オーバーラップを追加
return chunks
# 利点: 意味的に一貫しており、取得(リトリーバル)がより良い
# 欠点: 遅い(埋め込みが必要)、より複雑
4. 階層型チャンク化
# chunking/hierarchical.py
class HierarchicalChunker:
"""さまざまな用途に対応するための、多段階のチャンク階層を作成します"""
def __init__(self, embedder):
self.embedder = embedder
async def chunk(self, text: str, document_id: str) -> Dict[str, List[Dict]]:
"""階層的なチャンクを作成します"""
# レベル1: ドキュメント単位(幅広いクエリ向け)
doc_chunk = {
'id': f"{document_id}_doc",
'level': 'document',
'text': text[:2000], # 要約/最初の部分
'metadata': {'type': 'document_summary'}
}
# レベル2: セクション単位(中程度のクエリ向け)
section_chunks = self._chunk_by_sections(text)
# レベル3: 段落単位(特定のクエリ向け)
paragraph_chunks = self._chunk_by_paragraphs(text)
# レベル4: 文単位(精密なクエリ向け)
sentence_chunks = self._chunk_by_sentences(text)
return {
'document': [doc_chunk],
'sections': section_chunks,


