环企首页
LangGraph 中的人工干预与动态断点
LangGraph 提供了强大的人工干预机制,允许在 Agent 执行过程中暂停、审查、修改状态,然后继续执行。这对于需要人工审核、纠错或提供额外信息的场景非常有用。
1. 核心概念
断点(Breakpoints)
在图执行的关键节点(节点之前或之后)设置暂停点,让开发者有机会介入。
人工干预(Human-in-the-loop)
在图暂停时,人工可以:
- 查看当前状态
- 修改状态中的内容(如修正错误、补充信息)
- 批准或拒绝某个操作
- 跳过某些步骤
2. 设置静态断点
在编译图时指定断点
from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint import MemorySaver
# 创建图
builder = StateGraph(MessagesState)
# 添加节点
builder.add_node("assistant", assistant)
builder.add_node("tools", tool_node)
# 添加边
builder.add_edge("assistant", "tools")
builder.add_edge("tools", "assistant")
builder.set_entry_point("assistant")
# 设置检查点(必须)
checkpointer = MemorySaver()
# 编译时指定断点
graph = builder.compile(
checkpointer=checkpointer,
interrupt_before=["tools"], # 在执行 tools 节点前暂停
interrupt_after=["assistant"] # 在执行 assistant 节点后暂停
)
参数说明:
interrupt_before: 在进入指定节点前暂停interrupt_after: 在执行完指定节点后暂停
3. 动态断点
动态断点允许在运行时根据状态决定是否暂停,而不是预先固定。
方式一:在节点内使用 Command 暂停
from langgraph.types import Command, Interrupt
def assistant(state: MessagesState):
# 获取最后一条用户消息
last_message = state["messages"][-1].content
# 检测关键词,决定是否触发中断
if "删除" in last_message or "审核" in last_message:
# 暂停并等待人工输入
response = interrupt({
"type": "approval_required",
"message": f"用户请求: {last_message}",
"needs_approval": True
})
if response.get("approved") is False:
return {"messages": [AIMessage(content="操作已取消")]}
# 正常执行
return {"messages": [AIMessage(content=f"处理: {last_message}")]}
方式二:使用 Command(resume=...) 恢复执行
from langgraph.types import Command
# 第一次调用(会暂停)
result = graph.invoke(
{"messages": [HumanMessage(content="请删除我的账户")]},
config={"configurable": {"thread_id": "user-123"}}
)
# 检查是否有中断
if result.get("__interrupt__"):
print("等待人工审批:", result["__interrupt__"])
# 人工决策后继续执行
result = graph.invoke(
Command(resume={"approved": True}),
config={"configurable": {"thread_id": "user-123"}}
)
4. 完整的实例:敏感操作审批流程
from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, add_messages
from langgraph.checkpoint import MemorySaver
from langgraph.types import Command, Interrupt
from langchain_core.messages import HumanMessage, AIMessage
# 定义状态
class WorkflowState(TypedDict):
messages: Annotated[list, add_messages]
pending_approval: dict
approved: bool
# 定义节点
def analyzer(state: WorkflowState):
"""分析请求是否需要审批"""
last_msg = state["messages"][-1].content
# 检测敏感操作
sensitive_keywords = ["删除", "转账", "修改密码", "封禁"]
for keyword in sensitive_keywords:
if keyword in last_msg:
# 触发中断,等待审批
approval_result = Interrupt({
"type": "human_approval",
"operation": keyword,
"request": last_msg,
"requires": "admin_approval"
})
# 根据审批结果决定后续
if not approval_result.get("approved", False):
return {
"messages": [AIMessage(content=f"操作 '{keyword}' 已被拒绝")],
"approved": False
}
return {
"messages": [AIMessage(content=f"自动处理: {last_msg}")],
"approved": True
}
# 构建图
builder = StateGraph(WorkflowState)
builder.add_node("analyzer", analyzer)
builder.set_entry_point("analyzer")
builder.set_finish_point("analyzer")
# 设置检查点(必须)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)
# === 使用示例 ===
config = {"configurable": {"thread_id": "session-001"}}
# 启动工作流
try:
result = graph.invoke(
{"messages": [HumanMessage(content="转账1000元给用户123")]},
config=config
)
except Exception as e:
# 如果是中断,这里会被捕获或直接返回中断信息
pass
# 方法一:直接从结果获取中断状态
if hasattr(result, '__interrupt__') and result.__interrupt__:
interrupt_info = result.__interrupt__[0].value
print(f"需要审批: {interrupt_info}")
# 人工批准
user_decision = input("是否批准?(y/n): ")
response = graph.invoke(
Command(resume={"approved": user_decision.lower() == 'y'}),
config=config
)
print(response["messages"][-1].content)
# 方法二:使用 get_state 检查中断
current_state = graph.get_state(config)
if current_state.next:
print("图有下一个节点,未完成")
if current_state.tasks and hasattr(current_state.tasks[0], 'interrupt'):
interrupt_value = current_state.tasks[0].interrupt
print(f"中断值: {interrupt_value}")
# 恢复执行
response = graph.invoke(Command(resume={"approved": True}), config=config)
5. 高级用法:动态条件断点
根据状态动态决定是否在某个节点前暂停:
from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint import MemorySaver
def dynamic_interrupt_condition(state: MessagesState):
"""动态判断是否需要中断"""
import random
return random.random() < 0.3 # 30% 概率触发断点
# 自定义节点
def process_with_maybe_break(state: MessagesState):
content = state["messages"][-1].content
# 动态判断
if dynamic_interrupt_condition(state):
# 触发人工介入
result = interrupt({
"message": f"需要确认: {content}",
"options": ["approve", "reject", "modify"]
})
if result.get("action") == "reject":
return {"messages": [AIMessage(content="操作已拒绝")]}
elif result.get("action") == "modify":
# 使用人工修改后的内容
modified = result.get("modified_content", content)
return {"messages": [AIMessage(content=f"处理修改内容: {modified}")]}
return {"messages": [AIMessage(content=f"处理: {content}")]}
6. 多线程/多会话中的断点管理
# 每个会话可以有独立的断点状态
configs = [
{"configurable": {"thread_id": "user-001"}},
{"configurable": {"thread_id": "user-002"}},
]
for cfg in configs:
try:
result = graph.invoke(
{"messages": [HumanMessage(content="高风险操作")]},
config=cfg
)
except:
# 不同会话需要分别恢复
decision = get_approval(cfg["configurable"]["thread_id"])
graph.invoke(Command(resume={"approved": decision}), config=cfg)
7. 最佳实践
| 场景 | 推荐方法 |
|---|---|
| 固定的审批节点 | 编译时设置 interrupt_before |
| 条件触发的中断 | 在节点内使用 interrupt() |
| 需要人工输入/修正 | 使用 Command(resume=data) |
| 多轮审批 | 状态追踪 + 多个中断点 |
| 生产环境 | 必须使用持久化检查点(如 SqliteSaver) |
# 生产环境使用持久化存储
from langgraph.checkpoint.sqlite import SqliteSaver
# 连接 SQLite 数据库
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
graph = builder.compile(checkpointer=checkpointer)
8. 完整可运行示例
from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint import MemorySaver
from langgraph.types import Command, interrupt
from langchain_core.messages import HumanMessage, AIMessage
# 构建简单图
def safe_node(state: MessagesState):
msg = state["messages"][-1].content
if "危险" in msg:
# 需要人工批准
result = interrupt({
"request": msg,
"risk": "high",
"action": "dangerous_operation"
})
if result.get("approved") is False:
return {"messages": [AIMessage(content="安全拦截: 危险操作已拒绝")]}
return {"messages": [AIMessage(content=f"执行: {msg}")]}
builder = StateGraph(MessagesState)
builder.add_node("safe_executor", safe_node)
builder.set_entry_point("safe_executor")
builder.set_finish_point("safe_executor")
graph = builder.compile(checkpointer=MemorySaver())
# 使用
config = {"configurable": {"thread_id": "test-001"}}
try:
graph.invoke({"messages": [HumanMessage(content="这是一个危险操作")]}, config=config)
except:
pass
# 恢复(人工批准)
result = graph.invoke(Command(resume={"approved": True}), config=config)
print(result["messages"][-1].content)
💎 总结
LangGraph 的人工干预机制提供了:
- 静态断点:
interrupt_before/after在编译时固定 - 动态断点:节点内使用
interrupt()根据状态触发 - 状态恢复:通过
Command(resume=...)传递人工输入 - 会话隔离:通过
thread_id管理不同用户的独立状态
这种设计使得复杂的工作流(如需要审批、人工修正、条件暂停)实现起来非常优雅。