Functional API 允许您以最少的代码改动向现有应用程序添加 LangGraph 的关键功能(持久化记忆人工介入流式传输)。 它旨在将这些功能集成到可能使用标准语言原语进行分支和控制流的现有代码中,例如 if 语句、for 循环和函数调用。与许多需要将代码重构为显式管道或 DAG 的数据编排框架不同,Functional API 允许您合并这些功能而无需强制执行刚性执行模型。 Functional API 使用两个关键构建块:
  • @entrypoint:将函数标记为工作流的起点,封装逻辑并管理执行流程,包括处理长时间运行的任务和中断。
  • @task:表示一个离散的工作单元,例如 API 调用或数据处理步骤,可以在 entrypoint 内异步执行。任务返回一个类似 future 的对象,可以同步等待或解析。
这提供了一个用于构建具有状态管理和流式传输的工作流的最小抽象。
有关如何使用 functional API 的信息,请参阅使用 Functional API

Functional API 与 Graph API

对于喜欢更声明式方法的用户,LangGraph 的 Graph API 允许您使用图范式定义工作流。两种 API 共享相同的底层运行时,因此您可以在同一应用程序中一起使用它们。 以下是一些关键区别:
  • 控制流:Functional API 不需要考虑图结构。您可以使用标准 Python 构造来定义工作流。这通常会减少您需要编写的代码量。
  • 短期记忆Graph API 需要声明一个状态,可能还需要定义归约器来管理图状态的更新。@entrypoint@tasks 不需要显式状态管理,因为它们的状态作用域仅限于函数,不会在函数之间共享。
  • 检查点:两种 API 都会生成和使用检查点。在 Graph API 中,每个超级步骤后都会生成一个新的检查点。在 Functional API 中,当任务执行时,它们的结果会保存到与给定 entrypoint 关联的现有检查点中,而不是创建新的检查点。
  • 可视化:Graph API 可以轻松地将工作流可视化为图,这有助于调试、理解工作流以及与他人共享。Functional API 不支持可视化,因为图是在运行时动态生成的。

示例

下面我们演示一个简单的应用程序,它写一篇论文并中断请求人工审查。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt

@task
def write_essay(topic: str) -> str:
    """写一篇关于给定主题的文章"""
    time.sleep(1) # 长时间运行任务的占位符
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """一个简单的工作流,写一篇文章并请求审查"""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        # 作为参数提供给 interrupt 的任何 JSON 可序列化有效载荷
        # 当从工作流流式传输数据时,它将在客户端作为 Interrupt 暴露
        "essay": essay, # 我们希望审查的文章
        # 我们可以添加我们需要的任何其他信息
        # 例如,添加一个名为 "action" 的键和一些说明
        "action": "请批准/拒绝这篇文章",
    })

    return {
        "essay": essay, # 生成的文章
        "is_approved": is_approved, # 来自人工介入的响应
    }
此工作流将写一篇关于主题”猫”的文章,然后暂停以获取人工审查。工作流可以无限期中断,直到提供审查。当工作流恢复时,它从头开始执行,但因为 writeEssay 任务的结果已经保存,任务结果将从检查点加载而不是重新计算。
import time
from langchain_core.utils.uuid import uuid7
from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver


@task
def write_essay(topic: str) -> str:
    """写一篇关于给定主题的文章"""
    time.sleep(1)  # 这是长时间运行任务的占位符
    return f"An essay about topic: {topic}"

@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """一个简单的工作流,写一篇文章并请求审查"""
    essay = write_essay("cat").result()
    is_approved = interrupt(
        {
            # 作为参数提供给 interrupt 的任何 JSON 可序列化有效载荷
            # 当从工作流流式传输数据时,它将在客户端作为 Interrupt 暴露
            "essay": essay,  # 我们希望审查的文章
            # 我们可以添加我们需要的任何其他信息
            # 例如,添加一个名为 "action" 的键和一些说明
            "action": "请批准/拒绝这篇文章",
        }
    )
    return {
        "essay": essay,  # 生成的文章
        "is_approved": is_approved,  # 来自人工介入的响应
    }


thread_id = str(uuid7())
config = {"configurable": {"thread_id": thread_id}}
for item in workflow.stream("cat", config):
    print(item)
# > {'write_essay': '一篇关于主题的文章:cat'}
# > {
# >     '__interrupt__': (
# >        Interrupt(
# >            value={
# >                'essay': '一篇关于主题的文章:cat',
# >                'action': '请批准/拒绝这篇文章'
# >            },
# >            id='b9b2b9d788f482663ced6dc755c9e981'
# >        ),
# >    )
# > }
文章已经写好,可以进行审查了。提供审查后,我们可以恢复工作流:
from langgraph.types import Command

# 从用户获取审查(例如,通过 UI)
# 在这种情况下,我们使用布尔值,但这可以是任何 JSON 可序列化的值
human_review = True

for item in workflow.stream(Command(resume=human_review), config):
    print(item)
{'workflow': {'essay': '一篇关于主题的文章:cat', 'is_approved': False}}
工作流已完成,审查已添加到文章中。

Entrypoint

@entrypoint 装饰器可用于从函数创建工作流。它封装工作流逻辑并管理执行流程,包括处理_长时间运行的任务_和中断

定义

entrypoint 通过使用 @entrypoint 装饰器装饰函数来定义。 该函数必须接受单个位置参数,它作为工作流输入。如果您需要传递多个数据块,请使用字典作为第一个参数的类型。 使用 entrypoint 装饰函数会产生一个 Pregel 实例,帮助管理工作流的执行(例如,处理流式传输、恢复和检查点)。 您通常需要将检查点保存器传递给 @entrypoint 装饰器,以启用持久化并使用人工介入等功能。
from langgraph.func import entrypoint

@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: dict) -> int:
    # 一些可能涉及长时间运行任务(如 API 调用)的逻辑
    # 并且可能因人工介入而中断
    ...
    return result
序列化 entrypoint 的输入输出必须是 JSON 可序列化的以支持检查点。请参阅序列化部分了解更多详情。

可注入参数

声明 entrypoint 时,您可以请求访问在运行时自动注入的其他参数。这些参数包括:
参数描述
previous访问与给定线程的先前 checkpoint 关联的状态。请参阅短期记忆
store[BaseStore][langgraph.store.base.BaseStore] 的实例。对长期记忆有用。
writer在使用 Async Python < 3.11 时用于访问 StreamWriter。请参阅 Functional API 流式传输详情
config用于访问运行时配置。请参阅 RunnableConfig 了解更多信息。
使用适当的名称和类型注解声明参数。
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import StreamWriter

in_memory_checkpointer = InMemorySaver(...)
in_memory_store = InMemoryStore(...)  # 长期记忆的 InMemoryStore 实例

@entrypoint(
    checkpointer=in_memory_checkpointer,  # 指定检查点保存器
    store=in_memory_store  # 指定存储
)
def my_workflow(
    some_input: dict,  # 输入(例如,通过 `invoke` 传递)
    *,
    previous: Any = None, # 用于短期记忆
    store: BaseStore,  # 用于长期记忆
    writer: StreamWriter,  # 用于流式传输自定义数据
    config: RunnableConfig  # 用于访问传递给 entrypoint 的配置
) -> ...: