State 深度解析:Reducer、Schema 与多状态设计
写在前面
第一期我们搭了一个最简单的 StateGraph,见识了三个概念:State、Node、Edge。其中State是最容易被低估的——很多人把它简单理解成"一个存数据的 dict",然后就跳过去了。
但等你真正开始写复杂的 Agent,你会发现程序里 80% 的 bug 都出在 State 上:
“为什么消息被覆盖了?” “为什么 Node 收到了我没有传的参数?” “为什么 invoke 返回的数据比我想象的多?”
这一期,我们彻底搞懂 State。
1. State 的本质:一张"全局黑板"
想象一下:你站在一块大黑板前写解题过程。每写一步,下一个人都在同一块黑板上接着写。你不需要每次把全部解题过程传给他——他只要看黑板上多出来的部分就行。
这就是 State 的工作方式。
- State 是图里所有节点共享的一块数据空间
- 每个 Node 可以读整个 State
- 每个 Node 只返回自己修改的那部分,框架负责合并
# State 的初始值 {"messages": [...], "step": 0, "documents": []} # Node A 只返回它修改的部分 {"step": 1} # 框架自动把 step 合并到 State 中 # Node B 返回另一部分 {"messages": [new_msg]} # 框架追加到 messages 列表中这个"合并"过程,就是Reducer在起作用。
2. Reducer 机制:覆盖 vs 追加
Reducer 是一个函数,它决定:当某个 key 收到新的更新时,怎么和旧值合并。
2.1 默认行为:覆盖
如果不指定 Reducer,默认行为就是覆盖:
from typing import TypedDict from langgraph.graph import StateGraph, START, END class ProfileState(TypedDict): name: str age: int def step1(state: ProfileState) -> ProfileState: return {"name": "张三", "age": 25} # 第一次写入 def step2(state: ProfileState) -> ProfileState: # 只返回 name,age 保持不变(因为没传) return {"name": "李四"} # 构建 builder = StateGraph(ProfileState) builder.add_node("s1", step1) builder.add_node("s2", step2) builder.add_edge(START, "s1") builder.add_edge("s1", "s2") builder.add_edge("s2", END) graph = builder.compile() result = graph.invoke({"name": "", "age": 0}) print(result) # 输出:{'name': '李四', 'age': 25}注意:step2只返回了{"name": "李四"},但结果里age依然是 25。因为覆盖只作用于被更新的 key,没传的 key 原封不动。
2.2 追加行为:operator.add
有些场景下,覆盖就不合适了。比如消息列表——如果 Node 返回了一条新消息,你应该追加到列表末尾,而不是把整条历史消息都删了。
from typing import Annotated from typing_extensions import TypedDict import operator class CartState(TypedDict): items: Annotated[list[str], operator.add] # 追加模式 total: float # 覆盖模式(默认) def add_item(state: CartState) -> CartState: return {"items": ["苹果"]} # 这里只加了一个苹果 def add_another(state: CartState) -> CartState: return {"items": ["香蕉", "橘子"]} # 这里加了两个水果 builder = StateGraph(CartState) builder.add_node("a1", add_item) builder.add_node("a2", add_another) builder.add_edge(START, "a1") builder.add_edge("a1", "a2") builder.add_edge("a2", END) graph = builder.compile() result = graph.invoke({"items": [], "total": 0.0}) print(result) # 输出:{'items': ['苹果', '香蕉', '橘子'], 'total': 0.0}看到区别了吗?items用operator.add,所以两次节点的返回值被合并成了一个列表。如果是默认覆盖模式,结果会变成{'items': ['香蕉', '橘子']}——因为后一次覆盖了前一次。
这就是
MessagesState里messages字段的设计原理——用operator.add把每个 Node 返回的消息追加到一起。
2.3 对比总结
| 更新方式 | 语法 | 行为 | 适用场景 |
|---|---|---|---|
| 覆盖 | 不加 Annotated | 新值替换旧值 | 计数器、状态标志、用户设置 |
| 追加 | Annotated[T, operator.add] | 新值追加到旧值末尾 | 消息列表、日志、收集结果 |
3. MessagesState 的 add_messages 深入分析
其实 LangGraph 的MessagesState里,messages字段用的并不是operator.add,而是一个更智能的 reducer——add_messages:
from langgraph.graph import add_messages class MyState(TypedDict): messages: Annotated[list, add_messages]add_messages比operator.add强在哪?看这张表:
| 场景 | operator.add | add_messages |
|---|---|---|
| 追加新消息 | ✅ | ✅ |
| 按 tool_call_id 更新已有消息 | ❌ | ✅ |
| 删除指定消息 | ❌ | ✅ |
| 消息去重 | ❌ | ✅ |
来看一个演示,展示add_messages的智能更新能力:
from langgraph.graph import add_messages from langchain_core.messages import AIMessage, HumanMessage, ToolMessage class ChatState(TypedDict): messages: Annotated[list, add_messages] def demo_reducer(): state = {"messages": [ HumanMessage(content="1+1等于几?"), AIMessage(content="让我算算", id="ai_1"), ]} # 追加一条新消息 update1 = {"messages": [AIMessage(content="结果是2", id="ai_2")]} merged = add_messages(state["messages"], update1["messages"]) print(f"追加后共 {len(merged)} 条消息") # 追加后共 3 条消息 # 更新已有消息(用相同 id) update2 = {"messages": [AIMessage(content="让我重新思考...", id="ai_1")]} merged2 = add_messages(merged, update2["messages"]) for m in merged2: print(f" [{type(m).__name__}] id={m.id[:8]}... → {m.content}") # 注意:ai_1 的 content 变成了"让我重新思考...",而不是追加一条新消息 demo_reducer()运行结果中,id 为ai_1的那条消息被替换了,而不是新添加一条。这在Human-in-the-loop场景下特别有用——人修改了某条消息,框架能准确地找到原消息并替换它。
4. 多 Schema 设计:Input / Output / Private State
绝大多数教程在这里就停了,但真实项目中,你会发现只用一个 State Schema 是不够的:
- 输入只需要用户消息就行
- 内部处理需要记忆中间变量
- 输出只需要最终的回复
LangGraph 支持三个层级的 Schema 设计:
from typing import TypedDict from langgraph.graph import StateGraph, END, START # 用户输入时只需要这个 class InputState(TypedDict): user_question: str # 对外输出时只暴露这个 class OutputState(TypedDict): final_answer: str # 内部流转用的完整状态 class OverallState(TypedDict): user_question: str search_results: list[str] analysis: str final_answer: str # 某些节点私用的状态(不对外暴露) class PrivateState(TypedDict): raw_html: str token_count: int def search_node(state: InputState) -> OverallState: # 输入:只拿到 user_question # 输出:写入 search_results return {"search_results": [f"关于{state['user_question']}的搜索结果"]} def analyze_node(state: OverallState) -> PrivateState: # 读 OverallState,写入 PrivateState html = state["search_results"][0] return {"raw_html": html, "token_count": len(html)} def answer_node(state: PrivateState) -> OutputState: # 读 PrivateState,输出最终答案 return {"final_answer": f"分析完成,基于{state['token_count']}字的内容"} # 核心:定义图时传入三个 Schema builder = StateGraph( OverallState, # 内部完整状态 input_schema=InputState, # 输入限制 output_schema=OutputState, # 输出限制 ) builder.add_node("search", search_node) builder.add_node("analyze", analyze_node) builder.add_node("answer", answer_node) builder.add_edge(START, "search") builder.add_edge("search", "analyze") builder.add_edge("analyze", "answer") builder.add_edge("answer", END) graph = builder.compile() # 调用时,只传 InputState 需要的字段 result = graph.invoke({"user_question": "什么是LangGraph?"}) print(result) # 输出:{'final_answer': '分析完成,基于20字的内容'} # 注意!result 里没有 search_results / raw_html # 因为 output_schema=OutputState 约束了返回值三个 Schema 各自的角色:
用户输入 用户输出 │ ▲ ▼ │ ┌─────────┐ ┌──────────┐ ┌──────────┐ │ Input │───▶│ Overall │───▶│ Output │ │ State │ │ State │ │ State │ └─────────┘ └────┬─────┘ └──────────┘ │ ▼ ┌──────────┐ │ Private │ ← 某些节点独享 │ State │ └──────────┘为什么要有 PrivateState?
想象你有一个"爬虫节点"。它需要暂时存储raw_html和token_count,但这些数据对其他节点毫无意义,也不应该出现在最终的输出里。把这类数据放进PrivateState,让你的 State 设计更干净——每个节点只关心它需要的字段。
⚠️注意:PrivateState 在用
invoke时不会被返回,但用stream_mode="values"流式输出时仍然会暴露。这算是一个"潜规则",设计时要注意。
5. Pydantic BaseModel 做 State
TypedDict 轻量又好用,但它有一个短板:没有运行时校验。如果你需要严格的类型验证,可以用 Pydantic 的 BaseModel:
from pydantic import BaseModel, Field from langgraph.graph import StateGraph, START, END class PydanticState(BaseModel): name: str = Field(description="用户名") age: int = Field(ge=0, le=150, description="年龄(0-150)") tags: list[str] = Field(default_factory=list) def validate_node(state: PydanticState) -> PydanticState: # 这里 state.age 已经被 Pydantic 校验过 return {"name": state.name.upper()} builder = StateGraph(PydanticState) builder.add_node("v1", validate_node) builder.add_edge(START, "v1") builder.add_edge("v1", END) graph = builder.compile() # 传非法数据会怎样? try: graph.invoke({"name": "张三", "age": 999, "tags": []}) except Exception as e: print(f"被拦截了:{e}") # Pydantic 会在编译时或运行时抛出验证错误TypedDict vs BaseModel:怎么选?
| 维度 | TypedDict | Pydantic BaseModel |
|---|---|---|
| 性能 | ⚡ 更快 | 慢一些(有校验开销) |
| 类型提示 | ✅ 基础 | ✅ 完整 |
| 运行时验证 | ❌ 无 | ✅ 有 |
| 字段默认值 | 需用 dataclass | ✅ 原生支持 |
| 序列化 | 手动 | ✅ 自动 |
| 嵌套验证 | ❌ | ✅ 递归验证 |
我的建议:
- 开发阶段用TypedDict(快、轻量)
- 生产环境涉及用户输入校验时用BaseModel
- 接口对接(API 入参校验)用BaseModel
6. 实战:设计一个多步骤问答 Agent 的 State
把上面学到的串起来,设计一个真实的 State:
from typing import Annotated, TypedDict from typing_extensions import TypedDict from langgraph.graph import add_messages, StateGraph, START, END from langchain_core.messages import AnyMessage # ─── 输入层:用户只传问题 ─── class QAInput(TypedDict): question: str # ─── 内部状态:包含所有中间结果 ─── class QAState(TypedDict): question: str # 原始问题,覆盖模式 messages: Annotated[list, add_messages] # 对话历史,智能追加 context: list[str] # 检索到的上下文,追加模式 search_count: int # 搜索次数计数,覆盖模式 is_answered: bool # 是否已回答,覆盖模式 # ─── 输出层:只返回最终结果 ─── class QAOutput(TypedDict): answer: str sources: list[str] # ─── 节点函数 ─── def retrieve(state: QAInput) -> dict: """检索引擎(模拟)""" return { "messages": [], # 初始化为空消息列表 "context": [f"关于 '{state['question']}' 的参考文档"], "search_count": 1, } def generate(state: QAState) -> dict: """生成回答(模拟)""" if state.search_count > 3: return {"is_answered": True} # 模拟 LLM 调用 return { "messages": [("ai", f"回答:基于 {len(state.context)} 篇文档生成")], "is_answered": True, } # ─── 构建 ─── builder = StateGraph( QAState, input_schema=QAInput, output_schema=QAOutput, ) builder.add_node("retrieve", retrieve) builder.add_node("generate", generate) builder.add_edge(START, "retrieve") builder.add_edge("retrieve", "generate") builder.add_edge("generate", END) graph = builder.compile() # ─── 调用 ─── result = graph.invoke({"question": "什么是Reducer?"}) print(result) # 输出: # { # 'answer': "回答:基于 1 篇文档生成", # 'sources': ['关于 \'什么是Reducer?\' 的参考文档'] # }这个例子里:
question是输入(从 QAInput 来)context和search_count在内部流转,对外不可见messages用add_messages保留对话历史answer和sources是最终输出(通过 QAOutput 暴露)
7. 本期核心总结
| 知识点 | 一句话记住 |
|---|---|
| State | 图里节点共享的一块"黑板" |
| 覆盖模式 | 不加 Annotated,新值覆盖旧值 |
| 追加模式 | Annotated[T, operator.add]追加到末尾 |
| add_messages | 比 operator.add 更聪明,支持更新和删除 |
| 多 Schema | InputState 管输入、OutputState 管输出、PrivateState 管内部 |
| Pydantic | 需要校验时上场,但性能不如 TypedDict |
下期预告
第 3 期:Node 与 Edge——构建可分支、可循环的 Agent 流程
从本期开始,我们终于要让 Agent 真正"思考"了。你会学到:
- 条件边:让 Agent 根据状态自动决策
- 循环结构:经典的"LLM → Tool → LLM"循环
- 图可视化:用 Mermaid 展示完整流程
参考资料
- LangGraph State API:StateGraph | langgraph | LangChain Reference
- LangGraph Reducer 指南:Graph API overview - Docs by LangChain
- Pydantic 文档:https://docs.pydantic.dev/