LangGraph 内置了持久化层,将图状态保存为检查点。当您使用检查点保存器编译图时,图状态的快照会在执行的每一步保存,并组织成线程。这支持人工介入工作流、对话记忆、时间旅行调试和容错执行。 检查点
Agent Server 自动处理检查点 当使用 Agent Server 时,您无需手动实现或配置检查点保存器。服务器会在幕后处理所有持久化基础设施。

为什么使用持久化

以下功能需要持久化:
  • 人工介入:检查点保存器通过允许人工检查、中断和批准图步骤来促进人工介入工作流。这些工作流需要检查点保存器,因为人工必须能够随时查看图的状态,并且图必须在人工对状态进行任何更新后恢复执行。请参阅中断查看示例。
  • 记忆:检查点保存器允许两次交互之间的”记忆”。对于重复的人工交互(如对话),任何后续消息都可以发送到该线程,它将保留对之前交互的记忆。请参阅添加记忆了解如何使用检查点保存器添加和管理对话记忆。
  • 时间旅行:检查点保存器允许”时间旅行”,允许用户重放先前的图执行以审查和/或调试特定的图步骤。此外,检查点保存器使在任意检查点处分支图状态成为可能,以探索替代轨迹。
  • 容错:检查点提供容错和错误恢复:如果一个或多个节点在给定的超级步骤中失败,您可以从最后一个成功步骤重新启动图。
  • 待写入:当图节点在给定的超级步骤中途执行失败时,LangGraph 会存储该超级步骤中其他成功完成的节点的待检查点写入。当您从该超级步骤恢复图执行时,您无需重新运行成功的节点。

核心概念

线程

线程是分配给检查点保存器保存的每个检查点的唯一 ID 或线程标识符。它包含一系列运行的累积状态。当执行运行时,助手底层图的状态将被持久化到线程中。 当使用检查点保存器调用图时,您必须指定 thread_id 作为配置 configurable 部分的一部分:
{"configurable": {"thread_id": "1"}}
可以检索线程的当前和历史状态。线程必须在执行运行之前创建才能持久化状态。LangSmith API 提供了多个用于创建和管理线程及线程状态的端点。请参阅 API 参考了解更多详情。 检查点保存器使用 thread_id 作为存储和检索检查点的主键。没有它,检查点保存器无法保存状态或在中断后恢复执行,因为检查点保存器使用 thread_id 加载保存的状态。

检查点

线程在特定时间点的状态称为检查点。检查点是每个超级步骤保存的图状态快照,由 StateSnapshot 对象表示(请参阅 StateSnapshot 字段查看完整字段参考)。

超级步骤

LangGraph 在每个超级步骤边界创建一个检查点。超级步骤是图的单个”tick”,其中计划在该步骤的所有节点执行(可能并行)。对于像 START -> A -> B -> END 这样的顺序图,输入、节点 A 和节点 B 都有单独的超级步骤——每个之后产生一个检查点。了解超级步骤边界对于时间旅行很重要,因为您只能从检查点(即超级步骤边界)恢复执行。 检查点被持久化,可用于稍后恢复线程的状态。 让我们看看当按如下方式调用简单图时期望保存的检查点:
from langgraph.graph import StateGraph, START, END
from langgraph.checkpoint.memory import InMemorySaver
from langchain_core.runnables import RunnableConfig
from typing import Annotated
from typing_extensions import TypedDict
from operator import add

class State(TypedDict):
    foo: str
    bar: Annotated[list[str], add]

def node_a(state: State):
    return {"foo": "a", "bar": ["a"]}

def node_b(state: State):
    return {"foo": "b", "bar": ["b"]}


workflow = StateGraph(State)
workflow.add_node(node_a)
workflow.add_node(node_b)
workflow.add_edge(START, "node_a")
workflow.add_edge("node_a", "node_b")
workflow.add_edge("node_b", END)

checkpointer = InMemorySaver()
graph = workflow.compile(checkpointer=checkpointer)

config: RunnableConfig = {"configurable": {"thread_id": "1"}}
graph.invoke({"foo": "", "bar":[]}, config)
运行图后,我们期望看到正好 4 个检查点:
  • 空的检查点,START 作为下一个要执行的节点
  • 用户输入 {'foo': '', 'bar': []}node_a 作为下一个要执行的节点的检查点
  • node_a 的输出 {'foo': 'a', 'bar': ['a']}node_b 作为下一个要执行的节点的检查点
  • node_b 的输出 {'foo': 'b', 'bar': ['a', 'b']} 和没有下一个要执行的节点的检查点
注意,因为我们对 bar 通道有归约器,bar 通道值包含两个节点的输出。

检查点命名空间

每个检查点都有一个 checkpoint_ns(检查点命名空间)字段,用于标识它属于哪个图或子图:
  • ""(空字符串):检查点属于父(根)图。
  • "node_name:uuid":检查点属于作为给定节点调用的子图。对于嵌套子图,命名空间用 | 分隔符连接(例如,"outer_node:uuid|inner_node:uuid")。
您可以通过配置从节点内访问检查点命名空间:
from langchain_core.runnables import RunnableConfig

def my_node(state: State, config: RunnableConfig):
    checkpoint_ns = config["configurable"]["checkpoint_ns"]
    # 父图为 "",子图为 "node_name:uuid"
请参阅子图了解更多关于处理子图状态和检查点的详细信息。

获取和更新状态

获取状态

与保存的图状态交互时,您必须指定线程标识符。您可以通过调用 graph.get_state(config) 查看图的_最新_状态。这将返回一个 StateSnapshot 对象,对应于配置中提供的线程 ID 关联的最新检查点,或者如果提供了与线程关联的检查点 ID,则返回该检查点 ID 关联的 StateSnapshot
# 获取最新的状态快照
config = {"configurable": {"thread_id": "1"}}
graph.get_state(config)

# 获取特定 checkpoint_id 的状态快照
config = {"configurable": {"thread_id": "1", "checkpoint_id": "1ef663ba-28fe-6528-8002-5a559208592c"}}
graph.get_state(config)
在我们的示例中,get_state 的输出如下:
StateSnapshot(
    values={'foo': 'b', 'bar': ['a', 'b']},
    next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
    metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
    created_at='2024-08-29T19:19:38.821749+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}}, tasks=()
)

StateSnapshot 字段

字段类型描述
valuesdict此检查点的状态通道值。
nexttuple[str, ...]接下来要执行的节点名称。空的 () 表示图已完成。
configdict包含 thread_idcheckpoint_nscheckpoint_id
metadatadict执行元数据。包含 source"input""loop""update")、writes(节点输出)和 step(超级步骤计数器)。
created_atstr创建此检查点的时间戳(ISO 8601)。
parent_configdict | None上一个检查点的配置。第一个检查点为 None
taskstuple[PregelTask, ...]此步骤要执行的任务。每个任务有 idnameerrorinterrupts,可选地还有 state(子图快照,使用 subgraphs=True 时)。

获取状态历史

您可以通过调用 graph.get_state_history(config) 获取给定线程的完整图执行历史。这将返回与配置中提供的线程 ID 关联的 StateSnapshot 对象列表。重要的是,检查点按时间顺序排列,最新的检查点/StateSnapshot 位于列表的开头。
config = {"configurable": {"thread_id": "1"}}
list(graph.get_state_history(config))
在我们的示例中,get_state_history 的输出如下:
[
    StateSnapshot(
        values={'foo': 'b', 'bar': ['a', 'b']},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28fe-6528-8002-5a559208592c'}},
        metadata={'source': 'loop', 'writes': {'node_b': {'foo': 'b', 'bar': ['b']}}, 'step': 2},
        created_at='2024-08-29T19:19:38.821749+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        tasks=(),
    ),
    StateSnapshot(
        values={'foo': 'a', 'bar': ['a']},
        next=('node_b',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f9-6ec4-8001-31981c2c39f8'}},
        metadata={'source': 'loop', 'writes': {'node_a': {'foo': 'a', 'bar': ['a']}}, 'step': 1},
        created_at='2024-08-29T19:19:38.819946+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        tasks=(PregelTask(id='6fb7314f-f114-5413-a1f3-d37dfe98ff44', name='node_b', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'foo': '', 'bar': []},
        next=('node_a',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f4-6b4a-8000-ca575a13d36a'}},
        metadata={'source': 'loop', 'writes': None, 'step': 0},
        created_at='2024-08-29T19:19:38.817813+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        tasks=(PregelTask(id='f1b14528-5ee5-579c-949b-23ef9bfbed58', name='node_a', error=None, interrupts=()),),
    ),
    StateSnapshot(
        values={'bar': []},
        next=('__start__',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1ef663ba-28f0-6c66-bfff-6723431e8481'}},
        metadata={'source': 'input', 'writes': {'foo': ''}, 'step': -1},
        created_at='2024-08-29T19:19:38.816205+00:00',
        parent_config=None,
        tasks=(PregelTask(id='6d27aa2e-d72b-5504-a36f-8620e54a76dd', name='__start__', error=None, interrupts=()),),
    )
]
状态

查找特定检查点

您可以筛选状态历史以查找匹配特定条件的检查点:
history = list(graph.get_state_history(config))

# 找到特定节点执行之前的检查点
before_node_b = next(s for s in history if s.next == ("node_b",))

# 按步骤编号查找检查点
step_2 = next(s for s in history if s.metadata["step"] == 2)