• 微信:WANCOME
  • 扫码加微信,提供专业咨询
  • 服务热线
  • 13215191218
    13027920428

  • 微信扫码访问本页
desc-2
环企首页

LangGraph 中的人工干预与动态断点

LangGraph 提供了强大的人工干预机制,允许在 Agent 执行过程中暂停、审查、修改状态,然后继续执行。这对于需要人工审核、纠错或提供额外信息的场景非常有用。


1. 核心概念

断点(Breakpoints)

在图执行的关键节点(节点之前或之后)设置暂停点,让开发者有机会介入。

人工干预(Human-in-the-loop)

在图暂停时,人工可以:

  • 查看当前状态
  • 修改状态中的内容(如修正错误、补充信息)
  • 批准或拒绝某个操作
  • 跳过某些步骤

2. 设置静态断点

在编译图时指定断点

    from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint import MemorySaver

# 创建图
builder = StateGraph(MessagesState)

# 添加节点
builder.add_node("assistant", assistant)
builder.add_node("tools", tool_node)

# 添加边
builder.add_edge("assistant", "tools")
builder.add_edge("tools", "assistant")
builder.set_entry_point("assistant")

# 设置检查点(必须)
checkpointer = MemorySaver()

# 编译时指定断点
graph = builder.compile(
    checkpointer=checkpointer,
    interrupt_before=["tools"],     # 在执行 tools 节点前暂停
    interrupt_after=["assistant"]   # 在执行 assistant 节点后暂停
)

参数说明

  • interrupt_before: 在进入指定节点前暂停
  • interrupt_after: 在执行完指定节点后暂停

3. 动态断点

动态断点允许在运行时根据状态决定是否暂停,而不是预先固定。

方式一:在节点内使用 Command 暂停

    from langgraph.types import Command, Interrupt

def assistant(state: MessagesState):
    # 获取最后一条用户消息
    last_message = state["messages"][-1].content
    
    # 检测关键词,决定是否触发中断
    if "删除" in last_message or "审核" in last_message:
        # 暂停并等待人工输入
        response = interrupt({
            "type": "approval_required",
            "message": f"用户请求: {last_message}",
            "needs_approval": True
        })
        
        if response.get("approved") is False:
            return {"messages": [AIMessage(content="操作已取消")]}
    
    # 正常执行
    return {"messages": [AIMessage(content=f"处理: {last_message}")]}

方式二:使用 Command(resume=...) 恢复执行

from langgraph.types import Command

# 第一次调用(会暂停)
result = graph.invoke(
    {"messages": [HumanMessage(content="请删除我的账户")]},
    config={"configurable": {"thread_id": "user-123"}}
)

# 检查是否有中断
if result.get("__interrupt__"):
    print("等待人工审批:", result["__interrupt__"])
    
    # 人工决策后继续执行
    result = graph.invoke(
        Command(resume={"approved": True}),
        config={"configurable": {"thread_id": "user-123"}}
    )

4. 完整的实例:敏感操作审批流程

    from typing import Annotated, TypedDict
from langgraph.graph import StateGraph, add_messages
from langgraph.checkpoint import MemorySaver
from langgraph.types import Command, Interrupt
from langchain_core.messages import HumanMessage, AIMessage

# 定义状态
class WorkflowState(TypedDict):
    messages: Annotated[list, add_messages]
    pending_approval: dict
    approved: bool

# 定义节点
def analyzer(state: WorkflowState):
    """分析请求是否需要审批"""
    last_msg = state["messages"][-1].content
    
    # 检测敏感操作
    sensitive_keywords = ["删除", "转账", "修改密码", "封禁"]
    
    for keyword in sensitive_keywords:
        if keyword in last_msg:
            # 触发中断,等待审批
            approval_result = Interrupt({
                "type": "human_approval",
                "operation": keyword,
                "request": last_msg,
                "requires": "admin_approval"
            })
            
            # 根据审批结果决定后续
            if not approval_result.get("approved", False):
                return {
                    "messages": [AIMessage(content=f"操作 '{keyword}' 已被拒绝")],
                    "approved": False
                }
    
    return {
        "messages": [AIMessage(content=f"自动处理: {last_msg}")],
        "approved": True
    }

# 构建图
builder = StateGraph(WorkflowState)
builder.add_node("analyzer", analyzer)
builder.set_entry_point("analyzer")
builder.set_finish_point("analyzer")

# 设置检查点(必须)
checkpointer = MemorySaver()
graph = builder.compile(checkpointer=checkpointer)

# === 使用示例 ===
config = {"configurable": {"thread_id": "session-001"}}

# 启动工作流
try:
    result = graph.invoke(
        {"messages": [HumanMessage(content="转账1000元给用户123")]},
        config=config
    )
except Exception as e:
    # 如果是中断,这里会被捕获或直接返回中断信息
    pass

# 方法一:直接从结果获取中断状态
if hasattr(result, '__interrupt__') and result.__interrupt__:
    interrupt_info = result.__interrupt__[0].value
    print(f"需要审批: {interrupt_info}")
    
    # 人工批准
    user_decision = input("是否批准?(y/n): ")
    response = graph.invoke(
        Command(resume={"approved": user_decision.lower() == 'y'}),
        config=config
    )
    print(response["messages"][-1].content)

# 方法二:使用 get_state 检查中断
current_state = graph.get_state(config)
if current_state.next:
    print("图有下一个节点,未完成")
    
    if current_state.tasks and hasattr(current_state.tasks[0], 'interrupt'):
        interrupt_value = current_state.tasks[0].interrupt
        print(f"中断值: {interrupt_value}")
        
        # 恢复执行
        response = graph.invoke(Command(resume={"approved": True}), config=config)


5. 高级用法:动态条件断点

根据状态动态决定是否在某个节点前暂停:

    from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint import MemorySaver

def dynamic_interrupt_condition(state: MessagesState):
    """动态判断是否需要中断"""
    import random
    return random.random() < 0.3  # 30% 概率触发断点

# 自定义节点
def process_with_maybe_break(state: MessagesState):
    content = state["messages"][-1].content
    
    # 动态判断
    if dynamic_interrupt_condition(state):
        # 触发人工介入
        result = interrupt({
            "message": f"需要确认: {content}",
            "options": ["approve", "reject", "modify"]
        })
        
        if result.get("action") == "reject":
            return {"messages": [AIMessage(content="操作已拒绝")]}
        elif result.get("action") == "modify":
            # 使用人工修改后的内容
            modified = result.get("modified_content", content)
            return {"messages": [AIMessage(content=f"处理修改内容: {modified}")]}
    
    return {"messages": [AIMessage(content=f"处理: {content}")]}


6. 多线程/多会话中的断点管理

# 每个会话可以有独立的断点状态
configs = [
    {"configurable": {"thread_id": "user-001"}},
    {"configurable": {"thread_id": "user-002"}},
]

for cfg in configs:
    try:
        result = graph.invoke(
            {"messages": [HumanMessage(content="高风险操作")]},
            config=cfg
        )
    except:
        # 不同会话需要分别恢复
        decision = get_approval(cfg["configurable"]["thread_id"])
        graph.invoke(Command(resume={"approved": decision}), config=cfg)


7. 最佳实践

场景 推荐方法
固定的审批节点 编译时设置 interrupt_before
条件触发的中断 在节点内使用 interrupt()
需要人工输入/修正 使用 Command(resume=data)
多轮审批 状态追踪 + 多个中断点
生产环境 必须使用持久化检查点(如 SqliteSaver
    # 生产环境使用持久化存储
from langgraph.checkpoint.sqlite import SqliteSaver

# 连接 SQLite 数据库
checkpointer = SqliteSaver.from_conn_string("checkpoints.db")
graph = builder.compile(checkpointer=checkpointer)


8. 完整可运行示例

from langgraph.graph import StateGraph, MessagesState
from langgraph.checkpoint import MemorySaver
from langgraph.types import Command, interrupt
from langchain_core.messages import HumanMessage, AIMessage

# 构建简单图
def safe_node(state: MessagesState):
    msg = state["messages"][-1].content
    
    if "危险" in msg:
        # 需要人工批准
        result = interrupt({
            "request": msg,
            "risk": "high",
            "action": "dangerous_operation"
        })
        
        if result.get("approved") is False:
            return {"messages": [AIMessage(content="安全拦截: 危险操作已拒绝")]}
    
    return {"messages": [AIMessage(content=f"执行: {msg}")]}

builder = StateGraph(MessagesState)
builder.add_node("safe_executor", safe_node)
builder.set_entry_point("safe_executor")
builder.set_finish_point("safe_executor")

graph = builder.compile(checkpointer=MemorySaver())

# 使用
config = {"configurable": {"thread_id": "test-001"}}

try:
    graph.invoke({"messages": [HumanMessage(content="这是一个危险操作")]}, config=config)
except:
    pass

# 恢复(人工批准)
result = graph.invoke(Command(resume={"approved": True}), config=config)
print(result["messages"][-1].content)


💎 总结

LangGraph 的人工干预机制提供了:

  1. 静态断点interrupt_before/after 在编译时固定
  2. 动态断点:节点内使用 interrupt() 根据状态触发
  3. 状态恢复:通过 Command(resume=...) 传递人工输入
  4. 会话隔离:通过 thread_id 管理不同用户的独立状态

这种设计使得复杂的工作流(如需要审批、人工修正、条件暂停)实现起来非常优雅。