中断允许您在特定点暂停图执行,并等待外部输入后再继续。这实现了人工介入模式,在这种模式下您需要外部输入才能继续。当触发中断时,LangGraph 使用其持久化层保存图状态,并无限期等待直到您恢复执行。 中断通过在图的节点中调用 interrupt() 函数来工作。该函数接受任何 JSON 可序列化的值并将其暴露给调用者。当您准备好继续时,您通过使用 Command 重新调用图来恢复执行,然后该值成为节点内 interrupt() 调用的返回值。 与静态断点(在学习习之前或之后暂停特定节点)不同,中断是动态的:它们可以放置在代码的任何位置,并且可以基于应用程序逻辑有条件地执行。
  • 检查点保存您的位置: 检查点保存器写入确切的状态,这样您以后可以恢复,即使处于错误状态。
  • thread_id 是您的指针: 设置 config={"configurable": {"thread_id": ...}} 来告诉检查点保存器加载哪个状态。
  • 中断有效载荷通过 chunk["interrupts"] 暴露: 当使用 version="v2" 流式传输时,传递给 interrupt() 的值会出现在 values 流部分的 interrupts 字段中,这样您就知道图在等待什么。
您选择的 thread_id 实际上是您的持久游标。重用它会恢复相同的检查点;使用新值会启动一个具有空状态的全新线程。

使用 interrupt 暂停

interrupt 函数暂停图执行并向调用者返回一个值。当您在节点内调用interrupt时,LangGraph 保存当前图状态并等待您恢复执行。 要使用interrupt,您需要:
  1. 一个检查点保存器来持久化图状态(在生产环境中使用持久检查点保存器)
  2. 在配置中设置线程 ID,以便运行时知道从哪个状态恢复
  3. 在您想要暂停的地方调用 interrupt()(有效载荷必须是 JSON 可序列化的)
from langgraph.types import interrupt

def approval_node(state: State):
    # 暂停并请求批准
    approved = interrupt("您批准此操作吗?")

    # 当您恢复时,Command(resume=...) 会将值返回这里
    return {"approved": approved}
当您调用interrupt时,会发生以下情况:
  1. 图执行在interrupt被调用的确切位置被挂起
  2. 状态被保存到检查点保存器,以便以后可以恢复执行,在生产环境中,这应该是持久检查点保存器(例如由数据库支持)
  3. 值被返回给调用者,键为 __interrupt__;它可以是任何 JSON 可序列化的值(字符串、对象、数组等)
  4. 图无限期等待直到您用响应恢复执行
  5. 当您恢复时,响应被传递回节点,成为 interrupt() 调用的返回值

恢复中断

在中断暂停执行后,您可以通过使用包含恢复值的 Command 再次调用图来恢复图。恢复值被传递回 interrupt 调用,允许节点使用外部输入继续执行。
from langgraph.types import Command

# 初始运行 - 命中中断并暂停
# thread_id 是持久指针(在生产环境中存储稳定 ID)
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke({"input": "data"}, config=config, version="v2")

# result 是一个 GraphOutput,包含 .value 和 .interrupts
# .interrupts 包含传递给 interrupt() 的有效载荷
print(result.interrupts)
# > (Interrupt(value='您批准此操作吗?'),)

# 用人类的响应恢复
# 恢复有效载荷成为 interrupt() 内部的返回值
graph.invoke(Command(resume=True), config=config, version="v2")
关于恢复的关键点:
  • 恢复时必须使用相同的线程 ID
  • 传递给 Command(resume=...) 的值成为interrupt调用的返回值
  • 节点在恢复时从头重新启动interrupt被调用的节点,因此 interrupt() 之前的任何代码都会再次运行
  • 您可以传递任何 JSON 可序列化的值作为恢复值
Command(resume=...) 是作为 invoke()/stream() 输入的唯一 Command 模式。其他 Command 参数(updategotograph)专为从节点函数返回而设计。不要传递 Command(update=...) 作为输入来继续多轮对话——而是传递普通输入字典。

常见模式

中断解锁的关键是暂停执行并等待外部输入的能力。这对各种用例很有用,包括:

使用人工介入中断进行流式传输

在构建具有人工介入工作流的交互式 Agent 时,您可以同时流式传输消息块和节点更新,以在处理中断时提供实时反馈。 使用多个流模式("messages""updates")以及 subgraphs=True(如果存在子图)来:
  • 实时流式传输 AI 响应
  • 检测图何时遇到中断
  • 无缝处理用户输入并恢复执行
async for chunk in graph.astream(
    initial_input,
    stream_mode=["messages", "updates"],
    subgraphs=True,
    config=config,
    version="v2",
):
    if chunk["type"] == "messages":
        # 处理流式传输的消息内容
        msg, _ = chunk["data"]
        if isinstance(msg, AIMessageChunk) and msg.content:
            display_streaming_content(msg.content)

    elif chunk["type"] == "updates":
        # 检查更新数据中的中断
        if "__interrupt__" in chunk["data"]:
            interrupt_info = chunk["data"]["__interrupt__"][0].value
            user_response = get_user_input(interrupt_info)
            initial_input = Command(resume=user_response)
            break
        else:
            current_node = list(chunk["data"].keys())[0]
  • version="v2":所有块都是具有 typensdata 键的 StreamPart 字典
  • chunk["type"]:按流模式("messages""updates" 等)进行类型推断
  • chunk["ns"]:标识源图(根图为空元组,子图则填充)
  • subgraphs=True:对于嵌套图中的中断检测是必需的
  • Command(resume=...):用用户提供的数据恢复图执行

处理多个中断

当并行分支同时中断时(例如,扇出到多个节点,每个节点都调用 interrupt()),您可能需要在单个调用中恢复多个中断。 当用单个调用恢复多个中断时,将每个中断 ID 映射到其恢复值。 这确保每个响应在运行时与正确的中断配对。
from typing import Annotated, TypedDict
import operator

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import START, END, StateGraph
from langgraph.types import Command, interrupt


class State(TypedDict):
    vals: Annotated[list[str], operator.add]


def node_a(state):
    answer = interrupt("question_a")
    return {"vals": [f"a:{answer}"]}


def node_b(state):
    answer = interrupt("question_b")
    return {"vals": [f"b:{answer}"]}


graph = (
    StateGraph(State)
    .add_node("a", node_a)
    .add_node("b", node_b)
    .add_edge(START, "a")
    .add_edge(START, "b")
    .add_edge("a", END)
    .add_edge("b", END)
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}