NLP 进阶:RAG 检索增强生成——从幻觉困境到知识锚定的工程实践
一、LLM 的知识盲区与幻觉困境:当模型"自信地胡说"
大语言模型在开放域问答中表现惊艳,但其知识来源于训练数据,存在两个根本性缺陷:知识时效性滞后(训练截止日期之后的事件一无所知)和长尾知识缺失(低频领域的专业知识覆盖不足)。更危险的是,LLM 面对知识盲区时不会坦承"我不知道",而是自信地生成看似合理但完全错误的内容——这就是幻觉(Hallucination)问题。
以企业内部知识库问答为例,当用户询问"公司 2025 年 Q3 的差旅报销政策变更了哪些条款",通用 LLM 只能编造答案,因为这类信息从未出现在其训练数据中。即便使用最新的模型,也无法保证回答的准确性。RAG(Retrieval-Augmented Generation)的核心思路是:不让模型凭记忆回答,而是先从知识库中检索相关文档,再将文档作为上下文提供给模型,让模型基于事实生成回答。
代码是人与机器的对话,而 RAG 更像是给这段对话配备了一座图书馆——不再凭空想象,而是先翻书再回答。如同古人解卦需先查阅典籍,RAG 让生成过程有了知识的锚点。
二、RAG 的双阶段架构:检索与生成的协同机制
RAG 系统分为检索(Retrieval)和生成(Generation)两个阶段。检索阶段将用户查询转换为向量,在知识库中找到语义最相关的文档片段;生成阶段将检索到的文档与用户问题拼接为 Prompt,由 LLM 基于上下文生成回答。
graph TB Q[用户查询] --> QR[查询预处理与改写] QR --> QE[查询向量化 Embedding] QE --> VS[向量相似度检索] VS --> RR[检索结果重排序 Rerank] RR --> CT[上下文组装与截断] subgraph 知识库构建离线 D[原始文档] --> DC[文档切分 Chunking] DC --> DE[文档向量化 Embedding] DE --> VI[向量索引构建<br/>FAISS/Milvus] end VI -.-> VS CT --> LLM[大语言模型生成] LLM --> A[最终回答 + 来源引用] subgraph 质量保障 H[幻觉检测] S[来源追溯] end A --> H A --> S检索质量是 RAG 系统的生命线。向量检索基于语义相似度,但语义相似不等于信息相关——"如何减肥"和"如何增重"语义相近但信息相反。Rerank 阶段使用交叉编码器(Cross-Encoder)对检索结果进行精排,将查询和文档拼接后计算相关性分数,比双塔向量的点积更准确但计算成本更高。
文档切分策略直接影响检索粒度。过大的 Chunk 包含过多无关信息,稀释了关键内容的相似度;过小的 Chunk 丢失上下文,检索到的片段可能无法独立理解。实践中通常按段落或语义边界切分,Chunk 大小在 256-512 Token 之间,相邻 Chunk 之间保留 50-100 Token 的重叠。
三、生产级 RAG 系统实现
以下代码实现了一个完整的 RAG 系统,包含文档处理、向量检索、Rerank 和幻觉检测:
import hashlib import logging from typing import Any, Dict, List, Optional, Tuple from dataclasses import dataclass, field from abc import ABC, abstractmethod import numpy as np logger = logging.getLogger(__name__) @dataclass class Document: """文档数据结构""" content: str metadata: Dict[str, Any] = field(default_factory=dict) doc_id: str = "" def __post_init__(self): if not self.doc_id: self.doc_id = hashlib.md5( self.content.encode() ).hexdigest()[:12] @dataclass class Chunk: """文档片段""" content: str doc_id: str chunk_index: int embedding: Optional[np.ndarray] = None metadata: Dict[str, Any] = field(default_factory=dict) class DocumentChunker: """文档切分器,支持多种切分策略""" def __init__( self, chunk_size: int = 400, chunk_overlap: int = 80, separator: str = "\n", ): if chunk_overlap >= chunk_size: raise ValueError( f"重叠大小 ({chunk_overlap}) 必须小于 " f"块大小 ({chunk_size})" ) self._chunk_size = chunk_size self._chunk_overlap = chunk_overlap self._separator = separator def chunk(self, doc: Document) -> List[Chunk]: """将文档切分为片段""" # 按分隔符拆分 segments = doc.content.split(self._separator) segments = [s.strip() for s in segments if s.strip()] chunks = [] current_text = "" chunk_index = 0 for segment in segments: # 简化的 Token 估算:中文约 1.5 字/Token estimated_tokens = len(current_text) / 1.5 if estimated_tokens + len(segment) / 1.5 > self._chunk_size: if current_text: chunks.append(Chunk( content=current_text.strip(), doc_id=doc.doc_id, chunk_index=chunk_index, metadata={**doc.metadata}, )) chunk_index += 1 # 保留重叠部分 overlap_text = current_text[-self._chunk_overlap * 2:] current_text = overlap_text + self._separator + segment else: current_text = segment else: current_text += self._separator + segment # 处理最后一块 if current_text.strip(): chunks.append(Chunk( content=current_text.strip(), doc_id=doc.doc_id, chunk_index=chunk_index, metadata={**doc.metadata}, )) return chunks class EmbeddingService(ABC): """向量化服务抽象接口""" @abstractmethod async def embed_texts(self, texts: List[str]) -> np.ndarray: """批量文本向量化""" pass @abstractmethod async def embed_query(self, query: str) -> np.ndarray: """查询向量化""" pass class VectorStore(ABC): """向量存储抽象接口""" @abstractmethod async def add(self, chunks: List[Chunk]) -> None: """添加文档片段""" pass @abstractmethod async def search( self, query_embedding: np.ndarray, top_k: int = 5 ) -> List[Tuple[Chunk, float]]: """向量相似度检索""" pass class InMemoryVectorStore(VectorStore): """基于内存的向量存储,适用于中小规模知识库""" def __init__(self): self._chunks: List[Chunk] = [] self._embeddings: Optional[np.ndarray] = None async def add(self, chunks: List[Chunk]) -> None: embeddings = [ c.embedding for c in chunks if c.embedding is not None ] if not embeddings: raise ValueError("所有 Chunk 都缺少 embedding") new_embeddings = np.stack(embeddings) if self._embeddings is None: self._embeddings = new_embeddings else: self._embeddings = np.vstack( [self._embeddings, new_embeddings] ) self._chunks.extend(chunks) async def search( self, query_embedding: np.ndarray, top_k: int = 5 ) -> List[Tuple[Chunk, float]]: if self._embeddings is None or len(self._chunks) == 0: return [] # 余弦相似度 query_norm = query_embedding / ( np.linalg.norm(query_embedding) + 1e-10 ) doc_norms = self._embeddings / ( np.linalg.norm(self._embeddings, axis=1, keepdims=True) + 1e-10 ) similarities = doc_norms @ query_norm top_indices = np.argsort(similarities)[::-1][:top_k] results = [] for idx in top_indices: score = float(similarities[idx]) if score > 0.3: # 最低相似度阈值 results.append((self._chunks[idx], score)) return results class Reranker(ABC): """重排序器抽象接口""" @abstractmethod async def rerank( self, query: str, chunks: List[Chunk], top_k: int = 3 ) -> List[Tuple[Chunk, float]]: pass class LLMReranker(Reranker): """基于 LLM 的重排序器,利用 LLM 判断文档与查询的相关性""" def __init__(self, llm_client: Any): self._llm = llm_client async def rerank( self, query: str, chunks: List[Chunk], top_k: int = 3 ) -> List[Tuple[Chunk, float]]: scored_chunks = [] for chunk in chunks: prompt = ( f"请判断以下文档片段与查询的相关性," f"给出 0-10 的评分。\n\n" f"查询: {query}\n\n" f"文档: {chunk.content[:500]}\n\n" f"相关性评分(仅输出数字):" ) try: response = await self._llm.generate( prompt, temperature=0.0, max_tokens=10 ) score = float(response.strip()) scored_chunks.append((chunk, score / 10.0)) except (ValueError, Exception) as e: logger.warning(f"重排序评分失败: {e}") scored_chunks.append((chunk, 0.5)) scored_chunks.sort(key=lambda x: x[1], reverse=True) return scored_chunks[:top_k] class HallucinationDetector: """幻觉检测器:验证生成内容是否被检索文档支撑""" def __init__(self, llm_client: Any): self._llm = llm_client async def check( self, answer: str, context_chunks: List[Chunk] ) -> Dict[str, Any]: """检测回答中的幻觉内容""" context_text = "\n\n".join( f"[文档{i+1}]: {c.content[:300]}" for i, c in enumerate(context_chunks) ) prompt = ( "请检查以下回答是否被提供的文档内容所支撑。\n" "对于回答中的每个关键声明,判断是否有文档依据。\n\n" f"文档:\n{context_text}\n\n" f"回答:\n{answer}\n\n" "请以 JSON 格式输出:\n" '{"supported_claims": 数量, ' '"unsupported_claims": 数量, ' '"hallucination_risk": "low/medium/high", ' '"details": "具体说明"}' ) try: import json response = await self._llm.generate( prompt, temperature=0.0, max_tokens=300 ) result = json.loads(response.strip()) return result except Exception as e: logger.warning(f"幻觉检测失败: {e}") return { "hallucination_risk": "unknown", "details": f"检测失败: {str(e)}", } class RAGPipeline: """RAG 完整管道""" def __init__( self, embedding_service: EmbeddingService, vector_store: VectorStore, reranker: Optional[Reranker] = None, hallucination_detector: Optional[HallucinationDetector] = None, max_context_tokens: int = 3000, ): self._embedding = embedding_service self._store = vector_store self._reranker = reranker self._hallucination_detector = hallucination_detector self._max_context_tokens = max_context_tokens async def index_documents(self, docs: List[Document]) -> int: """索引文档到知识库""" chunker = DocumentChunker() all_chunks = [] for doc in docs: chunks = chunker.chunk(doc) all_chunks.extend(chunks) # 批量向量化 texts = [c.content for c in all_chunks] embeddings = await self._embedding.embed_texts(texts) for i, chunk in enumerate(all_chunks): chunk.embedding = embeddings[i] await self._store.add(all_chunks) logger.info( f"已索引 {len(docs)} 篇文档," f"共 {len(all_chunks)} 个片段" ) return len(all_chunks) async def query( self, question: str, llm_client: Any, top_k: int = 5, ) -> Dict[str, Any]: """执行 RAG 查询""" # 1. 查询向量化 query_embedding = await self._embedding.embed_query(question) # 2. 向量检索 search_results = await self._store.search( query_embedding, top_k=top_k ) if not search_results: return { "answer": "未找到相关文档,无法回答该问题", "sources": [], "hallucination_risk": "none", } # 3. 重排序 candidate_chunks = [chunk for chunk, _ in search_results] if self._reranker: ranked_results = await self._reranker.rerank( question, candidate_chunks, top_k=3 ) final_chunks = [c for c, _ in ranked_results] else: final_chunks = candidate_chunks[:3] # 4. 组装上下文(控制 Token 预算) context_parts = [] total_tokens = 0 for chunk in final_chunks: estimated_tokens = len(chunk.content) / 1.5 if total_tokens + estimated_tokens > self._max_context_tokens: break context_parts.append(chunk.content) total_tokens += estimated_tokens context_text = "\n\n---\n\n".join(context_parts) # 5. LLM 生成 prompt = ( "请基于以下文档内容回答问题。" "如果文档中没有相关信息,请明确说明。\n\n" f"文档:\n{context_text}\n\n" f"问题: {question}\n\n" "回答(请引用文档来源):" ) answer = await llm_client.generate( prompt, temperature=0.1, max_tokens=500 ) # 6. 幻觉检测 hallucination_result = {"hallucination_risk": "unknown"} if self._hallucination_detector: hallucination_result = ( await self._hallucination_detector.check( answer, final_chunks ) ) return { "answer": answer, "sources": [ {"doc_id": c.doc_id, "chunk_index": c.chunk_index} for c in final_chunks ], "hallucination_risk": hallucination_result.get( "hallucination_risk", "unknown" ), "hallucination_details": hallucination_result.get( "details", "" ), }关键工程实践:文档切分保留重叠区域避免语义断裂;向量检索设置最低相似度阈值过滤无关结果;上下文组装控制 Token 预算防止超出 LLM 窗口;幻觉检测验证生成内容是否被文档支撑。
四、RAG 系统的权衡:检索质量的天花板效应
检索质量决定生成上限:RAG 的回答质量受限于检索结果的相关性。如果关键文档未被检索到,LLM 无法凭空生成正确答案。在知识库规模增大时,向量检索的召回率会下降,需要引入混合检索(向量 + 关键词 BM25)提升召回。
Chunk 粒度的两难:大 Chunk 保留完整上下文但引入噪声,降低检索精度;小 Chunk 检索精准但可能丢失关键上下文。一种折中方案是"检索小 Chunk,返回大 Chunk"——用小 Chunk 做相似度匹配,但返回包含该 Chunk 的完整段落作为上下文。
延迟与质量的取舍:Rerank 阶段显著提升检索质量,但每个候选文档都需要一次 LLM 调用或 Cross-Encoder 推理,延迟从毫秒级上升到秒级。在实时对话场景中,可能需要跳过 Rerank 或使用轻量级模型。
知识库更新的时效性:向量索引构建是离线批处理,新文档入库后需要重新计算 Embedding 并更新索引。在知识频繁更新的场景中,索引更新的延迟成为瓶颈。
禁用场景:答案需要复杂推理而非简单检索的场景(如数学证明),RAG 的检索结果可能反而干扰推理;知识库质量极低(大量错误或过时信息)时,RAG 会放大错误;对延迟要求极高的实时系统,检索+生成的两阶段延迟无法接受。
五、总结
RAG 通过"先检索后生成"的架构,将 LLM 的回答锚定在知识库的事实上,有效缓解了幻觉问题。核心组件包括文档切分、向量化、向量检索、重排序和幻觉检测。生产实践中需关注:Chunk 大小与重叠的平衡、最低相似度阈值过滤、Token 预算控制、混合检索提升召回率。RAG 适用于知识密集型问答场景,但在需要复杂推理或对延迟极度敏感的场景中需谨慎评估,必要时应结合其他技术方案。