持久执行是一种技术,其中进程或工作流在关键点保存其进度,允许它暂停,然后在停止的地方恢复。这在需要人工介入的场景中特别有用,用户可以在继续之前检查、验证或修改流程,也适用于可能遇到中断或错误(如 LLM 调用超时)的长时间运行任务。通过保留已完成的工作,持久执行使进程能够恢复而不重新处理之前的步骤——即使在很长时间后(如一周后)。 LangGraph 内置的持久化层为工作流提供持久执行,确保每个执行步骤的状态保存到持久化存储中。这种能力保证如果工作流被中断——无论是系统故障还是人工介入交互——它都可以从最后记录的状态恢复。
如果您将 LangGraph 与检查点保存器一起使用,您已经启用了持久执行。您可以在任何点暂停和恢复工作流,即使在中途或失败后。 要充分利用持久执行,请确保您的工作流设计为确定性的幂等的,并将任何副作用或非确定性操作包装在tasks中。您可以从 StateGraph (Graph API)Functional API 两者使用 tasks

要求

要在 LangGraph 中利用持久执行,您需要:
  1. 通过指定检查点保存器来在工作流中启用持久化,该保存器将保存工作流进度。
  2. 执行工作流时指定线程标识符。这将跟踪工作流特定实例的执行历史。
  3. 将任何非确定性操作(如随机数生成)或具有副作用的操作(如文件写入、API 调用)包装在tasks中,以确保恢复工作流时,这些操作不会为特定运行重复,而是从持久化层检索其结果。有关更多信息,请参阅确定性和一致回放

确定性的一致回放

当您恢复工作流运行时,代码不会同一行代码恢复执行;而是它会识别一个合适的起始点来从停止的地方继续。这意味着工作流将从起始点重放所有步骤,直到到达停止的点。 因此,在为持久执行编写工作流时,您必须将任何非确定性操作(如随机数生成)和任何具有副作用的操作(如文件写入、API 调用)包装在tasksnodes中。 为确保您的工作流是确定性的并且可以一致回放,请遵循以下准则:
  • 避免重复工作:如果一个node包含多个具有副作用的操作(如日志记录、文件写入或网络调用),将每个操作包装在单独的 task 中。这确保了恢复工作流时,这些操作不会重复,而是从持久化层检索其结果。
  • 封装非确定性操作: 将任何可能产生非确定性结果(如随机数生成)的代码包装在 tasksnodes 中。这确保了恢复时,工作流按照完全记录的步骤序列和相同的结果继续。
  • 使用幂等操作:尽可能确保副作用(如 API 调用、文件写入)是幂等的。这意味着如果工作流中的操作在失败后重试,其效果与第一次执行时相同。这对于导致数据写入的操作尤其重要。如果一个 task 启动但未能成功完成,工作流的恢复将重新运行该 task,依赖记录的结果来保持一致性。使用幂等键或验证现有结果以避免意外重复,确保平滑和可预测的工作流执行。
有关要避免的陷阱的一些示例,请参阅 functional API 中的常见陷阱部分,其中展示了如何使用 tasks 构建代码以避免这些问题。相同的原则适用于 StateGraph (Graph API)

持久性模式

LangGraph 支持三种持久性模式,允许您根据应用程序的要求平衡性能和数据一致性。较高的持久性模式会增加工作流执行的开销。您可以在调用任何图执行方法时指定持久性模式:
graph.stream(
    {"input": "test"},
    durability="sync"
)
持久性模式从最低到最高如下:
  • "exit":LangGraph 仅在图执行成功退出、出错或由于人工介入中断时持久化更改。这为长时间运行的图提供了最佳性能,但意味着中间状态不保存,因此您无法从执行中途发生的系统故障(如进程崩溃)中恢复。
  • "async":LangGraph 在下一步执行时异步持久化更改。这提供了良好的性能和持久性,但存在一个小风险,即 LangGraph 在执行期间进程崩溃时不写入检查点。
  • "sync":LangGraph 在下一步开始之前同步持久化更改。这确保了 LangGraph 在继续执行之前写入每个检查点,以一些性能开销为代价提供高持久性。

在节点中使用 tasks

如果一个node包含多个操作,您可能会发现将每个操作转换为一个 task 比将操作重构为单独的节点更容易。
from typing import NotRequired
from typing_extensions import TypedDict
from langchain_core.utils.uuid import uuid7

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START, END
import requests

# 定义一个 TypedDict 来表示状态
class State(TypedDict):
    url: str
    result: NotRequired[str]

def call_api(state: State):
    """发出 API 请求的示例节点"""
    result = requests.get(state['url']).text[:100]  # 副作用  #
    return {
        "result": result
    }

# 创建 StateGraph 构建器并为 call_api 函数添加一个节点
builder = StateGraph(State)
builder.add_node("call_api", call_api)

# 将开始和结束节点连接到 call_api 节点
builder.add_edge(START, "call_api")
builder.add_edge("call_api", END)

# 指定检查点保存器
checkpointer = InMemorySaver()

# 使用检查点保存器编译图
graph = builder.compile(checkpointer=checkpointer)

# 定义带有线程 ID 的配置
thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}

# 调用图
graph.invoke({"url": "https://www.example.com"}, config)

恢复工作流

一旦在工作流中启用了持久执行,您可以针对以下场景恢复执行:
  • 暂停和恢复工作流: 使用 interrupt 函数在特定点暂停工作流,并使用 Command 原语恢复更新后的状态。有关更多详细信息,请参阅中断
  • 从故障中恢复: 在异常(例如 LLM 提供商中断)后自动从最后一个成功的检查点恢复工作流。这涉及通过提供 None 作为输入值使用相同的线程标识符执行工作流(请参阅 functional API 的此示例)。

恢复工作流的起始点

  • 如果您使用的是 StateGraph (Graph API),起始点是执行停止的节点的开始。
  • 如果您在节点内进行子图调用,起始点将是调用停止的子图的节点。 在子图内部,起始点将是执行停止的特定节点
  • 如果您使用的是 Functional API,起始点是执行停止的入口点的开始。