LangGraph create_agent 之 middleware 参数完全指南
一、让 Agent 真正为你所用
在 LangChain 1.0 时代的智能体开发中,create_agent 取代了旧版 LangGraph 中的 create_react_agent,成为构建生产级智能体的核心入口。但很多人只用了它的基础功能——传一个大模型和几个工具,然后就开始和 Agent 对话。然而,真正赋能 Agent、让它能够适配复杂业务场景的关键,是 middleware 参数。
Middleware(中间件)允许你在 Agent 执行的关键节点插入自定义逻辑,而无需修改 Agent 本身的源码。无论是给 Agent 添加日志监控、实现动态模型切换、强制返回结构化输出,还是建立人工审批流程,中间件都能以一种优雅、可复用的方式帮你完成。本文将深入剖析 create_agent 中 middleware 参数的核心用法,通过实用例程带你上手,并提供常见问题的实用解答。
二、核心概念与架构
2.1 什么是 Agent Middleware
Agent Middleware 是 LangChain 1.0 中引入的一套拦截机制,用于在智能体执行的各个关键时刻增强或修改 Agent 的行为。create_agent 本质上是在 LangGraph 之上构建了一个标准化的执行图,包含模型调用节点(Model Node)、工具执行节点(Tools Node)以及条件路由边。中间件挂载在这个图的多个钩子点(Interception Points)上,形成了一条有序的处理链。
2.2 中间件执行的核心钩子
LangChain 中间件系统提供了以下拦截点位:
| 钩子函数 / 装饰器 | 触发时机 | 适用场景 |
|---|---|---|
@wrap_model_call | 每次调用 LLM 之前 | 动态切换模型、提示词注入、请求日志 |
@wrap_tool_call | 每次调用工具之前 | 工具调用审计、参数改写、结果缓存 |
before_agent | Agent 开始执行前(仅一次) | 注入全局上下文、初始化会话 |
after_agent | Agent 执行完毕后(仅一次) | 最终结果后处理、清理资源 |
before_model | 每次调用 LLM 之前 | 与 wrap_model_call 类似,但签名不同 |
after_model | 每次调用 LLM 之后 | 模型输出后处理、降级逻辑 |
wrap_model_call | 包裹整个模型调用 | 更细粒度的控制(推荐使用) |
2.3 AgentMiddleware 基类与装饰器写法
LangChain 提供了两种编写中间件的方式。第一种是继承 AgentMiddleware 基类并覆盖相应的方法;第二种是使用 @wrap_model_call 和 @wrap_tool_call 装饰器来定义轻量级中间件功能。本文主要采用第二种装饰器写法,因为它更直观且适合大部分场景。
三、实用例程
3.1 准备工作
以下示例默认已完成依赖安装:
pip install langchain langchain-openai langgraph
3.2 例程一:日志与执行追踪中间件
这是最简单也最实用的中间件之一,用于打印 Agent 每次模型调用和工具调用的详细信息,便于调试和生产监控。
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call, wrap_tool_call
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain.tools.tool_node import ToolCallRequest
from typing import Callable
@wrap_model_call
def logging_model_middleware(request, handler):
"""记录模型调用信息"""
print(f"[模型调用] 模型: {request.model}")
print(f"[模型调用] 消息数量: {len(request.state.get('messages', []))}")
# 继续执行原有的模型调用
response = handler(request)
print(f"[模型调用] 响应 Token 估算: {len(str(response.content)) // 4}")
return response
@wrap_tool_call
def logging_tool_middleware(request: ToolCallRequest, handler: Callable):
"""记录工具调用信息"""
tool_name = request.tool_call.get("name", "unknown")
print(f"[工具调用] 开始执行: {tool_name}")
print(f"[工具调用] 参数: {request.tool_call.get('args', {})}")
result = handler(request)
print(f"[工具调用] 完成: {tool_name}")
return result
# 定义一个简单的测试工具
@tool
def get_weather(city: str) -> str:
"""获取指定城市的天气"""
return f"{city} 的天气是晴天,25°C"
model = ChatOpenAI(model="gpt-4o-mini")
agent = create_agent(
model=model,
tools=[get_weather],
middleware=[logging_model_middleware, logging_tool_middleware]
)
response = agent.invoke({"messages": [{"role": "user", "content": "北京天气怎么样?"}]})
运行上述代码后,你将看到模型调用信息和工具调用信息依次输出,形成完整的执行追踪日志。
3.3 例程二:动态模型切换中间件
在实际生产环境中,简单问题用轻量模型(成本低、响应快),复杂问题用高性能模型(推理能力强),可以大幅降低成本。下面是一个根据输入消息长度自动切换模型实例的中间件。
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_model_call
from langchain_openai import ChatOpenAI
# 两个不同的模型实例
light_model = ChatOpenAI(model="gpt-4o-mini", temperature=0.5) # 轻量模型
pro_model = ChatOpenAI(model="gpt-4o", temperature=0.5) # 高性能模型
@wrap_model_call
def dynamic_model_middleware(request, handler):
"""根据输入消息的长度动态切换模型"""
messages = request.state.get("messages", [])
# 计算所有消息的字符总数
total_chars = sum(len(msg.get("content", "")) for msg in messages if isinstance(msg, dict))
if total_chars > 1000 or "复杂" in str(messages[-1]):
# 复杂场景:使用高性能模型
new_request = request.override(model=pro_model)
print("[动态模型] 切换至高性能模型 gpt-4o")
else:
# 简单场景:使用轻量模型
new_request = request.override(model=light_model)
print("[动态模型] 使用轻量模型 gpt-4o-mini")
return handler(new_request)
agent = create_agent(
model=light_model, # 默认模型,中间件会进行覆盖
middleware=[dynamic_model_middleware]
)
3.4 例程三:工具调用状态更新中间件
在工具执行后需要更新 Agent 的全局状态时,不能直接 mutate request.state,而需要使用 Command(update=...) 来通知 LangGraph 真正的状态变更。
from langchain.agents import create_agent
from langchain.agents.middleware import wrap_tool_call, AgentState
from langchain.tools.tool_node import ToolCallRequest
from langgraph.types import Command
from langchain.tools import tool
from typing import Callable, TypedDict
class CustomState(AgentState):
"""扩展 Agent 状态,添加用户偏好字段"""
user_preferences: dict
@wrap_tool_call
def tool_state_middleware(request: ToolCallRequest, handler: Callable):
"""工具执行后更新 Agent 的全局状态"""
# 执行工具
result = handler(request)
# 计算需要更新的用户偏好
tool_name = request.tool_call.get("name", "unknown")
prefs = dict(request.state.get("user_preferences") or {})
prefs["last_tool"] = tool_name
prefs["tool_count"] = prefs.get("tool_count", 0) + 1
# 使用 Command 返回状态更新
if isinstance(result, Command):
# 如果已有 Command,合并更新
update = result.update or {}
if isinstance(update, dict):
merged = {**update, "user_preferences": prefs}
return Command(graph=result.graph, update=merged, resume=result.resume, goto=result.goto)
return result
# 如果是普通 ToolMessage,包装成 Command
from langchain.messages import ToolMessage
assert isinstance(result, ToolMessage)
return Command(update={
"messages": [result],
"user_preferences": prefs,
})
@tool
def search_web(query: str) -> str:
"""模拟网络搜索"""
return f"'{query}' 的搜索结果:找到了 10 条相关内容"
model = ChatOpenAI(model="gpt-4o-mini")
agent = create_agent(
model=model,
tools=[search_web],
middleware=[tool_state_middleware],
state_schema=CustomState, # 必须传入自定义状态 Schema
)
response = agent.invoke({
"messages": [{"role": "user", "content": "搜索 LangGraph 的最新信息"}],
"user_preferences": {} # 初始状态
})
# 执行后,user_preferences 已被更新
print(response.get("user_preferences")) # 输出: {'last_tool': 'search_web', 'tool_count': 1}
3.5 例程四:Human-in-the-Loop(人工审批)中间件
在处理敏感操作(如 SQL 执行、文件删除、发送邮件)时,引入人工确认是一个重要的安全机制。HumanInTheLoopMiddleware 让这一需求开箱即用。
from langchain.agents import create_agent
from langchain.agents.middleware import HumanInTheLoopMiddleware
from langgraph.checkpoint.memory import InMemorySaver
from langchain.tools import tool
from langgraph.types import Command
@tool
def delete_sensitive_data(table_name: str) -> str:
"""删除数据库中的敏感数据(需要审批)"""
return f"已删除表 {table_name} 中的数据"
@tool
def get_user_info(user_id: str) -> str:
"""获取用户信息(不敏感,无须审批)"""
return f"用户 {user_id} 的信息:用户名张三"
# 配置人工审批中间件——只拦截 delete_sensitive_data 这个工具
middleware = [
HumanInTheLoopMiddleware(
interrupt_on={"delete_sensitive_data": True},
description_prefix="[安全审计] 高危操作,请审批以下请求"
)
]
agent = create_agent(
model=ChatOpenAI(model="gpt-4o-mini"),
tools=[delete_sensitive_data, get_user_info],
middleware=middleware,
checkpointer=InMemorySaver(), # 必须提供存档器以支持中断恢复
)
config = {"configurable": {"thread_id": "session-001"}}
# 第一次调用,Agent 在执行受保护的工具时会被中断
for step in agent.stream(
{"messages": [{"role": "user", "content": "删除 users 表中的数据"}]},
config,
stream_mode="values"
):
if "__interrupt__" in step:
print("遇到中断,等待人工审批...")
# 展示审批请求
interrupt_data = step["__interrupt__"][0]
print(f"审批信息: {interrupt_data.value}")
# 人工审批通过,恢复执行
for step in agent.stream(
Command(resume={"decisions": [{"type": "approve"}]}),
config,
stream_mode="values"
):
if "messages" in step:
step["messages"][-1].pretty_print()
3.6 例程五:摘要压缩中间件
当对话轮次过多、消息总 Token 逼近模型上下文上限时,我们可以利用 SummarizationMiddleware 自动将旧对话压缩为摘要。
from langchain.agents import create_agent
from langchain.agents.middleware import SummarizationMiddleware
from langgraph.checkpoint.memory import InMemorySaver
agent = create_agent(
model="openai:gpt-4o-mini",
middleware=[
SummarizationMiddleware(
model="openai:gpt-4o-mini", # 用于生成摘要的模型
trigger=("tokens", ">=", 3000), # 消息 token 超过 3000 时触发压缩
keep=("messages", "=", 5), # 压缩后保留最近 5 条消息
summary_prompt=(
"请将以下对话内容压缩为一段简洁的摘要,重点保留:"
"用户身份、核心诉求、已解决的问题、未完成的任务。\n\n{messages}"
)
)
],
checkpointer=InMemorySaver()
)
四、FAQ
Q1: 中间件和直接在 Agent 中修改状态有什么区别?
直接修改 request.state 不会被 LangGraph 持久化,因为 Graph 只认通过节点返回的状态变更。中间件中正确的状态更新需要通过 Command(update=...) 来返回,这与普通节点更新状态的方式是一致的。
Q2: 多个中间件之间的执行顺序是怎样的?
中间件按照传入 middleware 列表的顺序依次执行,遵循“洋葱模型”。对于 wrap_model_call:第一个中间件在最外层(最先拦截请求,最后收到响应),最后一个中间件在最内层(最接近实际的模型调用)。将日志类中间件放在最外层可以捕获最完整的信息。
Q3: 中间件中可以添加异步支持吗?
完全可以。使用 async def 定义中间件函数即可,LangChain 中间件体系原生支持 Python 异步。例如:
@wrap_model_call
async def async_logging_middleware(request, handler):
print("Async: before model call")
response = await handler(request)
print("Async: after model call")
return response
Q4: 如何调试中间件问题?
LangGraph Platform 提供了官方调试方案:设置环境变量 LOG_LEVEL=json 可输出结构化 JSON 日志,便于在日志系统中检索。此外,在生产部署中加入结构化日志中间件(如结合 structlog)可以显著提升可观测性。
Q5: 中间件可以访问用户身份(如 user_id)吗?
可以。通过 Runtime 类(LangGraph v0.6.0+ 引入)可以在中间件中访问用户元数据。例如在 before_agent 或 wrap_model_call 中可以通过 request.runtime.context 获取认证信息。
Q6: 中间件失败会影响整个 Agent 执行吗?
默认情况下,中间件中抛出的异常会中断整个 Agent 执行。你需要在中间件内部进行 try/except 捕获并进行容错处理(如降级、记录错误后继续)。推荐的生产实践是在外层中间件统一添加异常捕获和日志记录。
Q7: 内置中间件有哪些?
LangChain 官方提供了多个内置中间件,按使用频率排序包括 HumanInTheLoopMiddleware(人工审批)、SummarizationMiddleware(摘要压缩)、TodolistMiddleware(任务清单)、ToolRetryMiddleware(工具调用自动重试)、ToolEmojiMiddleware(工具调用可视化)等。更多内置中间件请参考 LangChain 官方文档。
Q8: 中间件和 StateGraph 自定义节点之间应该如何选择?
如果你的逻辑只需在单个模型调用或工具调用时触发,中间件是更简洁的选择。如果涉及跨多个节点、需要复杂的状态流转逻辑,使用 LangGraph 的 StateGraph 手动构建图更合适。通常的选型策略是:优先尝试中间件方案,当中间件不足以表达业务流程时,再降级到完整的 StateGraph 构筑。
Q9: 如何在中间件中实现模型调用的重试逻辑?
你可以用 @wrap_model_call 装饰器实现重试逻辑,核心是在中间件内部使用循环,并用 try/except 捕获模型调用异常。捕获异常后可以基于请求上下文决定是否重试,也可以利用 request.override() 在重试时切换到备用模型,实现降级容错。建议设置最大重试次数(如 3 次),并在连续失败后返回友好的错误信息。
Q10: 中间件能否动态添加或移除?
中间件是在 create_agent 构建 Agent 时指定的,Graph 创建后不可动态改变中间件栈。如果你需要在运行时控制某些功能是否生效,可以在中间件内部通过条件判断开关来实现逻辑的路由选择,例如读取 request.state 中的配置项来决定是否执行特定逻辑。