# Embedding工程实战:从模型选型到跨模态检索的完整方案
## 前言

Embedding(向量嵌入)是RAG、语义搜索、推荐系统的基础。2026年,随着多模态模型的成熟,Embedding已经从纯文本向量化演进到文本+图像+音频的统一语义空间。本文系统梳理Embedding工程的核心知识:模型选型、工程优化、评估方法、以及跨模态检索的实践方案。

---

## 一、2026年Embedding模型全景

### 1.1 文本Embedding主流模型

| 模型 | 维度 | 最大长度 | 多语言 | 推荐场景 |
|-----|-----|---------|-------|---------|
| text-embedding-3-large | 3072 | 8191 tokens | ❌ | 英文高精度 |
| text-embedding-3-small | 1536 | 8191 tokens | ❌ | 英文高效 |
| BGE-M3 | 1024 | 8192 tokens | ✅ | 中英文RAG首选 |
| E5-mistral-7b | 4096 | 32768 tokens | ✅ | 长文档 |
| Jina-embeddings-v3 | 1024 | 8192 tokens | ✅ | 多任务 |
| GTE-Qwen2-7B | 3584 | 32768 tokens | ✅ | 中文最强 |
| Voyage-3-large | 1024 | 32768 tokens | ✅ | 代码+文本 |

2026年中文RAG首选:BGE-M3(开源、中英文均衡)或GTE-Qwen2-7B(中文最强,资源充足时)

### 1.2 多模态Embedding

| 模型 | 支持模态 | 用途 |
|-----|---------|-----|
| CLIP / OpenCLIP | 文本+图像 | 图文检索 |
| ImageBind | 6种模态 | 跨模态搜索 |
| Nomic Embed Vision | 文本+图像 | 文档图像理解 |
| Voyage Multimodal | 文本+图像 | RAG中的图文混合 |

---

## 二、本地部署Embedding服务

### 2.1 使用sentence-transformers

```python
from sentence_transformers import SentenceTransformer
import numpy as np
import torch

class EmbeddingService:
    def __init__(self, model_name: str = "BAAI/bge-m3", device: str = None):
        self.device = device or ("cuda" if torch.cuda.is_available() else "cpu")
        self.model = SentenceTransformer(model_name, device=self.device)
        self.model_name = model_name
        print(f"✅ Embedding模型加载完成:{model_name},设备:{self.device}")

    def embed_text(self, text: str) -> np.ndarray:
        """单文本向量化"""
        return self.model.encode(text, normalize_embeddings=True)

    def embed_batch(
        self,
        texts: list[str],
        batch_size: int = 32,
        show_progress: bool = False
    ) -> np.ndarray:
        """批量向量化,自动分批处理"""
        embeddings = self.model.encode(
            texts,
            batch_size=batch_size,
            normalize_embeddings=True,
            show_progress_bar=show_progress
        )
        return embeddings

    def embed_for_retrieval(self, query: str = None, passage: str = None) -> np.ndarray:
        """针对检索优化的向量化(BGE系列支持query/passage区分)"""
        if query is not None:
            # BGE建议在query前加"Represent this sentence for searching relevant passages: "
            text = f"Represent this sentence for searching relevant passages: {query}"
        elif passage is not None:
            text = passage
        else:
            raise ValueError("query或passage至少提供一个")
        return self.model.encode(text, normalize_embeddings=True)

    def similarity(self, embedding1: np.ndarray, embedding2: np.ndarray) -> float:
        """计算余弦相似度(归一化后等于点积)"""
        return float(np.dot(embedding1, embedding2))

    def batch_similarity(self, query_emb: np.ndarray, doc_embs: np.ndarray) -> np.ndarray:
        """批量计算查询与文档集合的相似度"""
        return np.dot(doc_embs, query_emb)

# 使用示例
service = EmbeddingService("BAAI/bge-m3")
query_emb = service.embed_for_retrieval(query="什么是RAG?")
doc_emb = service.embed_for_retrieval(passage="RAG(检索增强生成)是一种将检索系统与语言模型结合的技术...")
print(f"相似度:{service.similarity(query_emb, doc_emb):.4f}")
```

### 2.2 高性能Embedding API服务

将Embedding服务部署为FastAPI:

```python
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import asyncio
from concurrent.futures import ThreadPoolExecutor
import numpy as np

app = FastAPI(title="Embedding Service")

# 全局模型实例(单例)
embedding_service = None
executor = ThreadPoolExecutor(max_workers=4)

@app.on_event("startup")
async def startup():
    global embedding_service
    embedding_service = EmbeddingService("BAAI/bge-m3")

class EmbedRequest(BaseModel):
    texts: list[str]
    task_type: str = "retrieval_document"  # retrieval_query | retrieval_document | classification

class EmbedResponse(BaseModel):
    embeddings: list[list[float]]
    model: str
    usage: dict

@app.post("/embed", response_model=EmbedResponse)
async def embed(request: EmbedRequest):
    """向量化接口"""
    if len(request.texts) > 100:
        raise HTTPException(status_code=400, detail="单次最多100条文本")

    # 在线程池中运行(避免阻塞事件循环)
    loop = asyncio.get_event_loop()

    if request.task_type == "retrieval_query":
        embeddings = await loop.run_in_executor(
            executor,
            lambda: [embedding_service.embed_for_retrieval(query=t) for t in request.texts]
        )
    else:
        embeddings = await loop.run_in_executor(
            executor,
            lambda: embedding_service.embed_batch(request.texts)
        )

    # 转为list格式(JSON序列化)
    emb_list = [e.tolist() for e in embeddings]

    return EmbedResponse(
        embeddings=emb_list,
        model=embedding_service.model_name,
        usage={
            "total_texts": len(request.texts),
            "total_chars": sum(len(t) for t in request.texts)
        }
    )

@app.get("/health")
async def health():
    return {"status": "ok", "model": embedding_service.model_name if embedding_service else "loading"}
```

---

## 三、向量索引与检索优化

### 3.1 FAISS高效检索

```python
import faiss
import numpy as np
import pickle
from pathlib import Path

class VectorStore:
    def __init__(self, dim: int, index_type: str = "IVF"):
        self.dim = dim
        self.documents = []  # 存储原始文档
        self.metadatas = []  # 存储元数据
        self.index = self._create_index(index_type)

    def _create_index(self, index_type: str) -> faiss.Index:
        """创建FAISS索引"""
        if index_type == "Flat":
            # 精确搜索,适合<10万数据
            return faiss.IndexFlatIP(self.dim)  # 内积(归一化后=余弦相似度)
        elif index_type == "IVF":
            # 倒排索引,适合10万-1000万数据
            quantizer = faiss.IndexFlatIP(self.dim)
            nlist = 100  # 聚类中心数
            index = faiss.IndexIVFFlat(quantizer, self.dim, nlist, faiss.METRIC_INNER_PRODUCT)
            return index
        elif index_type == "HNSW":
            # 层次导航小世界图,高召回率低延迟
            M = 16  # 每个节点的连接数
            index = faiss.IndexHNSWFlat(self.dim, M, faiss.METRIC_INNER_PRODUCT)
            index.hnsw.efConstruction = 200
            return index
        raise ValueError(f"不支持的索引类型:{index_type}")

    def add_documents(self, documents: list[str], embeddings: np.ndarray, metadatas: list[dict] = None):
        """添加文档到索引"""
        if not self.index.is_trained:
            # IVF索引需要先训练
            self.index.train(embeddings.astype('float32'))

        start_id = len(self.documents)
        self.index.add(embeddings.astype('float32'))
        self.documents.extend(documents)
        self.metadatas.extend(metadatas or [{} for _ in documents])
        print(f"✅ 添加 {len(documents)} 条文档,总计 {len(self.documents)} 条")

    def search(self, query_embedding: np.ndarray, k: int = 5, score_threshold: float = 0.5) -> list[dict]:
        """向量检索"""
        if len(self.documents) == 0:
            return []

        # 设置搜索参数
        if hasattr(self.index, 'nprobe'):
            self.index.nprobe = 10  # IVF搜索的聚类数

        query = query_embedding.reshape(1, -1).astype('float32')
        scores, indices = self.index.search(query, k)

        results = []
        for score, idx in zip(scores[0], indices[0]):
            if idx == -1:  # FAISS用-1表示无效结果
                continue
            if score < score_threshold:  # 过滤低相似度
                continue
            results.append({
                "document": self.documents[idx],
                "metadata": self.metadatas[idx],
                "score": float(score),
                "index": int(idx)
            })
        return results

    def save(self, path: str):
        """持久化索引"""
        path = Path(path)
        path.mkdir(exist_ok=True)
        faiss.write_index(self.index, str(path / "index.faiss"))
        with open(path / "data.pkl", "wb") as f:
            pickle.dump({"documents": self.documents, "metadatas": self.metadatas}, f)
        print(f"✅ 索引保存到 {path}")

    @classmethod
    def load(cls, path: str, dim: int) -> "VectorStore":
        """加载已有索引"""
        path = Path(path)
        store = cls(dim=dim)
        store.index = faiss.read_index(str(path / "index.faiss"))
        with open(path / "data.pkl", "rb") as f:
            data = pickle.load(f)
        store.documents = data["documents"]
        store.metadatas = data["metadatas"]
        return store
```

### 3.2 混合检索(Dense + Sparse)

结合向量检索和BM25关键词检索,提升召回率:

```python
from rank_bm25 import BM25Okapi
import jieba  # 中文分词

class HybridRetriever:
    def __init__(self, embedding_service, vector_store, alpha: float = 0.7):
        """
        alpha: 向量检索权重,(1-alpha)为BM25权重
        经验值:RAG通常0.7效果好,关键词匹配重要时用0.3-0.5
        """
        self.embedding_service = embedding_service
        self.vector_store = vector_store
        self.alpha = alpha
        self.bm25 = None
        self._documents = []

    def build_bm25_index(self, documents: list[str]):
        """构建BM25索引(中文分词)"""
        self._documents = documents
        # 中文分词
        tokenized = [list(jieba.cut(doc)) for doc in documents]
        self.bm25 = BM25Okapi(tokenized)
        print(f"✅ BM25索引构建完成,{len(documents)} 条文档")

    def search(self, query: str, k: int = 5) -> list[dict]:
        """混合检索:向量 + BM25"""
        # 1. 向量检索
        query_emb = self.embedding_service.embed_for_retrieval(query=query)
        dense_results = self.vector_store.search(query_emb, k=k*2)  # 多取一些用于融合

        # 2. BM25检索
        query_tokens = list(jieba.cut(query))
        bm25_scores = self.bm25.get_scores(query_tokens)
        top_bm25_indices = np.argsort(bm25_scores)[::-1][:k*2]

        # 3. 倒数排序融合(RRF)
        rrf_scores = {}
        for rank, result in enumerate(dense_results):
            doc_idx = result["index"]
            rrf_scores[doc_idx] = rrf_scores.get(doc_idx, 0) + self.alpha / (rank + 60)

        for rank, idx in enumerate(top_bm25_indices):
            rrf_scores[int(idx)] = rrf_scores.get(int(idx), 0) + (1 - self.alpha) / (rank + 60)

        # 4. 按融合分数排序
        sorted_indices = sorted(rrf_scores.keys(), key=lambda i: rrf_scores[i], reverse=True)

        results = []
        for idx in sorted_indices[:k]:
            if idx < len(self._documents):
                results.append({
                    "document": self._documents[idx],
                    "hybrid_score": rrf_scores[idx],
                    "index": idx
                })
        return results
```

---

## 四、多模态检索实战

### 4.1 图文混合RAG

```python
import base64
from openai import OpenAI

client = OpenAI()

class MultimodalRetriever:
    """支持图文混合的检索系统"""

    def __init__(self):
        self.text_store = VectorStore(dim=1024)
        self.image_store = VectorStore(dim=512)  # CLIP维度
        self.image_paths = []

    def add_image(self, image_path: str, description: str = None):
        """添加图片到检索库"""
        # 用CLIP提取图片特征
        # 实际需要加载CLIP模型
        image_embedding = self._get_image_embedding(image_path)

        metadata = {
            "type": "image",
            "path": image_path,
            "description": description or ""
        }

        self.image_store.add_documents(
            [description or image_path],
            np.array([image_embedding]),
            [metadata]
        )
        self.image_paths.append(image_path)

    def _get_image_embedding(self, image_path: str) -> np.ndarray:
        """用CLIP提取图片向量(需要预加载CLIP模型)"""
        # 简化示例,实际使用open_clip库
        return np.random.randn(512)  # 占位符

    def search_by_text(self, query: str, k: int = 3) -> list[dict]:
        """用文本查询跨模态检索(文本+图片)"""
        # 文本检索
        text_results = self.text_store.search(
            self._embed_text(query), k=k
        )

        # 图片检索(用文本查图)
        image_results = self.image_store.search(
            self._embed_text_for_image(query), k=k
        )

        # 合并结果
        all_results = text_results + image_results
        all_results.sort(key=lambda x: x["score"], reverse=True)
        return all_results[:k]

    def _embed_text(self, text: str) -> np.ndarray:
        return np.random.randn(1024)  # 占位,替换为实际embedding

    def _embed_text_for_image(self, text: str) -> np.ndarray:
        return np.random.randn(512)  # 占位,替换为CLIP text encoder
```

---

## 五、Embedding质量评估

### 5.1 系统评估框架

```python
from sklearn.metrics.pairwise import cosine_similarity

def evaluate_embedding_quality(
    embedding_service,
    test_pairs: list[dict],  # [{"query": ..., "relevant_doc": ..., "irrelevant_doc": ...}]
) -> dict:
    """评估Embedding模型的检索质量

    test_pairs格式:
    [{"query": "什么是RAG", "relevant_doc": "RAG是...", "irrelevant_doc": "深度学习是..."}]
    """
    correct = 0
    margins = []

    for pair in test_pairs:
        query_emb = embedding_service.embed_for_retrieval(query=pair["query"])
        relevant_emb = embedding_service.embed_for_retrieval(passage=pair["relevant_doc"])
        irrelevant_emb = embedding_service.embed_for_retrieval(passage=pair["irrelevant_doc"])

        rel_score = float(np.dot(query_emb, relevant_emb))
        irrel_score = float(np.dot(query_emb, irrelevant_emb))

        if rel_score > irrel_score:
            correct += 1
        margins.append(rel_score - irrel_score)

    return {
        "accuracy": correct / len(test_pairs),
        "avg_margin": sum(margins) / len(margins),
        "min_margin": min(margins),
        "hard_cases": sum(1 for m in margins if m < 0.1)  # 难例数量
    }
```

---

## 总结

Embedding工程在2026年已经高度成熟,选择正确的工具并合理优化,可以构建高性能的语义检索系统:

1. 模型选型:中文RAG用BGE-M3,长文档用E5-mistral,追求极致用GTE-Qwen2-7B
2. 索引优化:小数据量用Flat,百万级用IVF/HNSW
3. 混合检索:RRF融合Dense+BM25,比单一方法提升10-20%召回
4. 持续评估:建立评估集,定期验证模型质量

Embedding是RAG的地基,地基打稳了,上层应用才能稳固。
