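A Complete Walkthrough of an Enterprise Knowledge Base and Workflow Automation Agent Project
Project: NexusAgent | Tech stack: FastAPI, DeepSeek, LangGraph, PostgreSQL, vanilla frontend
1. Project Background and Goals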
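NexusAgent connects the documents, Q&A needs, and workflow actions that are scattered across an enterprise. Many knowledge-base products only solve "finding documents", and many chatbots only solve "answering in natural language". Real business scenarios often also require actions: querying data, computing results, generating charts, preserving sources, and recording the process. They also require each user's conversation history to stay isolated from other users.
- Knowledge base: uploaded TXT, Markdown, and PDF files are automatically parsed, chunked, and indexed for retrieval.
- Q&A: answers are generated from source snippets, and the system explicitly declines to answer when retrieval finds too little.
- Agent: tools are planned automatically from the user's intent and combined with knowledge-base results.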
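2. Overall Architecture and Technology Choices
The project separates frontend and backend but keeps deployment lightweight. The FastAPI backend provides the authentication, document, knowledge-base, conversation, and Agent endpoints in one place. The frontend is a workbench built with plain HTML, CSS, and JavaScript. Model capabilities come from the DeepSeek API, and LangGraph handles task orchestration. PostgreSQL stores users and conversation history. In the current version, documents and knowledge chunks are still saved in local JSON files.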
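The persistence code for the local JSON store is not shown. The sketch below is a minimal version that assumes a KnowledgeItem with the fields described in section 3 (SourceId, title, body, document ID, ChunkIndex). The attribute name DocumentId and the helpers SaveKnowledgeItems and LoadKnowledgeItems are hypothetical, and the project's actual file layout may differ.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import List


@dataclass
class KnowledgeItem:
    # Field names follow the article's description; DocumentId is an assumed name.
    SourceId: str
    Title: str
    Content: str
    DocumentId: str
    ChunkIndex: int


def SaveKnowledgeItems(Items: List[KnowledgeItem], FilePath: Path) -> None:
    """Write all chunks to one JSON file, replacing the previous file in one step."""
    FilePath.parent.mkdir(parents=True, exist_ok=True)
    Payload = [asdict(Item) for Item in Items]
    # Write to a temporary file in the same directory, then rename it over the target,
    # so a crash mid-write does not leave a half-written JSON file behind.
    FileDescriptor, TempName = tempfile.mkstemp(dir=FilePath.parent, suffix=".tmp")
    with os.fdopen(FileDescriptor, "w", encoding="utf-8") as TempFile:
        # ensure_ascii=False keeps Chinese text human-readable in the file.
        json.dump(Payload, TempFile, ensure_ascii=False, indent=2)
    os.replace(TempName, FilePath)


def LoadKnowledgeItems(FilePath: Path) -> List[KnowledgeItem]:
    """Return an empty list when the store has not been created yet."""
    if not FilePath.exists():
        return []
    with FilePath.open("r", encoding="utf-8") as SourceFile:
        return [KnowledgeItem(**Record) for Record in json.load(SourceFile)]
```

On POSIX systems, os.replace performs the rename atomically. Readers therefore see either the old file or the new one, never a partial write.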
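FastAPI entry point and API organization
The backend entry point is Backend/App.py. It mounts the frontend's static assets and also exposes the core APIs. This keeps both local startup and Render deployment simple, because a single Uvicorn service hosts the pages and the API.
Source excerpt: Backend/App.py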
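```python
from fastapi import Depends, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles

# FrontendDirectory, GeneratedChartsDirectory, ChatRequest, ChatResponse,
# RequireCurrentUser and ExecuteAgentAsk are defined elsewhere in the project.

App = FastAPI(title="NexusAgent", version="0.2.0")

# Wide-open CORS is convenient during development; in production the
# frontend origins should be listed explicitly instead of "*".
App.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Serve the vanilla frontend and the generated chart files from the same process.
App.mount("/Frontend", StaticFiles(directory=FrontendDirectory), name="Frontend")
GeneratedChartsDirectory.mkdir(parents=True, exist_ok=True)
App.mount("/GeneratedCharts", StaticFiles(directory=GeneratedChartsDirectory), name="GeneratedCharts")


@App.post("/Chat/AgentAsk", response_model=ChatResponse)
async def AskWithAgent(Request: ChatRequest, CurrentUser=Depends(RequireCurrentUser)):
    # The authenticated user is injected through the RequireCurrentUser dependency.
    return await ExecuteAgentAsk(Request, CurrentUser)
```

3. Knowledge Base and RAG Q&A Implementation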
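The first step for a knowledge base is getting documents into the system. NexusAgent supports TXT, Markdown, and PDF. After upload, DocumentService extracts text according to the file type, performs basic cleanup, and splits the text into chunks by paragraph. Each chunk is written as a KnowledgeItem that keeps its SourceId, title, body, document ID, and ChunkIndex, so it can be cited later.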
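The text extraction and cleanup described above rely on DecodeText and NormalizeText helpers. Those names appear in the excerpt that follows, but their bodies are not shown. The sketch below is hypothetical. It assumes text uploads are usually UTF-8, possibly with a BOM, and otherwise GB18030, which is common for Chinese files. The encoding order and cleanup rules are assumptions, not the project's code.

```python
import re
import unicodedata


def DecodeText(RawContent: bytes) -> str:
    """Try likely encodings in order (hypothetical helper)."""
    # utf-8-sig strips a BOM if present; gb18030 is a superset of GBK/GB2312.
    for Encoding in ("utf-8-sig", "gb18030"):
        try:
            return RawContent.decode(Encoding)
        except UnicodeDecodeError:
            continue
    # Last resort: substitute undecodable bytes instead of failing the upload.
    return RawContent.decode("utf-8", errors="replace")


def NormalizeText(Content: str) -> str:
    """Basic cleanup that keeps blank lines, because chunking splits on them."""
    # NFKC folds full-width letters and digits (e.g. "ＡＢＣ１２３") to ASCII.
    Content = unicodedata.normalize("NFKC", Content)
    Content = Content.replace("\r\n", "\n").replace("\r", "\n")
    # Collapse runs of spaces/tabs inside each line and trim the line.
    Lines = [re.sub(r"[ \t]+", " ", Line).strip() for Line in Content.split("\n")]
    Content = "\n".join(Lines)
    # Keep at most one blank line between paragraphs.
    Content = re.sub(r"\n{3,}", "\n\n", Content)
    return Content.strip()
```

NormalizeText deliberately preserves single blank lines. Removing them would defeat the paragraph-based split that the chunker performs.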
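Source excerpt: Backend/DocumentService.py - document upload and chunking

```python
import re
from pathlib import Path

from fastapi import HTTPException, UploadFile


async def ExtractUpload(self, FileData: UploadFile):
    FileName = FileData.filename or "UploadedDocument"
    Suffix = Path(FileName).suffix.lower()
    if Suffix not in self.SupportedSuffixes:
        raise HTTPException(status_code=400, detail="Only TXT, Markdown, or PDF documents can be uploaded.")
    RawContent = await FileData.read()
    if not RawContent:
        raise HTTPException(status_code=400, detail="The uploaded file is empty.")
    # PDFs go through a text extractor; everything else is decoded as text.
    if Suffix == ".pdf":
        Content = self.ExtractPdfText(RawContent)
    else:
        Content = self.DecodeText(RawContent)
    Content = self.NormalizeText(Content)
    if not Content:
        raise HTTPException(status_code=400, detail="No text content could be extracted from the file.")
    return FileName, Content


def BuildChunks(self, Content, ChunkSize=1200, Overlap=120):
    # Split on blank lines so that paragraphs stay intact where possible.
    Paragraphs = [Paragraph.strip() for Paragraph in re.split(r"\n\s*\n", Content) if Paragraph.strip()]
    Chunks = []
    Current = ""
    for Paragraph in Paragraphs:
        # Greedily pack consecutive paragraphs into one chunk of at most ChunkSize characters.
        Candidate = f"{Current}\n\n{Paragraph}".strip() if Current else Paragraph
        if len(Candidate) <= ChunkSize:
            Current = Candidate
            continue
        if Current:
            Chunks.append(Current)
        Current = Paragraph
        # An oversized paragraph is cut into fixed windows that overlap by Overlap characters.
        while len(Current) > ChunkSize:
            Chunks.append(Current[:ChunkSize])
            Current = Current[ChunkSize - Overlap :]
    if Current:
        Chunks.append(Current)
    return Chunks or [Content[:ChunkSize]]
```

For retrieval, the project did not start with a complex vector database. It first implemented a local sparse retriever. Each continuous run of Chinese characters is kept whole and also expanded into 2- to 6-character n-grams. English words, numbers, and symbol keywords are kept as-is. Scores are then accumulated from rules such as title hits, body hits, and full-query hits. This approach is not perfect, but it is well suited to quickly validating the RAG pipeline at the MVP stage.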
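To make the n-gram expansion concrete, consider a Chinese run of length n. It is kept whole and also yields n - k + 1 substrings for each window size k from 2 to 6. The stand-alone reduction below mirrors the expansion step and omits stop-word filtering. ExpandChineseTerm and SplitQuery are names used only for this illustration. A four-character term such as 报销流程 ("reimbursement process") produces 1 + 3 + 2 + 1 = 7 terms.

```python
import re
from typing import List


def ExpandChineseTerm(Term: str, Sizes=(2, 3, 4, 5, 6)) -> List[str]:
    """Whole term plus every n-gram of each size, mirroring the expansion in ExtractSearchTerms."""
    Expanded = [Term]
    for Size in Sizes:
        # range() is empty when Size > len(Term), so short terms add nothing for that size.
        Expanded.extend(Term[Index : Index + Size] for Index in range(0, len(Term) - Size + 1))
    return Expanded


def SplitQuery(Query: str) -> List[str]:
    """Same tokenizer regex as the excerpt: ASCII-like tokens or runs of CJK characters."""
    return re.findall(r"[a-zA-Z0-9_#+.%/-]+|[\u4e00-\u9fff]+", Query.lower())


for Token in SplitQuery("Q3 报销流程?"):
    print(Token, ExpandChineseTerm(Token) if re.fullmatch(r"[\u4e00-\u9fff]+", Token) else [Token])
```

One side effect of this scheme is that a term of 2 to 6 characters appears twice: once as the whole term and once as its own full-length n-gram. A verbatim match of such a term is therefore counted twice when scores are summed.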
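Source excerpt: Backend/RuntimeStore.py - local retrieval

```python
import re


def ExtractSearchTerms(self, Query):
    CleanQuery = Query.lower()
    # Tokens are either runs of ASCII letters/digits/symbols or runs of CJK characters.
    Terms = re.findall(r"[a-zA-Z0-9_#+.%/-]+|[\u4e00-\u9fff]+", CleanQuery)
    ExpandedTerms = []
    for Term in Terms:
        if re.fullmatch(r"[\u4e00-\u9fff]+", Term):
            # Chinese has no word delimiters, so keep the whole run plus all 2- to 6-character n-grams.
            ExpandedTerms.append(Term)
            for Size in (2, 3, 4, 5, 6):
                ExpandedTerms.extend(Term[Index : Index + Size] for Index in range(0, len(Term) - Size + 1))
        else:
            ExpandedTerms.append(Term)
    # Filler words such as "we", "company", "this", "that" carry no retrieval signal.
    StopTerms = {
        "我们",
        "公司",
        "这个",
        "那个",
        # ... non-essential implementation omitted ...
        "进行",
        "当前",
    }
    return [Term for Term in ExpandedTerms if Term and Term not in StopTerms]


def SearchKnowledge(self, Query, Limit=4):
    QueryTerms = self.ExtractSearchTerms(Query)
    Results = []
    for Item in self.KnowledgeItems:
        SearchText = f"{Item.Title} {Item.Content}".lower()
        Score = 0
        # Strongest signal: the whole query appears verbatim.
        if Query.lower() in SearchText:
            Score += 24
        for Term in QueryTerms:
            Count = SearchText.count(Term)
            if Count == 0:
                continue
            # Terms found in the title weigh x5, other terms of 3+ characters x2, short terms x1.
            if Term in Item.Title.lower():
                Score += Count * 5
            elif len(Term) >= 3:
                Score += Count * 2
            else:
                Score += Count
            if Term in Item.VectorTerms:
                Score += 1
        if Score > 0:
            Snippet = Item.Content[:220] + ("..." if len(Item.Content) > 220 else "")
            # ... non-essential implementation omitted ...
                )
            )
    Results.sort(key=lambda Item: Item.Score, reverse=True)
    return Results[:Limit]
```

The key to the RAG flow is not just "letting the model answer". The model must answer from traceable sources. When retrieval returns no Sources, the service tells the user to upload relevant documents first, which prevents the model from inventing support that does not exist.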
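The refusal rule above can be sketched as a guard in front of the model call. The sketch is minimal and makes several assumptions. It assumes each search result exposes a title, a source ID, and a snippet. The SearchHit type, BuildRagPrompt, AnswerOrRefuse, and all message wording are hypothetical.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class SearchHit:
    # Hypothetical shape of one retrieval result.
    SourceId: str
    Title: str
    Snippet: str
    Score: int


NoSourceReply = "No relevant content was found in the knowledge base. Please upload the related documents first."


def BuildRagPrompt(Question: str, Hits: List[SearchHit]) -> Optional[str]:
    """Return a grounded prompt, or None when there is nothing to ground it on."""
    if not Hits:
        return None
    # Number the sources so the model can cite them as [1], [2], ...
    SourceBlocks = [f"[{Index}] {Hit.Title} ({Hit.SourceId})\n{Hit.Snippet}" for Index, Hit in enumerate(Hits, start=1)]
    return (
        "Answer using only the numbered sources below and cite them as [n]. "
        "If the sources do not contain the answer, say so.\n\n"
        + "\n\n".join(SourceBlocks)
        + f"\n\nQuestion: {Question}"
    )


def AnswerOrRefuse(Question: str, Hits: List[SearchHit]) -> str:
    Prompt = BuildRagPrompt(Question, Hits)
    if Prompt is None:
        # Refuse before calling the model, so it has no chance to invent a basis.
        return NoSourceReply
    # In a real service this prompt would be sent to the model; the sketch just returns it.
    return Prompt
```

The refusal happens before any model call. A missing knowledge base therefore costs no API tokens and cannot produce an ungrounded answer.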
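DeepSeek API call wrapper
Model calls are centralized in DeepSeekService. GenerateAnswer assembles the system prompt, the recent conversation context (the last eight messages), and the current user prompt. It then sends them to DeepSeek's /chat/completions endpoint. It also provides fallbacks for a missing API key, request timeouts, HTTP errors, and network errors.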
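Every fallback path calls GenerateLocalFallbackAnswer(Sources, ToolCalls, Reason), whose body is not shown. The sketch below is a hypothetical free-function version. It assumes sources carry a Title and Snippet, and tool calls carry ToolName, Status, and Output. Those attribute names follow the article's description of ToolCallItem but are assumptions. The idea is to degrade to a plain-text summary of what has already been retrieved and executed, rather than failing the request.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List


@dataclass
class FallbackSource:
    # Hypothetical minimal source record.
    Title: str
    Snippet: str


@dataclass
class FallbackToolCall:
    # Hypothetical minimal tool-call record.
    ToolName: str
    Status: str
    Output: Dict[str, Any] = field(default_factory=dict)


def GenerateLocalFallbackAnswer(Sources: List[FallbackSource], ToolCalls: List[FallbackToolCall], Reason: str) -> str:
    """Summarize what is already known when the model cannot be reached."""
    Lines = [Reason]
    if ToolCalls:
        Lines.append("Tool results:")
        for Call in ToolCalls:
            Lines.append(f"- {Call.ToolName} [{Call.Status}]: {Call.Output}")
    if Sources:
        Lines.append("Related sources:")
        for Source in Sources:
            Lines.append(f"- {Source.Title}: {Source.Snippet}")
    if len(Lines) == 1:
        # Nothing was retrieved or executed, so say so explicitly.
        Lines.append("No sources or tool results are available for a local summary.")
    return "\n".join(Lines)
```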
Source excerpt: Backend/DeepSeekService.py - calling the DeepSeek API

```python
import httpx


async def GenerateAnswer(self, SystemPrompt, ConversationMessages, UserPrompt, Sources, ToolCalls):
    # Without an API key, degrade to a local summary instead of failing.
    if not self.Settings.DeepSeekApiKey:
        return self.GenerateLocalFallbackAnswer(Sources, ToolCalls, "DEEPSEEK_API_KEY is not configured.")
    Messages = [{"role": "system", "content": SystemPrompt}]
    # Only the last 8 history messages are sent, which keeps the prompt size bounded.
    Messages.extend(ConversationMessages[-8:])
    Messages.append({"role": "user", "content": UserPrompt})
    Payload = {
        "model": self.Settings.DeepSeekModel,
        "messages": Messages,
        "stream": False,
        "temperature": 0.2,
    }
    Headers = {
        "Authorization": f"Bearer {self.Settings.DeepSeekApiKey}",
        "Content-Type": "application/json",
    }
    # httpx.TimeoutException is a subclass of httpx.RequestError, so its handler must come first.
    try:
        async with httpx.AsyncClient(timeout=self.Settings.RequestTimeoutSeconds) as Client:
            Response = await Client.post(
                f"{self.Settings.DeepSeekBaseUrl}/chat/completions",
                headers=Headers,
                json=Payload,
            )
            Response.raise_for_status()
            Data = Response.json()
    except httpx.TimeoutException:
        return self.GenerateLocalFallbackAnswer(Sources, ToolCalls, "DeepSeek request timed out; returning a local execution summary.")
    except httpx.HTTPStatusError as Error:
        return self.GenerateLocalFallbackAnswer(
            Sources, ToolCalls, f"DeepSeek API returned {Error.response.status_code}; returning a local execution summary."
        )
    except httpx.RequestError as Error:
        Detail = str(Error) or Error.__class__.__name__
        return self.GenerateLocalFallbackAnswer(
            Sources, ToolCalls, f"DeepSeek network request failed: {Detail}. Returning a local execution summary."
        )
    return Data["choices"][0]["message"]["content"].strip()
```

4. Agent Workflow and Tool Calling
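Moving from RAG Q&A to Agent automation changes one thing above all: the system no longer just retrieves documents and generates an answer. It has to determine the user's intent, decide whether to call tools, execute them, and fold their results into the final answer. NexusAgent uses LangGraph to split this process into fixed nodes. This makes the process easier to debug and lets the frontend display the execution trace.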
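The graph has no branches, so its behavior can be illustrated without LangGraph. Each node receives the state, returns only the keys it changes, and appends a step record. The sketch below is a plain-Python stand-in, not LangGraph's API. It merges updates with dict.update, which matches LangGraph's default of overwriting a key when no reducer is configured. The node bodies are placeholders, including the rule that the word 计算 ("calculate") means the user wants a tool.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict, List

StateDict = Dict[str, Any]
NodeFunction = Callable[[StateDict], Awaitable[StateDict]]


def AppendStep(State: StateDict, StepName: str, Status: str, Detail: str) -> List[Dict[str, str]]:
    # Copy, then append, so the previous state's list is never mutated in place.
    Steps = list(State.get("WorkflowSteps", []))
    Steps.append({"StepName": StepName, "Status": Status, "Detail": Detail})
    return Steps


async def IdentifyIntent(State: StateDict) -> StateDict:
    Intent = "Tool" if "计算" in State["Message"] else "Knowledge"  # placeholder rule
    return {"Intent": Intent, "WorkflowSteps": AppendStep(State, "IdentifyIntent", "Success", Intent)}


async def RetrieveKnowledge(State: StateDict) -> StateDict:
    Sources: List[Any] = []  # placeholder: no knowledge base in this sketch
    return {"Sources": Sources, "WorkflowSteps": AppendStep(State, "RetrieveKnowledge", "Success", f"{len(Sources)} sources")}


async def PlanTools(State: StateDict) -> StateDict:
    Plans = [{"ToolName": "SafeCalculate"}] if State["Intent"] == "Tool" else []
    return {"ToolPlans": Plans, "WorkflowSteps": AppendStep(State, "PlanTools", "Success", f"{len(Plans)} plans")}


async def ExecuteTools(State: StateDict) -> StateDict:
    Calls = [{"ToolName": Plan["ToolName"], "Status": "Skipped"} for Plan in State["ToolPlans"]]
    return {"ToolCalls": Calls, "WorkflowSteps": AppendStep(State, "ExecuteTools", "Success", f"{len(Calls)} calls")}


async def GenerateAnswer(State: StateDict) -> StateDict:
    Answer = f"intent={State['Intent']}, tools={len(State['ToolCalls'])}"
    return {"Answer": Answer, "WorkflowSteps": AppendStep(State, "GenerateAnswer", "Success", "done")}


# Same order as the edges IdentifyIntent -> ... -> GenerateAnswer -> END.
Pipeline: List[NodeFunction] = [IdentifyIntent, RetrieveKnowledge, PlanTools, ExecuteTools, GenerateAnswer]


async def Run(Message: str) -> StateDict:
    State: StateDict = {"Message": Message, "ConversationMessages": [], "WorkflowSteps": []}
    for Node in Pipeline:
        # Each node returns only the keys it changes; merge them by overwriting.
        State.update(await Node(State))
    return State


FinalState = asyncio.run(Run("帮我计算 3*4"))
print([Step["StepName"] for Step in FinalState["WorkflowSteps"]])
```

WorkflowSteps accumulates one record per node. That list is what lets a frontend render the execution trace step by step.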
Source excerpt: Backend/AgentWorkflow.py - LangGraph orchestration

```python
from typing import Any, Dict, List, TypedDict

from langgraph.graph import END, StateGraph

# SourceItem, ToolCallItem and WorkflowStepItem are the project's own models.


class AgentWorkflowState(TypedDict, total=False):
    Message: str
    ConversationMessages: List[Dict[str, str]]
    Intent: str
    Sources: List[SourceItem]
    ToolPlans: List[Dict[str, Any]]
    ToolCalls: List[ToolCallItem]
    WorkflowSteps: List[WorkflowStepItem]
    Answer: str


class AgentWorkflow:
    def __init__(self):
        Graph = StateGraph(AgentWorkflowState)
        Graph.add_node("IdentifyIntent", self.IdentifyIntent)
        Graph.add_node("RetrieveKnowledge", self.RetrieveKnowledge)
        Graph.add_node("PlanTools", self.PlanTools)
        Graph.add_node("ExecuteTools", self.ExecuteTools)
        Graph.add_node("GenerateAnswer", self.GenerateAnswer)
        # A strictly linear graph: every node runs once, in this order.
        Graph.set_entry_point("IdentifyIntent")
        Graph.add_edge("IdentifyIntent", "RetrieveKnowledge")
        Graph.add_edge("RetrieveKnowledge", "PlanTools")
        Graph.add_edge("PlanTools", "ExecuteTools")
        Graph.add_edge("ExecuteTools", "GenerateAnswer")
        Graph.add_edge("GenerateAnswer", END)
        self.CompiledGraph = Graph.compile()

    async def Run(self, Message, ConversationMessages):
        InitialState = {
            "Message": Message,
            "ConversationMessages": ConversationMessages,
            "WorkflowSteps": [],
            # ... non-essential implementation omitted ...

    def AppendStep(self, State, StepName, Status, Detail):
        # Build a new list rather than mutating the list held in the current state.
        Steps = list(State.get("WorkflowSteps", []))
        Steps.append(WorkflowStepItem(StepName=StepName, Status=Status, Detail=Detail))
        return Steps
```

Tool planning uses a lightweight keyword-trigger approach. For example, "时间" (time) or "几点" (what time) triggers the current-time tool. "工时" (work hours), "考勤" (attendance), or "月报" (monthly report) triggers work-hour statistics. "销售" (sales) or "奖金" (bonus) triggers sales data and bonus calculation. "图表" (chart) or "折线图" (line chart) triggers the chart-generation tool.
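Keyword rules also have to pull parameters out of the message: the planner passes a month to the work-hour tool and a region to the sales tool. The extraction helpers are not shown. The sketch below makes several assumptions. Months are written as 2024-03, 2024年3月 (year/month), or a bare 3月 (March). Regions are the three names in the keyword list: 华东 (East China), 华南 (South China), and 华北 (North China). The "YYYY-MM" output format, the DefaultYear parameter, and returning None when nothing matches are also assumptions.

```python
import re
from datetime import date
from typing import Optional

KnownRegions = ("华东", "华南", "华北")  # East, South, and North China


def ExtractMonth(Message: str, DefaultYear: Optional[int] = None) -> Optional[str]:
    """Return 'YYYY-MM' for the first month mentioned, or None (hypothetical helper)."""
    # Year and month together: 2024-03, 2024/3, 2024.3 or 2024年3月.
    Match = re.search(r"(\d{4})\s*(?:[-/.]|年)\s*(\d{1,2})", Message)
    if Match:
        Year, Month = int(Match.group(1)), int(Match.group(2))
    else:
        # A bare month such as "3月" uses DefaultYear, or the current year if none is given.
        Match = re.search(r"(\d{1,2})\s*月", Message)
        if not Match:
            return None
        Year = DefaultYear if DefaultYear is not None else date.today().year
        Month = int(Match.group(1))
    if not 1 <= Month <= 12:
        return None
    return f"{Year:04d}-{Month:02d}"


def ExtractRegion(Message: str) -> Optional[str]:
    """Return the first of KnownRegions found in the message, or None meaning all regions."""
    for Region in KnownRegions:
        if Region in Message:
            return Region
    return None
```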
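Source excerpt: Backend/AgentTools.py - tool planning

```python
def PlanTools(self, Message):
    LowerMessage = Message.lower()
    Plans = []
    # Chart keywords: draw a chart, chart, line chart, column chart, bar chart, trend chart, visualization.
    WantsChart = any(Keyword in Message for Keyword in ("画图", "图表", "折线图", "柱形图", "柱状图", "趋势图", "可视化"))
    # Time keywords: time, what time, date, today.
    if any(Keyword in Message for Keyword in ("时间", "几点", "日期", "今天")):
        Plans.append({"ToolName": "GetCurrentTime", "Input": {"Timezone": "Asia/Shanghai"}})
    # Work-hour keywords: work hours, clock-in, attendance tracking, monthly report, attendance, daily report.
    if any(Keyword in Message for Keyword in ("工时", "打卡", "考勤", "月报", "出勤", "日报")):
        Plans.append({"ToolName": "QueryWorkHours", "Input": {"Month": self.ExtractMonth(Message)}})
    # Sales keywords: sales, bonus, revenue, performance, and the East/South/North China regions.
    if any(Keyword in Message for Keyword in ("销售", "奖金", "营收", "业绩", "华东", "华南", "华北")):
        Region = self.ExtractRegion(Message)
        Plans.append({"ToolName": "QuerySalesData", "Input": {"Region": Region}})
        # "奖金" (bonus) is itself a sales keyword, so Region is always defined here.
        if "奖金" in Message:
            Plans.append({"ToolName": "CalculateSalesBonus", "Input": {"Region": Region}})
    Expression = self.ExtractMathExpression(Message)
    if Expression:
        Plans.append({"ToolName": "SafeCalculate", "Input": {"Expression": Expression}})
    # Fallback: an English "calculate" request without a detectable expression passes the whole
    # message to SafeCalculate, which rejects anything that is not a pure arithmetic expression.
    if "calculate" in LowerMessage and not Expression:
        Plans.append({"ToolName": "SafeCalculate", "Input": {"Expression": Message}})
    if WantsChart:
        Plans.append(
            {
                "ToolName": "RenderDocumentChart",
                "Input": {
                    "ChartType": self.ExtractChartType(Message),
                    "DataKind": self.ExtractChartDataKind(Message),
                    "Query": Message,
                    "Month": self.ExtractMonth(Message),
                },
            }
        )
    return Plans
```

To keep tools from becoming uncontrollable black boxes, every tool call is wrapped in a ToolCallItem. A ToolCallItem holds the tool name, input, output, status, and error message. The frontend can then show tool results to the user, and the backend can record failure reasons in the run log.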
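The wrapping step can be sketched as a loop over the plans. The sketch assumes ToolCallItem has the five fields listed above. The attribute names ToolName, Input, Output, Status, and ErrorMessage are assumptions, as are the status values Success and Failed. A plain dict serves as a hypothetical tool registry. The point is that an exception inside one tool becomes a recorded failure instead of aborting the whole run.

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional


@dataclass
class ToolCallItem:
    ToolName: str
    Input: Dict[str, Any]
    Output: Dict[str, Any] = field(default_factory=dict)
    Status: str = "Pending"
    ErrorMessage: Optional[str] = None


def ExecuteToolPlans(Plans: List[Dict[str, Any]], Registry: Dict[str, Callable[..., Dict[str, Any]]]) -> List[ToolCallItem]:
    Calls = []
    for Plan in Plans:
        Call = ToolCallItem(ToolName=Plan["ToolName"], Input=dict(Plan.get("Input", {})))
        Tool = Registry.get(Call.ToolName)
        if Tool is None:
            Call.Status, Call.ErrorMessage = "Failed", f"Unknown tool: {Call.ToolName}"
        else:
            try:
                # Plan inputs become keyword arguments, matching the "Input" dicts built by the planner.
                Call.Output = Tool(**Call.Input)
                Call.Status = "Success"
            except Exception as Error:  # record any tool failure instead of crashing the run
                Call.Status, Call.ErrorMessage = "Failed", f"{type(Error).__name__}: {Error}"
        Calls.append(Call)
    return Calls


# Hypothetical registry with a single tool, to show both the success and the failure path.
ExampleRegistry = {"Divide": lambda Numerator, Denominator: {"Result": Numerator / Denominator}}
ExampleCalls = ExecuteToolPlans(
    [
        {"ToolName": "Divide", "Input": {"Numerator": 6, "Denominator": 3}},
        {"ToolName": "Divide", "Input": {"Numerator": 1, "Denominator": 0}},
    ],
    ExampleRegistry,
)
for Call in ExampleCalls:
    print(Call.ToolName, Call.Status, Call.Output or Call.ErrorMessage)
```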
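Source excerpt: Backend/AgentTools.py - safe calculation and chart generation

```python
import ast
from uuid import uuid4

# GeneratedChartsDirectory is defined elsewhere in the project.


def SafeCalculate(self, Expression):
    if not Expression or len(Expression) > 120:
        raise ValueError("The math expression is empty or too long.")
    # Parse as a single expression and walk the AST instead of calling eval().
    ParsedExpression = ast.parse(Expression, mode="eval")
    return {"Expression": Expression, "Result": self.EvaluateAst(ParsedExpression.body)}


def EvaluateAst(self, Node):
    # self.AllowedOperators (not shown) maps AST operator types to the functions that apply them.
    if isinstance(Node, ast.Constant) and isinstance(Node.value, (int, float)):
        return Node.value
    if isinstance(Node, ast.BinOp) and type(Node.op) in self.AllowedOperators:
        Left = self.EvaluateAst(Node.left)
        Right = self.EvaluateAst(Node.right)
        # Exponents are capped at 8; nested powers such as ((9**8)**8)**8 can still
        # produce extremely large integers within the 120-character limit.
        if isinstance(Node.op, ast.Pow) and abs(Right) > 8:
            raise ValueError("Exponent too large; execution refused.")
        return self.AllowedOperators[type(Node.op)](Left, Right)
    if isinstance(Node, ast.UnaryOp) and type(Node.op) in self.AllowedOperators:
        return self.AllowedOperators[type(Node.op)](self.EvaluateAst(Node.operand))
    # Names, calls, attribute access and everything else are rejected.
    raise ValueError("The expression contains disallowed content.")


def RenderDocumentChart(self, ChartType, DataKind, Query, Month):
    Series = self.BuildChartSeries(ChartType, DataKind, Query, Month)
    if len(Series["Points"]) < 2:
        raise ValueError(
            "Not enough numeric data was found in the uploaded documents; "
            "at least two data points are needed to draw a chart."
        )
    # A random file name makes it very unlikely that concurrent requests overwrite each other's charts.
    FileName = f"Chart-{uuid4().hex[:12]}.svg"
    GeneratedChartsDirectory.mkdir(parents=True, exist_ok=True)
    ChartPath = GeneratedChartsDirectory / FileName
    ChartPath.write_text(
        self.BuildChartSvg(
            Series["Title"],
            Series["Points"],
            Series["ChartType"],
            Series["XLabel"],
            Series["YLabel"],
        ),
        encoding="utf-8",
    )
    # ... non-essential implementation omitted ...
        "ChartUrl": ChartUrl,
        "ChartMarkdown": f"![{Series['Title']}]({ChartUrl})",
        "Source": Series["Source"],
    }
```

5. User System, Conversation History, and Run Logs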
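One of the biggest differences between an enterprise Agent and a personal demo is the need to distinguish users, retain context, and support auditing. NexusAgent adds registration, login, and a token-based session mechanism, and it writes users, conversations, and messages to PostgreSQL. One deployment can therefore serve multiple users, and each user can access only their own conversation history.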
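The excerpt that follows covers verification but not registration. The registration side can be sketched under a few assumptions. Each user gets a random salt stored as a hex string, which is consistent with HashPassword encoding the salt as UTF-8 text. A plain dict stands in for the PostgreSQL users table. RegisterUser and VerifyPassword are hypothetical names, and HashPassword is rewritten here as a free function.

```python
import hashlib
import hmac
import secrets
from typing import Dict

Iterations = 120000  # same iteration count as HashPassword in the excerpt


def HashPassword(Password: str, Salt: str) -> str:
    Digest = hashlib.pbkdf2_hmac("sha256", Password.encode("utf-8"), Salt.encode("utf-8"), Iterations)
    return Digest.hex()


def RegisterUser(Users: Dict[str, dict], UserName: str, Password: str, DisplayName: str) -> dict:
    if UserName in Users:
        raise ValueError("User name already exists.")
    # 16 random bytes -> 32 hex characters; a fresh salt per user defeats precomputed tables.
    Salt = secrets.token_hex(16)
    User = {
        "UserName": UserName,
        "DisplayName": DisplayName,
        "PasswordSalt": Salt,
        "PasswordHash": HashPassword(Password, Salt),
    }
    Users[UserName] = User
    return User


def VerifyPassword(User: dict, Password: str) -> bool:
    ActualHash = HashPassword(Password, User.get("PasswordSalt", ""))
    # Timing-safe comparison, as in Login.
    return hmac.compare_digest(User.get("PasswordHash", ""), ActualHash)
```

Only the salt and the derived hash are stored. Checking a password therefore means re-deriving the hash with the stored salt and comparing the results.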
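Source excerpt: Backend/AuthService.py - login credentials and password verification

```python
import hashlib
import hmac
import secrets

from fastapi import HTTPException


def HashPassword(self, Password, Salt):
    # PBKDF2-HMAC-SHA256 with a per-user salt and 120,000 iterations.
    Digest = hashlib.pbkdf2_hmac("sha256", Password.encode("utf-8"), Salt.encode("utf-8"), 120000)
    return Digest.hex()


def BuildAuthResponse(self, User):
    # 32 random bytes, URL-safe Base64 encoded, used as an opaque bearer token.
    Token = secrets.token_urlsafe(32)
    # If self.Sessions is an in-process dict, tokens are lost on restart
    # and are not shared between multiple worker processes.
    self.Sessions[Token] = {
        "UserName": User["UserName"],
        "DisplayName": User["DisplayName"],
        "CreatedAt": self.GetCurrentTimeStamp(),
    }
    return {
        "Token": Token,
        "UserName": User["UserName"],
        "DisplayName": User["DisplayName"],
        "Status": "Authenticated",
    }


def Login(self, Request):
    User = self.FindUser(Request.UserName)
    # The same message for an unknown user and a wrong password keeps the response
    # body from revealing which check failed.
    if not User:
        raise HTTPException(status_code=401, detail="Incorrect user name or password.")
    ExpectedHash = User.get("PasswordHash", "")
    ActualHash = self.HashPassword(Request.Password, User.get("PasswordSalt", ""))
    # compare_digest is designed to resist timing attacks when comparing secrets.
    if not hmac.compare_digest(ExpectedHash, ActualHash):
        raise HTTPException(status_code=401, detail="Incorrect user name or password.")
    return self.BuildAuthResponse(User)
```

Each Q&A run also generates a RunId. The run record stores the user's question, an answer preview, the number of sources, the status, and the latency. It also stores the Sources, ToolCalls, and WorkflowSteps, plus any error message. With this design, nobody has to guess "why the model answered this way"; the answer can be traced back from a single run record.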
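The run-record pattern can be sketched as a wrapper around one request. It generates a RunId, times the call with a monotonic clock, and keeps a bounded in-memory list. The sketch assumes a uuid4-based RunId and a hypothetical RecordRun helper. The project's AddRunLog stores more fields (Sources, ToolCalls, WorkflowSteps) and also keeps the 50 most recent runs.

```python
import time
import uuid
from collections import deque
from typing import Any, Callable, Deque, Dict

RunLogs: Deque[Dict[str, Any]] = deque(maxlen=50)  # the oldest entry drops off automatically


def RecordRun(UserMessage: str, Handler: Callable[[str], str]) -> Dict[str, Any]:
    RunId = uuid.uuid4().hex
    Started = time.perf_counter()  # monotonic, unaffected by wall-clock adjustments
    Status, Answer, ErrorMessage = "Success", "", None
    try:
        Answer = Handler(UserMessage)
    except Exception as Error:
        # A failed run is still logged, with its error message.
        Status, ErrorMessage = "Failed", str(Error)
    LatencyMs = int((time.perf_counter() - Started) * 1000)
    RunLog = {
        "RunId": RunId,
        "UserMessage": UserMessage,
        "AnswerPreview": Answer[:160] + ("..." if len(Answer) > 160 else ""),
        "Status": Status,
        "LatencyMs": LatencyMs,
        "ErrorMessage": ErrorMessage,
    }
    RunLogs.appendleft(RunLog)  # newest first, like RunLogs.insert(0, ...)
    return RunLog
```

deque(maxlen=50) discards the oldest item on each append. This avoids the list copy that slicing with self.RunLogs[:50] makes on every run.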
Source excerpt: Backend/RuntimeStore.py - conversation messages and run logs

```python
def AppendConversationMessage(self, ConversationId, UserName, Role, Content):
    Conversation = self.EnsureConversation(ConversationId, UserName)
    Timestamp = self.GetCurrentTimeStamp()
    with self.GetConnection() as Connection:
        # Next position = current maximum + 1 within this user's conversation. Without a
        # unique constraint or a lock, two concurrent writers could compute the same value.
        SequenceIndex = Connection.execute(
            """
            SELECT COALESCE(MAX("SequenceIndex"), 0) + 1 AS "NextSequenceIndex"
            FROM "ConversationMessages"
            WHERE "ConversationId" = %s AND "UserName" = %s
            """,
            (ConversationId, UserName),
        ).fetchone()["NextSequenceIndex"]
        Connection.execute(
            """
            INSERT INTO "ConversationMessages"
                ("ConversationId", "UserName", "Role", "Content", "SequenceIndex", "CreatedAt")
            VALUES (%s, %s, %s, %s, %s, %s)
            """,
            (ConversationId, UserName, Role, Content, SequenceIndex, Timestamp),
        )
        Connection.execute(
            """
            UPDATE "Conversations"
            SET "UpdatedAt" = %s
            WHERE "ConversationId" = %s AND "UserName" = %s
            """,
            (Timestamp, ConversationId, UserName),
        # ... non-essential implementation omitted ...
            WHERE "ConversationId" = %s AND "UserName" = %s
            """,
            (Title, Timestamp, ConversationId, UserName),
        )


def AddRunLog(
    self,
    RunId,
    ConversationId,
    UserMessage,
    Answer,
    SourceCount,
    Status,
    LatencyMs,
    Sources=None,
    ToolCalls=None,
    WorkflowSteps=None,
    ErrorMessage=None,
):
    RunLog = RunLogItem(
        RunId=RunId,
        ConversationId=ConversationId,
        UserMessage=UserMessage,
        AnswerPreview=Answer[:160] + ("..." if len(Answer) > 160 else ""),
        SourceCount=SourceCount,
        Status=Status,
        CreatedAt=self.GetCurrentTimeStamp(),
        LatencyMs=LatencyMs,
        Sources=Sources or [],
        # ... non-essential implementation omitted ...
    self.RunLogs.insert(0, RunLog)
    # Keep only the 50 most recent runs in memory.
    self.RunLogs = self.RunLogs[:50]
    return RunLog
```

Future improvements
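- The code was generated with Codex, and its structure has not been optimized yet.
- There are only a few tools so far, which is still far from a real application; the project is a technical learning exercise and experiment.
- Tool invocation currently depends on keywords; ideally, a later version would generate and execute code dynamically based on the request.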