设计 RAG 问答系统
一句话速记
RAG(Retrieval-Augmented Generation)= 检索(Retrieval)+ 生成(Generation)。用户提问 → Embedding 向量化 → 向量数据库检索最相关的文档片段 → 拼接到 Prompt → LLM 生成答案。核心挑战:检索质量(召回率 + 精准率)、LLM 幻觉控制(只基于检索内容回答)、延迟(向量检索 + LLM 推理的叠加)。
系统架构
离线(Indexing Pipeline):
文档 → 分块(Chunking)→ Embedding 模型 → 向量 → 向量数据库(Milvus/Qdrant)
→ 原文 → 文档存储(MongoDB/ES)
在线(Query Pipeline):
用户问题 → Query Embedding → 向量检索(Top-K)→ Rerank(重排序)
→ 构建 Prompt(问题 + 检索到的上下文)→ LLM → 流式输出答案
核心组件设计
1)文档处理(离线 Indexing)
分块策略(Chunking):
from langchain.text_splitter import RecursiveCharacterTextSplitter
# 固定大小分块(简单但可能切断语义)
splitter = RecursiveCharacterTextSplitter(
chunk_size=512, # 每块 512 个 Token
chunk_overlap=50, # 前后 50 Token 重叠(保留语义连续性)
separators=["\n\n", "\n", "。", " "] # 优先按段落,再按句子
)
chunks = splitter.split_text(document)
# 更好的策略:语义分块(按语义边界,而不是固定大小)
# 用 LLM 识别段落主题,相同主题的句子归为一块
# 每个 chunk 附加元数据(用于过滤和溯源)
chunk_with_metadata = {
"text": chunk,
"metadata": {
"doc_id": "doc-123",
"source": "产品手册.pdf",
"page": 5,
"section": "第二章 安装指南",
"created_at": "2024-01-01"
}
}Embedding 向量化:
from openai import OpenAI
client = OpenAI()
def embed_text(text: str) -> list[float]:
"""将文本转换为向量"""
response = client.embeddings.create(
model="text-embedding-3-small", # 1536 维
input=text
)
return response.data[0].embedding # List[float],长度 1536
# 批量 Embedding(节省 API 调用次数)
def embed_batch(texts: list[str]) -> list[list[float]]:
response = client.embeddings.create(
model="text-embedding-3-small",
input=texts # 最多 2048 条
)
return [item.embedding for item in response.data]向量写入 Milvus:
from pymilvus import Collection, CollectionSchema, FieldSchema, DataType
# 创建集合(表)
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True),
FieldSchema(name="chunk_id", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=2048),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
FieldSchema(name="doc_id", dtype=DataType.VARCHAR, max_length=64),
FieldSchema(name="source", dtype=DataType.VARCHAR, max_length=256),
]
schema = CollectionSchema(fields)
collection = Collection("knowledge_base", schema)
# 创建 HNSW 索引(检索速度 vs 精度的平衡)
collection.create_index("embedding", {
"index_type": "HNSW",
"metric_type": "COSINE", # 余弦相似度
"params": {"M": 16, "efConstruction": 200}
})2)在线查询
向量检索 + Rerank:
def retrieve(query: str, top_k: int = 20) -> list[dict]:
# 1. Query Embedding
query_embedding = embed_text(query)
# 2. 向量检索(初召回 Top-20,比最终 Top-5 多几倍)
results = collection.search(
data=[query_embedding],
anns_field="embedding",
param={"metric_type": "COSINE", "params": {"ef": 100}},
limit=top_k,
output_fields=["chunk_id", "text", "source"]
)
candidates = [{"text": r.entity.get("text"),
"source": r.entity.get("source"),
"score": r.score} for r in results[0]]
# 3. Rerank(精排,提升精准率)
# 用 Cross-Encoder 模型(比 Bi-Encoder 精度高,但速度慢)
reranked = reranker.rerank(query, candidates, top_n=5)
return reranked
# Reranker 选项:
# - Cohere Rerank API(调用简单)
# - BGE-Reranker(开源,本地部署)
# - LLM as Reranker(用 LLM 给候选打分,效果好但贵)构建 Prompt + LLM 生成:
def answer(query: str) -> str:
# 1. 检索相关上下文
contexts = retrieve(query, top_k=20)
# 2. 构建 Prompt
context_text = "\n\n---\n\n".join([
f"[来源: {c['source']}]\n{c['text']}"
for c in contexts
])
prompt = f"""你是一个专业助手,请根据以下参考资料回答用户问题。
如果参考资料中没有相关信息,请明确说明"根据已有资料,无法回答该问题",不要编造答案。
参考资料:
{context_text}
用户问题:{query}
回答:"""
# 3. 流式调用 LLM
stream = client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
stream=True,
temperature=0.1 # 低温度,减少随机性(准确性优先)
)
# 4. 流式返回(SSE)
for chunk in stream:
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content3)检索质量优化
混合检索(Hybrid Search):
# 向量检索(语义相似)+ 关键词检索(精确匹配)
# 通过 RRF(Reciprocal Rank Fusion)融合两路结果
def hybrid_search(query: str) -> list[dict]:
# 向量检索(语义)
vector_results = milvus.search(embed(query), top_k=20)
# BM25 关键词检索(精确,适合专有名词)
keyword_results = elasticsearch.search({"match": {"text": query}}, size=20)
# RRF 融合(k=60 是经验值)
k = 60
scores = {}
for rank, doc in enumerate(vector_results):
scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)
for rank, doc in enumerate(keyword_results):
scores[doc.id] = scores.get(doc.id, 0) + 1 / (k + rank + 1)
# 按融合得分排序
return sorted(scores.items(), key=lambda x: x[1], reverse=True)[:10]Query 优化:
# HyDE(Hypothetical Document Embeddings)
# 先让 LLM 生成一个假设性答案,用答案向量去检索(比 Query 向量更接近文档分布)
def hyde_search(query: str) -> list[dict]:
hypothetical_answer = llm.generate(
f"请简短回答以下问题(不超过100字):{query}"
)
return retrieve(hypothetical_answer + " " + query) # 问题 + 假设答案组合检索
# Query 扩展(Query Expansion)
# 生成同义词/相关表达,扩大召回
def expand_query(query: str) -> list[str]:
variations = llm.generate(
f"为以下问题生成3个语义相近的表达方式,用换行分隔:\n{query}"
)
return [query] + variations.split("\n")4)系统可观测性
# 每次查询记录评估指标
@dataclass
class QueryLog:
query: str
retrieved_chunks: list[str]
answer: str
latency_ms: float
context_relevance: float # 检索相关性评分(0-1)
answer_faithfulness: float # 答案忠实度(是否基于检索内容)
# 用 RAGAS 框架自动评估
from ragas import evaluate
from ragas.metrics import context_recall, context_precision, faithfulness, answer_relevancy
result = evaluate(
dataset,
metrics=[context_recall, context_precision, faithfulness, answer_relevancy]
)5)延迟优化
端到端延迟 = Embedding(50ms)+ 向量检索(20ms)+ Rerank(100ms)+ LLM(1-5s)
优化方向:
1. 流式输出(Streaming):LLM 生成第一个 token 就开始返回(TTFT < 500ms)
2. 缓存常见问题的答案(Redis,TTL=1h)
3. Rerank 可选(简单问题跳过,复杂问题启用)
4. 更小的 LLM(GPT-4o-mini 替代 GPT-4o,速度 3x,成本 10x 低)
5. 向量检索并行化(多集合同时检索,取 union)
延伸追问
- Q:如何评估 RAG 系统质量? → 用 RAGAS 框架评估四个维度:Context Recall(检索覆盖率)、Context Precision(检索精准率)、Faithfulness(答案是否基于检索内容)、Answer Relevancy(答案是否回答了问题)。可以用 LLM 作为 Judge 自动打分(LLM-as-Judge)。
- Q:向量数据库选哪个? → Milvus(开源,功能完整,支持分布式)用于自托管大规模;Pinecone(云服务,开箱即用)用于快速验证;Qdrant(Rust 实现,性能好,也开源);pgvector(PostgreSQL 扩展,适合已有 PG 的场景,省去额外组件)。
我的记法
- RAG = 分块 → Embedding → 向量库 → 检索 → Rerank → Prompt → LLM
- 检索:向量(语义)+ BM25(关键词)混合,RRF 融合
- 质量:Rerank + HyDE + Query 扩展
- 评估:RAGAS(Recall/Precision/Faithfulness/Relevancy)
- 延迟:流式输出 + 缓存热门问题 + 小模型替代
- 一句话:「文档向量化入库,问题向量化检索,检索结果拼 Prompt 给 LLM 生成答案」
状态
- 已背速记
- 能画 RAG 的离线和在线架构
- 能解释混合检索(向量+BM25)