概述

LangGraph 通过检查点支持时间旅行:
  • 重放:从先前的检查点重试。
  • 分叉:从带有修改状态的先前检查点创建分支,以探索替代路径。
两者都通过从先前的检查点恢复来工作。检查点之前的节点不会重新执行(结果已保存)。检查点之后的节点会重新执行,包括任何 LLM 调用、API 请求和中断(可能产生不同的结果)。

重放

使用先前检查点的配置调用图以从该点重放。
重放会重新执行节点——它不只是从缓存读取。LLM 调用、API 请求和中断会再次触发,可能返回不同的结果。从最终检查点重放(没有 next 节点)是一个空操作。
重放 使用get_state_history找到要重放的检查点,然后使用该检查点的配置调用invoke
from langgraph.graph import StateGraph, START
from langgraph.checkpoint.memory import InMemorySaver
from typing_extensions import TypedDict, NotRequired
from langchain_core.utils.uuid import uuid7

class State(TypedDict):
    topic: NotRequired[str]
    joke: NotRequired[str]


def generate_topic(state: State):
    return {"topic": "socks in the dryer"}


def write_joke(state: State):
    return {"joke": f"Why do {state['topic']} disappear? They elope!"}


checkpointer = InMemorySaver()
graph = (
    StateGraph(State)
    .add_node("generate_topic", generate_topic)
    .add_node("write_joke", write_joke)
    .add_edge(START, "generate_topic")
    .add_edge("generate_topic", "write_joke")
    .compile(checkpointer=checkpointer)
)

# 步骤 1:运行图
config = {"configurable": {"thread_id": str(uuid7())}}
result = graph.invoke({}, config)

# 步骤 2:找到要重放的检查点
history = list(graph.get_state_history(config))
# 历史记录按倒序排列
for state in history:
    print(f"next={state.next}, checkpoint_id={state.config['configurable']['checkpoint_id']}")

# 步骤 3:从特定检查点重放
# 找到 write_joke 之前的检查点
before_joke = next(s for s in history if s.next == ("write_joke",))
replay_result = graph.invoke(None, before_joke.config)
# write_joke 重新执行(再次运行),generate_topic 不运行

分叉

分叉从过去的检查点创建带有修改状态的新分支。在先前检查点上调用update_state以创建分叉,然后使用 None 调用invoke以继续执行。 分叉
update_state 不会回滚线程。它创建一个从指定点分支的新检查点。原始执行历史保持不变。
# 找到 write_joke 之前的检查点
history = list(graph.get_state_history(config))
before_joke = next(s for s in history if s.next == ("write_joke",))

# 分叉:更新状态以更改主题
fork_config = graph.update_state(
    before_joke.config,
    values={"topic": "chickens"},
)

# 从分叉恢复 — write_joke 使用新主题重新执行
fork_result = graph.invoke(None, fork_config)
print(fork_result["joke"])  # A joke about chickens, not socks

从特定节点

当您调用update_state时,值使用指定节点的写入器(包括归约器)应用。检查点记录该节点产生了更新,执行从该节点的后继者恢复。 默认情况下,LangGraph 从检查点的版本历史推断 as_node。从特定检查点分叉时,这种推断几乎总是正确的。 在以下情况下显式指定 as_node
  • 并行分支:多个节点在同一步骤中更新了状态,LangGraph 无法确定最后一个是哪个(InvalidUpdateError)。
  • 没有执行历史:在全新线程上设置状态(在测试中很常见)。
  • 跳过节点:将 as_node 设置为较晚的节点,使图认为该节点已运行。
# 图:generate_topic -> write_joke

# 将此更新视为 generate_topic 产生的。
# 执行从 write_joke 恢复(generate_topic 的后继者)。
fork_config = graph.update_state(
    before_joke.config,
    values={"topic": "chickens"},
    as_node="generate_topic",
)

中断

如果您的图使用interrupt进行人工介入工作流,中断总是在时间旅行期间重新触发。包含中断的节点会重新执行,interrupt() 暂停等待新的 Command(resume=...)
from langgraph.types import interrupt, Command

class State(TypedDict):
    value: list[str]

def ask_human(state: State):
    answer = interrupt("What is your name?")
    return {"value": [f"Hello, {answer}!"]}

def final_step(state: State):
    return {"value": ["Done"]}

graph = (
    StateGraph(State)
    .add_node("ask_human", ask_human)
    .add_node("final_step", final_step)
    .add_edge(START, "ask_human")
    .add_edge("ask_human", "final_step")
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# 首次运行:遇到中断
graph.invoke({"value": []}, config)
# 使用答案恢复
graph.invoke(Command(resume="Alice"), config)

# 从 ask_human 之前重放
history = list(graph.get_state_history(config))
before_ask = [s for s in history if s.next == ("ask_human",)][-1]

replay_result = graph.invoke(None, before_ask.config)
# 暂停在中断 — 等待新的 Command(resume=...)

# 从 ask_human 之前分叉
fork_config = graph.update_state(before_ask.config, {"value": ["forked"]})
fork_result = graph.invoke(None, fork_config)
# 暂停在中断 — 等待新的 Command(resume=...)

# 使用不同答案恢复分叉的中断
graph.invoke(Command(resume="Bob"), fork_config)
# 结果:{"value": ["分叉的", "您好,Bob!", "完成"]}

多个中断

如果您的图在多个点收集输入(例如,多步骤表单),您可以从中断之间分叉,以更改后续答案而不重新询问先前的问题。
def ask_name(state):
    name = interrupt("您叫什么名字?")
    return {"value": [f"姓名:{name}"]}

def ask_age(state):
    age = interrupt("您多大了?")
```python
def ask_name(state):
    name = interrupt("What is your name?")
    return {"value": [f"name:{name}"]}

def ask_age(state):
    age = interrupt("How old are you?")
    return {"value": [f"age:{age}"]}

# Graph: ask_name -> ask_age -> final
# After completing both interrupts:

# Fork from BETWEEN the two interrupts (after ask_name, before ask_age)
history = list(graph.get_state_history(config))
between = [s for s in history if s.next == ("ask_age",)][-1]

fork_config = graph.update_state(between.config, {"value": ["modified"]})
result = graph.invoke(None, fork_config)
# ask_name result preserved ("name:Alice")
# ask_age pauses at interrupt — waiting for new answer

Subgraphs

Time travel with subgraphs depends on whether the subgraph has its own checkpointer. This determines the granularity of checkpoints you can time travel from.
By default, a subgraph inherits the parent’s checkpointer. The parent treats the entire subgraph as a single super-step — there is only one parent-level checkpoint for the whole subgraph execution. Time traveling from before the subgraph re-executes it from scratch.You cannot time travel to a point between nodes in a default subgraph — you can only time travel from the parent level.
# Subgraph without its own checkpointer (default)
subgraph = (
    StateGraph(State)
    .add_node("step_a", step_a)       # Has interrupt()
    .add_node("step_b", step_b)       # Has interrupt()
    .add_edge(START, "step_a")
    .add_edge("step_a", "step_b")
    .compile()  # No checkpointer — inherits from parent
)

graph = (
    StateGraph(State)
    .add_node("subgraph_node", subgraph)
    .add_edge(START, "subgraph_node")
    .compile(checkpointer=InMemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

# Complete both interrupts
graph.invoke({"value": []}, config)            # Hits step_a interrupt
graph.invoke(Command(resume="Alice"), config)  # Hits step_b interrupt
graph.invoke(Command(resume="30"), config)     # Completes

# Time travel from before the subgraph
history = list(graph.get_state_history(config))
before_sub = [s for s in history if s.next == ("subgraph_node",)][-1]

fork_config = graph.update_state(before_sub.config, {"value": ["forked"]})
result = graph.invoke(None, fork_config)
# The entire subgraph re-executes from scratch
# You cannot time travel to a point between step_a and step_b
See subgraph persistence for more on configuring subgraph checkpointers.