[智能体-333]:LangGraph代码示例,详细注解:基础线性图、条件分支、循环、人在回路
下面提供4 个递进式完整代码示例,附带逐行详细注解,覆盖:
基础线性图、条件分支、循环、人在回路,全部可直接运行,基于官方标准写法。
先执行安装依赖:
bash
运行
pip install langgraph langchain示例 1:基础线性图(最简入门)
功能:串行执行两个节点,理解State、Node、普通边、编译 & 执行全流程。
python
运行
# 1. 导入依赖 from typing import TypedDict, Annotated import operator # LangGraph 核心类与内置起点/终点 from langgraph.graph import StateGraph, START, END # ===================== 第一步:定义全局状态 State ===================== # 使用 TypedDict 约束状态结构,IDE 代码提示、类型校验 class WorkState(TypedDict): # 普通字段:字符串类型,默认更新规则 = 覆盖 user_query: str # 列表字段:Annotated + operator.add 表示【新内容追加,不覆盖】 chat_history: Annotated[list[str], operator.add] # 结果字段 final_result: str # ===================== 第二步:定义节点 Node(业务执行单元) ===================== # 节点本质:Python 函数,入参=当前状态,出参=待更新的字典 def analyze_query(state: WorkState) -> dict: """节点1:解析用户问题""" # 读取全局状态中的用户输入 query = state["user_query"] print(f"【解析节点】正在分析问题:{query}") # 返回需要更新的字段(只写变化部分,无需返回完整 state) return { "chat_history": [f"已解析问题:{query}"] } def generate_reply(state: WorkState) -> dict: """节点2:生成回复""" query = state["user_query"] print("【回复节点】开始生成答案...") answer = f"针对「{query}」的标准回复内容" return { "final_result": answer, "chat_history": ["已生成最终回复"] } # ===================== 第三步:构建图 Graph ===================== # 1. 实例化状态图,绑定自定义状态类 graph_builder = StateGraph(WorkState) # 2. 注册节点:参数(节点唯一名称, 对应函数) graph_builder.add_node("analyze", analyze_query) graph_builder.add_node("reply", generate_reply) # 3. 配置流向:普通边 add_edge(起点节点名, 终点节点名) graph_builder.add_edge(START, "analyze") # 流程起点 → 解析节点 graph_builder.add_edge("analyze", "reply") # 解析节点 → 回复节点 graph_builder.add_edge("reply", END) # 回复节点 → 流程终点 # ===================== 第四步:编译图(必须编译才能运行) ===================== app = graph_builder.compile() # ===================== 第五步:执行图 ===================== if __name__ == "__main__": # 初始输入:对应 State 定义的字段 initial_input = { "user_query": "LangGraph 是什么?", "chat_history": [], # 对话历史初始为空列表 "final_result": "" } # invoke:同步执行全流程,返回最终完整状态 result_state = app.invoke(initial_input) # 打印最终结果 print("\n===== 执行完毕,最终状态 =====") print("对话记录:", result_state["chat_history"]) print("最终回复:", result_state["final_result"])示例 2:条件分支图(add_conditional_edges)
功能:根据状态字段动态路由,实现二分支选择,模拟「是否需要调用工具」。
python
运行
from typing import TypedDict, Annotated import operator from langgraph.graph import StateGraph, START, END # ===================== 1. 定义状态 ===================== class AgentState(TypedDict): question: str need_tool: bool # 标记:是否需要调用外部工具 tool_output: str # 工具返回结果 chat_msg: Annotated[list[str], operator.add] answer: str # ===================== 2. 定义节点 ===================== def judge_intent(state: AgentState) -> dict: """节点1:意图判断,决定是否要用工具""" print("【意图判断节点】") q = state["question"] # 模拟规则:包含"查询"则判定需要工具 need = "查询" in q return { "need_tool": need, "chat_msg": [f"意图判断完成,是否调用工具:{need}"] } def call_external_tool(state: AgentState) -> dict: """节点2:工具调用节点""" print("【工具调用节点】") return { "tool_output": "从外部接口拿到业务数据", "chat_msg": ["工具调用成功"] } def direct_answer(state: AgentState) -> dict: """节点3:直接回答节点(无需工具)""" print("【直接回答节点】") return { "answer": "基于常识直接给出答案", "chat_msg": ["无需工具,直接作答"] } def combine_answer(state: AgentState) -> dict: """节点4:结合工具结果生成答案""" print("【整合答案节点】") return { "answer": f"结合工具数据:{state['tool_output']} 生成答案", "chat_msg": ["整合工具结果完成作答"] } # ===================== 3. 定义路由函数(分支核心) ===================== def route_tool_or_not(state: AgentState) -> str: """ 路由判断函数: 入参:当前全局状态 返回:目标节点名称字符串(必须和 path_map 对应) """ if state["need_tool"]: return "call_tool" else: return "direct_ans" # ===================== 4. 构建图 + 配置分支 ===================== builder = StateGraph(AgentState) # 注册所有节点 builder.add_node("judge", judge_intent) builder.add_node("call_tool", call_external_tool) builder.add_node("direct_ans", direct_answer) builder.add_node("combine_ans", combine_answer) # 入口流向 builder.add_edge(START, "judge") # 配置条件分支:从source节点开始,到其他节点:path_map,判决的依据:path。 builder.add_conditional_edges( source="judge", # 分支起始节点 path=route_tool_or_not, # 路由判断函数 path_map={ # 映射:函数返回值 → 真实节点名 "call_tool": "call_tool", "direct_ans": "direct_ans" } ) # 配置后续固定流向 builder.add_edge("call_tool", "combine_ans") # 工具执行后 → 整合答案 builder.add_edge("direct_ans", END) # 直接回答 → 结束 builder.add_edge("combine_ans", END) # 整合答案 → 结束 # 编译 & 执行 app = builder.compile() if __name__ == "__main__": # 测试1:需要工具 print("===== 测试1:包含查询,走工具分支 =====") # invoke负责触发执行图中的所有节点,包括分支和循环 res1 = app.invoke({"question": "查询今日订单", "chat_msg": [], "need_tool": False, "tool_output": "", "answer": ""}) print("\n===== 测试2:普通问题,不走工具 =====") # 测试2:不需要工具 res2 = app.invoke({"question": "你好", "chat_msg": [], "need_tool": False, "tool_output": "", "answer": ""})示例 3:循环流程图(有环图,条件边回跳)
功能:模拟「多次检索直到拿到有效数据」,利用条件边回跳上游节点实现循环。
python
运行
from typing import TypedDict, Annotated import operator from langgraph.graph import StateGraph, START, END # ===================== 1. 状态定义 ===================== class LoopState(TypedDict): content: str search_result: str loop_times: int # 循环计数器 log: Annotated[list[str], operator.add] # ===================== 2. 节点定义 ===================== def think_step(state: LoopState) -> dict: """思考节点""" cnt = state["loop_times"] print(f"【第 {cnt} 轮思考】") return {"log": [f"第{cnt}轮思考完成"]} def search_data(state: LoopState) -> dict: """检索节点,模拟多次查询""" new_cnt = state["loop_times"] + 1 print(f"【第 {new_cnt} 轮检索】") # 模拟逻辑:循环 >=2 次才拿到有效数据 if new_cnt >= 2: return { "search_result": "有效业务数据", "loop_times": new_cnt, "log": ["检索到有效数据"] } else: return { "search_result": "", "loop_times": new_cnt, "log": ["数据不足,继续检索"] } # ===================== 3. 循环路由函数 ===================== def check_loop_finish(state: LoopState) -> str: """判断是否继续循环:无结果 → 回到思考节点;有结果 → 结束""" if not state["search_result"]: return "think" # 回跳上游节点 = 开启循环 else: return END # 结束流程 # ===================== 4. 构建有环图 ===================== builder = StateGraph(LoopState) builder.add_node("think", think_step) builder.add_node("search", search_data) # 基础流向 builder.add_edge(START, "think") builder.add_edge("think", "search") # 循环条件边:检索完成后判断是否回跳 builder.add_conditional_edges( source="search", path=check_loop_finish, path_map={ "think": "think", END: END } ) app = builder.compile() # ===================== 5. 执行 ===================== if __name__ == "__main__": init_input = { "content": "查找资料", "search_result": "", "loop_times": 0, "log": [] } final = app.invoke(init_input) print("\n完整日志:", final["log"]) print("最终数据:", final["search_result"])示例 4:人在回路(Human-in-the-Loop)
功能:流程中途暂停,人工修改状态后继续执行,核心使用interrupt_before。
python
运行
from typing import TypedDict, Annotated import operator from langgraph.graph import StateGraph, START, END # ===================== 1. 状态定义 ===================== class ReviewState(TypedDict): raw_text: str # 原始文本 reviewed_text: str # 人工审核后文本 log: Annotated[list[str], operator.add] # ===================== 2. 节点定义 ===================== def pre_handle(state: ReviewState) -> dict: """前置处理节点""" print("【前置处理】完成文本预处理") return {"log": ["预处理完成"]} def final_output(state: ReviewState) -> dict: """最终输出节点(审核后执行)""" print("【最终输出】") return { "log": ["流程全部结束"], "reviewed_text": state["reviewed_text"] } # ===================== 3. 构建图 ===================== builder = StateGraph(ReviewState) builder.add_node("pre_handle", pre_handle) builder.add_node("final_out", final_output) builder.add_edge(START, "pre_handle") builder.add_edge("pre_handle", "final_out") builder.add_edge("final_out", END) # ========== 关键配置:在 final_out 节点执行前暂停 ========== # interrupt_before:指定节点执行“前”中断流程 app = builder.compile(interrupt_before=["final_out"]) # ===================== 4. 带人工介入执行 ===================== if __name__ == "__main__": # thread_id:会话唯一标识,区分不同会话、保存状态 session_config = {"configurable": {"thread_id": "session_001"}} # 初始输入 init_input = { "raw_text": "待审核的原始内容", "reviewed_text": "", "log": [] } # 第一轮执行:运行到暂停点 print("===== 第一轮执行,触发暂停 =====") app.invoke(init_input, config=session_config) # 1. 获取当前状态 current_state = app.get_state(session_config) print("当前待处理文本:", current_state.values["raw_text"]) # 2. 人工修改状态(核心:人工介入) app.update_state( config=session_config, values={"reviewed_text": "人工审核并修正后的正式内容"} ) print("===== 已完成人工修改状态 =====") # 3. 恢复流程继续执行(入参传 None 代表继续沿用原有状态) final_state = app.invoke(None, config=session_config) print("\n最终结果:", final_state["reviewed_text"])通用编写模板(记忆口诀)
- 定义 State:
TypedDict+Annotated+operator.add处理列表追加 - 编写 Node:函数接收 state,返回更新字典
- 创建 Graph:
StateGraph(状态类)→add_node注册节点 - 配置流向
- 线性:
add_edge - 分支 / 循环:
add_conditional_edges+ 路由函数
- 线性:
- 编译:
compile(),人在回路加interrupt_before - 执行:
invoke(初始数据)/ 配合thread_id做会话隔离
关键 API 速查
表格
| 功能 | API |
|---|---|
| 注册节点 | add_node(节点名, 函数) |
| 普通串行边 | add_edge(起点, 终点) |
| 条件分支边 | add_conditional_edges() |
| 编译 + 暂停 | compile(interrupt_before=[节点名]) |
| 获取当前状态 | get_state(config) |
| 修改状态 | update_state(config, values={}) |
| 同步执行 | app.invoke(inputs, config) |
