尧图网站建设 尧图网络
  • 首页
  • 关于我们
  • 服务项目
  • 案例展示
  • 建站流程
  • 资讯中心
  • 联系我们
首页/资讯中心/详情

LangGraph深度解析:从图基础到人机交互的AI工作流框架实践

LangGraph深度解析:从图基础到人机交互的AI工作流框架实践
📅 发布时间:2026/6/18 17:23:56

摘要:LangGraph作为新兴的AI工作流编排框架,通过图结构为Agent开发提供了全新的编程范式。本文从基础概念入手,深度剖析LangGraph的核心三要素(节点、边、状态),详解并行处理、记忆管理、工具集成等高级特性,并结合完整代码示例展示如何构建包含人机交互的智能工作流。无论您是AI开发者还是系统架构师,都能从本文获得实用的LangGraph工程化实践指南。

1. LangGraph核心概念:图编程范式

1.1 图结构的三要素解析

LangGraph将AI工作流抽象为图结构,这种设计理念带来了声明式编程的便利性。我们可以将其类比为水利系统:

  • 节点(Node):管道焊接处,代表数据处理单元

  • 边(Edge):管道本身,定义数据流向

  • 状态(State):水流状态,记录当前处理结果

这种设计模式的核心优势在于:

  • 可视化工作流:天然支持流程图展示

  • 模块化开发:节点间解耦,易于维护

  • 状态可追踪:完整记录执行轨迹

1.2 基础示例:状态传递机制

代码演示:

from typing import TypedDict, Annotated from langgraph.graph import StateGraph, START import operator # 定义状态结构 class State(TypedDict): aggregate: Annotated[list, operator.add] # 关键:使用归约器而非赋值 def node_a(state: State): return {'aggregate': ['A']} # 构建图结构 builder = StateGraph(State) builder.add_node('A', node_a) builder.add_edge(START, 'A') graph = builder.compile() # 执行工作流 result = graph.invoke({'aggregate': ['B']}) print(result) # 输出: {'aggregate': ['B', 'A']}

代码解析:

  • Annotated[list, operator.add]实现了累积式状态更新

  • 传统赋值操作会被覆盖,而operator.add实现了列表合并

  • 这种设计符合AI工作流中信息逐渐丰富的特点

2. 高级特性:并行处理与分支控制

2.1 基础并行结构

class ParallelState(TypedDict): aggregate: Annotated[list, operator.add] def process_a(state: ParallelState): return {'aggregate': ['A']} def process_b(state: ParallelState): return {'aggregate': ['B']} # 构建并行图 builder = StateGraph(ParallelState) builder.add_node('A', process_a) builder.add_node('B', process_b) builder.add_edge(START, 'A') builder.add_edge(START, 'B') # 关键:从起点到多个节点 graph = builder.compile() result = graph.invoke({'aggregate': ['START']}) print(result) # 输出顺序取决于执行顺序

这里的实现很简单,我们增加的只有一个节点‘B’,即增加了一个函数,这里的并行实现很简单,即通过增加边,让START不仅与A相连也与B相连,实现的'并行'。我们 还可以在并行中间再设置一条边即b -> b2 -> d,关键就是通过设定边来实现。

2.2 Map-Reduce模式实战

Map-Reduce是分布式计算的经典模式,在LangGraph中天然支持

这里其实不用想的那么复杂,可以理解成把任务拆成小块处理即Map过程再汇总即reduce过程,就是分布执行任务再把结果汇总得出结论,比如我们要基于一个主题去生成笑话,现根据主题生成内容,基于生成的内容分别生成笑话,这里比如是animals生成可能有monkey,tiger,elephant,我们分别对这三个内容进行生成笑话,这就可以看作Map过程。对分布任务的结果汇总:接下来对每个笑话进行一个汇总,借助大模型选出最好笑的一个笑话。

代码如下:

from pydantic import BaseModel, Field from langchain_openai import ChatOpenAI from langgraph.graph import Send # 定义数据结构 class Subjects(BaseModel): subjects: list[str] class Joke(BaseModel): joke: str class BestJoke(BaseModel): id: int = Field(description="Index of the best joke, starting with 0") # 定义完整工作流状态 class OverallState(TypedDict): topic: str subjects: list jokes: Annotated[list, operator.add] best_selected_joke: str def generate_topics(state: OverallState): """Map阶段:生成子主题""" prompt = f"Generate 3 sub-topics about {state['topic']}" response = model.with_structured_output(Subjects).invoke(prompt) return {"subjects": response.subjects} def generate_jokes(state: JokeState): """Map阶段:为每个子主题生成笑话""" prompt = f"Create a funny joke about {state['subject']}" response = model.with_structured_output(Joke).invoke(prompt) return {"jokes": [response.joke]} def continue_jokes(state: OverallState): """分发任务到并行节点""" return [Send("generate_jokes", {"subject": s}) for s in state["subjects"]] def best_joke(state: OverallState): """Reduce阶段:选择最佳笑话""" jokes = "\n\n".join(state["jokes"]) prompt = f"Select the best joke from:\n{jokes}" response = model.with_structured_output(BestJoke).invoke(prompt) return {"best_selected_joke": state["jokes"][response.id]} # 构建完整工作流 graph = StateGraph(OverallState) graph.add_node("generate_topics", generate_topics) graph.add_node("generate_jokes", generate_jokes) graph.add_node("best_joke", best_joke) # 设置工作流路径 graph.add_edge(START, "generate_topics") graph.add_conditional_edges("generate_topics", continue_jokes, ["generate_jokes"]) graph.add_edge("generate_jokes", "best_joke") graph.add_edge("best_joke", END) app = graph.compile()

本质上就是一个并行结构,他是在原来的基础上通过Send方式将生成词一个一个地发送给笑话生成节点生成笑话,然后再通过一个节点汇总结果选出最好笑的。

架构优势:

  • 任务并行化:同时处理多个子任务

  • 结果聚合:统一处理分散结果

  • 弹性扩展:轻松增加处理节点

3. 记忆管理:持久化状态追踪

3.1 检查点机制

讲解完了核心组成和并行结构,接下来讲解记忆-持久化管理:

设置记忆检查点与图形状态进行交互管理,用户的交互信息都可以发送到该检查点,检查点会保留其以前消息的记忆。

流程:

按照上文所讲内容构建图,另外加入两步:

1.设置检查点:memory = MemorySaver()

2.在编译图中加入:graph = builder.compile(checkpoint = memory)

其实操作很简单,但是我们下面举了一个例子是父子图,并在父图的编译器中放入记忆检查点

from langgraph.checkpoint import MemorySaver class SubState(TypedDict): foo: str bar: str # 构建子图 sub_builder = StateGraph(SubState) sub_builder.add_node("node1", lambda state: {"bar": "bar1"}) sub_builder.add_node("node2", lambda state: {"foo": state["foo"] + "tea"}) subgraph = sub_builder.compile() # 构建父图(包含记忆检查点) class ParentState(TypedDict): foo: str builder = StateGraph(ParentState) builder.add_node("node_1", lambda state: {"foo": "hi" + state["foo"]}) builder.add_node("node_2", subgraph) builder.add_edge(START, "node_1") builder.add_edge("node_1", "node_2") # 关键:添加记忆检查点 memory = MemorySaver() graph = builder.compile(memory) # 使用线程ID跟踪不同会话 config = {"configurable": {"thread_id": "session_1"}} for _, chunk in graph.stream({"foo": "milk"}, config, subgraphs=True): print(f"状态更新: {chunk}")

从图中我们可以看出,对于这种模式,父图和子图是可以共享记忆检查点的。这种模式的应用场景就可用于多步骤的推理和决策和一些复杂的工作流,比如客户支持和产品开发周期。这种模式能很好地让工作流中各部分保持一致性和共享上下文信息。

4. 工具集成:扩展AI能力边界

4.1 工具绑定机制

工具是Agent的重要组件,为什么要使用工具?我们的基座大模型能力有限,比如具有时限性,比如问今天NBA的一场球赛结果那么可能基座大模型无法回答,因为他的最后一次训练日期可能在今天之前,这里我们可以为大模型加上搜索工具让他具有上网查询的能力。所以工具对打模型而言至关重要。

在Langgraph中设置方式:

1.定义一个工具:@tool def....

2..bind_tools 为基座大模型加上工具

3.设置工作节点 .add_node("action",tool_node)

4.设置边后就能在工作流中利用工具了

from langchain.tools import tool
from langgraph.prebuilt import ToolNode
from langchain_core.messages import HumanMessage
from typing import Literal

# 定义自定义工具
@tool
def search_web(query: str) -> str:
"""实时搜索网络信息"""
return f"Search result for '{query}': Latest information..."

@tool
def calculate(expression: str) -> str:
"""执行数学计算"""
try:
result = eval(expression)
return f"Result: {result}"
except:
return "Invalid expression"

# 定义人机交互工具
class AskHuman(BaseModel):
"""向用户提问获取输入"""
question: str

# 绑定工具到大模型
tools = [search_web, calculate]
model = ChatOpenAI(model="gpt-4o-mini", temperature=0)
model_with_tools = model.bind_tools(tools + [AskHuman])

# 决策函数:判断下一步动作
def should_continue(state) -> Literal["action", "ask_human", END]:
messages = state["messages"]
last_message = messages[-1]

if isinstance(last_message, HumanMessage):
return END
if not hasattr(last_message, 'tool_calls') or not last_message.tool_calls:
return END
elif last_message.tool_calls[0]["name"] == "AskHuman":
return "ask_human"
else:
return "action"

# 定义消息状态
class MessagesState(TypedDict):
messages: Annotated[list, add_messages]

# 创建各功能节点
def call_model(state: MessagesState):
"""AI推理节点"""
messages = state["messages"]
chain = model_with_tools | StrOutputParser()
return {"messages": [chain.invoke(messages)]}

def ask_human(state: MessagesState):
"""人机交互节点(带中断)"""
interrupt({"human_question": "How are you today?"})
return {"messages": []} # 注意:中断后此返回值会被覆盖

# 构建工作流
workflow = StateGraph(MessagesState)
workflow.add_node("agent", call_model)
workflow.add_node("action", ToolNode(tools))
workflow.add_node("ask_human", ask_human)

# 设置工作流路径
workflow.add_edge(START, "agent")
workflow.add_conditional_edges(
"agent",
should_continue,
["action", "ask_human", END]
)
workflow.add_edge("action", "agent")
workflow.add_edge("ask_human", "agent")

# 添加记忆管理
memory = MemorySaver()
graph = workflow.compile(memory)

5. 人机交互:中断与恢复机制

5.1 中断机制深度解析

# 第一次执行:触发中断
config = {"configurable": {"thread_id": "thread_1"}}

print("=== 第一次执行(触发中断)===")
for state in graph.stream(
{"messages": [{"role": "user", "content": "Hi, I need help"}]},
config,
stream_mode="values"
):
if state["messages"]:
state["messages"][-1].pretty_print()

print("\n=== 中断等待用户输入 ===")

# 模拟用户输入
user_input = input("Please answer the question: ")

# 第二次执行:恢复工作流
print("\n=== 恢复执行 ===")
for event in graph.stream(
Command(resume=user_input), # 关键:Command恢复
config,
stream_mode="values"
):
if event.get("messages"):
event["messages"][-1].pretty_print()

关键机制:

  1. 状态快照:中断时自动保存完整执行上下文

  2. 非阻塞等待:工作流暂停,不占用计算资源

  3. 无缝恢复:用户输入作为参数传递到中断点

  4. 上下文保持:恢复后保持所有变量和状态

相关新闻

  • Java 开发最容易犯的 10 个错误
  • 意图识别深度原理解析:从向量空间到语义流形
  • MySQL 知识点复习- 6. ORDER BY, GROUP BY

最新新闻

  • AD pcb设计规则设置和DRC检查
  • 浙江闸阀厂家实力排行:基于工况适配性的客观盘点 - 起跑123
  • 2026无锡网站建设哪家口碑好:实测筛选3家本土靠谱建站服务商,避坑不踩雷 - wxxwlm
  • 2026年五大SEO优化公司推荐:从传统搜索到生成式引擎,五家值得关注的服务商深度选型评测 - 资讯纵览
  • 微交互设计:从状态反馈到情感化动效的工程化实现
  • 【毕业设计】基于 Python+Vue 的习题自测型自主学习系统的设计与实现 基于 Python+Vue 的轻量化线上自主学习服务系统(源码+文档+远程调试,全bao定制等)

日新闻

  • 2026年不锈钢卷板厂家推荐排行榜:冷轧热轧/304/201不锈钢卷板,高颜值耐腐蚀源头厂家实力精选 - 企业推荐官【官方】
  • FLUX.1-dev FP8模型实战指南:24GB以下显卡高效部署方案
  • 2026佛山长途搬家价目表:跨省跨市搬家费用完整计算指南 - 从来都是英雄出少年

周新闻

  • 3步解锁iOS设备:applera1n激活锁绕过完全指南
  • 39 2026 人工智能证书终极盘点,普通人选 AI 证书可以从这些方向入手
  • Redis 暴露公网有多危险?从端口检查到补救步骤

月新闻

  • 【总结】入门篇:50句话让你记住架构核心概念
  • WeChatMsg技术方案解析:实现Mac微信数据自主管理的完整解决方案
  • WeChatMsg:革新性微信数据备份方案,打造你的专属数字记忆库

关于尧图

  • 公司简介
  • 团队介绍
  • 企业文化
  • 荣誉资质

服务项目

  • 定制开发
  • 电商建站
  • UI 设计
  • 运维服务

快速链接

  • 案例展示
  • 建站流程
  • 常见问题
  • 资讯中心

联系方式

  • 📍北京市朝阳区互联网产业园 A 座 10 层
  • 📞400-888-8888
  • ✉️contact@rkmt.cn
  • 🕐周一至周日 9:00-21:00

© 2024 北京尧图网络科技有限公司 版权所有 | 京 ICP 备 XXXXXXXX 号