函数式 API 允许您将 LangGraph 的核心功能(持久化内存人工介入流式传输)添加到您的应用程序中,而对现有代码的更改最少。
有关函数式 API 的概念信息,请参阅函数式 API

创建简单的工作流

定义 entrypoint 时,输入仅限于函数的第一个参数。要传递多个输入,您可以使用字典。
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = inputs["value"]
    another_value = inputs["another_value"]
    ...

my_workflow.invoke({"value": 1, "another_value": 2})
from langchain_core.utils.uuid import uuid7
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 检查数字是否为偶数的任务
@task
def is_even(number: int) -> bool:
    return number % 2 == 0

# 格式化消息的任务
@task
def format_message(is_even: bool) -> str:
    return "The number is even." if is_even else "The number is odd."

# 为持久化创建检查点保存器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: dict) -> str:
    """分类数字的简单工作流。"""
    even = is_even(inputs["number"]).result()
    return format_message(even).result()

# 使用唯一的线程 ID 运行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke({"number": 7}, config=config)
print(result)
此示例演示了如何使用 @task@entrypoint 装饰器。 鉴于提供了检查点保存器,工作流结果将 持久化在检查点保存器中。
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

model = init_chat_model('gpt-3.5-turbo')

# 使用 LLM 生成文章的任务
@task
def compose_essay(topic: str) -> str:
    """生成关于给定主题的文章。"""
    return model.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes essays."},
        {"role": "user", "content": f"Write an essay about {topic}."}
    ]).content

# 为持久化创建检查点保存器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topic: str) -> str:
    """使用 LLM 生成文章的简单工作流。"""
    return compose_essay(topic).result()

# 执行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke("the history of flight", config=config)
print(result)

并行执行

可以通过并发调用任务并等待结果来并行执行任务。这对于提高 IO 密集型任务的性能很有用(例如,调用 LLM 的 API)。
@task
def add_one(number: int) -> int:
    return number + 1

@entrypoint(checkpointer=checkpointer)
def graph(numbers: list[int]) -> list[str]:
    futures = [add_one(i) for i in numbers]
    return [f.result() for f in futures]
此示例演示了如何使用 @task 并行运行多个 LLM 调用。每个调用在不同的主题上生成一段话,结果被组合成单个文本输出。
import uuid
from langchain.chat_models import init_chat_model
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

# 初始化 LLM 模型
model = init_chat_model("gpt-3.5-turbo")

# 生成关于给定主题的段落的任务
@task
def generate_paragraph(topic: str) -> str:
    response = model.invoke([
        {"role": "system", "content": "You are a helpful assistant that writes educational paragraphs."},
        {"role": "user", "content": f"Write a paragraph about {topic}."}
    ])
    return response.content

# 为持久化创建检查点保存器
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(topics: list[str]) -> str:
    """并行生成多个段落并将它们组合起来。"""
    futures = [generate_paragraph(topic) for topic in topics]
    paragraphs = [f.result() for f in futures]
    return "\n\n".join(paragraphs)

# 运行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
result = workflow.invoke(["quantum computing", "climate change", "history of aviation"], config=config)
print(result)
此示例使用 LangGraph 的并发模型来提高执行时间,特别是当任务涉及 I/O(如 LLM 完成)时。

调用图

函数式 API图 API 可以在同一个应用程序中一起使用,因为它们共享相同的基础运行时。
from langgraph.func import entrypoint
from langgraph.graph import StateGraph

builder = StateGraph()
...
some_graph = builder.compile()

@entrypoint()
def some_workflow(some_input: dict) -> int:
    # 调用使用图 API 定义的图
    result_1 = some_graph.invoke(...)
    # 调用另一个使用图 API 定义的图
    result_2 = another_graph.invoke(...)
    return {
        "result_1": result_1,
        "result_2": result_2
    }
import uuid
from typing import TypedDict
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph

# 定义共享状态类型
class State(TypedDict):
    foo: int

# 定义简单的转换节点
def double(state: State) -> State:
    return {"foo": state["foo"] * 2}

# 使用图 API 构建图
builder = StateGraph(State)
builder.add_node("double", double)
builder.set_entry_point("double")
graph = builder.compile()

# 定义函数式 API 工作流
checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(x: int) -> dict:
    result = graph.invoke({"foo": x})
    return {"bar": result["foo"]}

# 执行工作流
config = {"configurable": {"thread_id": str(uuid7())}}
print(workflow.invoke(5, config=config))  # Output: {'bar': 10}

调用其他入口点

您可以在 入口点任务内调用其他入口点
@entrypoint() # 将自动使用来自父入口点的检查点保存器
def some_other_workflow(inputs: dict) -> int:
    return inputs["value"]

@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    value = some_other_workflow.invoke({"value": 1})
    return value
import uuid
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

# 初始化检查点保存器
checkpointer = InMemorySaver()

# 一个可重用的子工作流,将数字相乘
@entrypoint()
def multiply(inputs: dict) -> int:
    return inputs["a"] * inputs["b"]

# 调用子工作流的主工作流
@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> dict:
    result = multiply.invoke({"a": inputs["x"], "b": inputs["y"]})
    return {"product": result}

# 执行主工作流
config = {"configurable": {"thread_id": str(uuid7())}}
print(main.invoke({"x": 6, "y": 7}, config=config))  # Output: {'product': 42}

流式传输

函数式 API 使用与 图 API 相同的流式传输机制。请 阅读 流式传输指南 部分以获取更多详细信息。 使用流式传输 API 流式传输更新和自定义数据的示例。
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.config import get_stream_writer   

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs: dict) -> int:
    writer = get_stream_writer()
    writer("Started processing")
    result = inputs["x"] * 2
    writer(f"Result is {result}")
    return result

config = {"configurable": {"thread_id": "abc"}}

for mode, chunk in main.stream(
    {"x": 5},
    stream_mode=["custom", "updates"],
    config=config
):
    print(f"{mode}: {chunk}")
  1. langgraph.config 导入 get_stream_writer
  2. 在入口点内获取流编写器实例。
  3. 在计算开始之前发出自定义数据。
  4. 在计算结果后发出另一条自定义消息。
  5. 使用 .stream() 处理流式输出。
  6. 指定要使用的流式传输模式。
('updates', {'add_one': 2})
('updates', {'add_two': 3})
('custom', 'hello')
('custom', 'world')
('updates', {'main': 5})
在 Python < 3.11 中使用异步 如果使用 Python < 3.11 并编写异步代码,使用 get_stream_writer 将不起作用。请改用 直接使用 StreamWriter 类。有关更多详细信息,请参阅在 Python < 3.11 中使用异步
from langgraph.types import StreamWriter

@entrypoint(checkpointer=checkpointer)
async def main(inputs: dict, writer: StreamWriter) -> int:  #...

重试策略

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import RetryPolicy

# 此变量仅用于演示目的以模拟网络故障。
# 它不是您在实际代码中会拥有的内容。
attempts = 0

# 让我们配置 RetryPolicy 以在 ValueError 时重试。
# 默认的 RetryPolicy 针对重试特定的网络错误进行了优化。
retry_policy = RetryPolicy(retry_on=ValueError)

@task(retry_policy=retry_policy)
def get_info():
    global attempts
    attempts +=1

    if attempts < 2:
        raise ValueError('Failure')
    return "OK"

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer):
    return get_info().result()

config = {
    "configurable": {
        "thread_id": "1"
    }
}

main.invoke({'any_input': 'foobar'}, config=config)
'OK'

缓存任务

import time
from langgraph.cache.memory import InMemoryCache
from langgraph.func import entrypoint, task
from langgraph.types import CachePolicy


@task(cache_policy=CachePolicy(ttl=120))
def slow_add(x: int) -> int:
    time.sleep(1)
    return x * 2


@entrypoint(cache=InMemoryCache())
def main(inputs: dict) -> dict[str, int]:
    result1 = slow_add(inputs["x"]).result()
    result2 = slow_add(inputs["x"]).result()
    return {"result1": result1, "result2": result2}


for chunk in main.stream({"x": 5}, stream_mode="updates"):
    print(chunk)

#> {'slow_add': 10}
#> {'slow_add': 10, '__metadata__': {'cached': True}}
#> {'main': {'result1': 10, 'result2': 10}}
  1. ttl 以秒为单位指定。缓存将在此时间后失效。

错误后恢复

import time
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import StreamWriter

# 此变量仅用于演示目的以模拟网络故障。
# 它不是您在实际代码中会拥有的内容。
attempts = 0

@task()
def get_info():
    """
    模拟在成功之前失败一次的任务。
    在第一次尝试时引发异常,然后在随后的尝试中返回 "OK"。
    """
    global attempts
    attempts += 1

    if attempts < 2:
        raise ValueError("Failure")  # 在第一次尝试时模拟失败
    return "OK"
# 为持久化初始化内存检查点保存器
checkpointer = InMemorySaver()

@task
def slow_task():
    """
    通过引入 1 秒延迟来模拟慢速运行的任务。
    """
    time.sleep(1)
    return "Ran slow task."

@entrypoint(checkpointer=checkpointer)
def main(inputs, writer: StreamWriter):
    """
    顺序运行 slow_task 和 get_info 任务的主工作流函数。

    参数:
    - inputs:包含工作流输入值的字典。
    - writer:用于流式传输自定义数据的 StreamWriter。

    工作流首先执行 `slow_task`,然后尝试执行 `get_info`,
    后者在第一次调用时会失败。
    """
    slow_task_result = slow_task().result()  # 对 slow_task 的阻塞调用
    get_info().result()  # 第一次尝试时将在此处引发异常
    return slow_task_result

# 使用唯一线程标识符的工作流执行配置
config = {
    "configurable": {
        "thread_id": "1"  # 用于跟踪工作流执行的唯一标识符
    }
}

# 此调用将由于 slow_task 执行而花费约 1 秒
try:
    # 第一次调用将由于 `get_info` 任务失败而引发异常
    main.invoke({'any_input': 'foobar'}, config=config)
except ValueError:
    pass  # 优雅地处理失败
当我们恢复执行时,我们不需要重新运行 slow_task,因为其结果已保存在检查点中。
main.invoke(None, config=config)
'Ran slow task.'

人工介入

函数式 API 使用 interrupt 函数和 Command 原语支持人工介入工作流。

基本的人工介入工作流

我们将创建三个任务
  1. 追加 "bar"
  2. 暂停以获取人工输入。恢复时,追加人工输入。
  3. 追加 "qux"
from langgraph.func import entrypoint, task
from langgraph.types import Command, interrupt


@task
def step_1(input_query):
    """追加 bar。"""
    return f"{input_query} bar"


@task
def human_feedback(input_query):
    """追加用户输入。"""
    feedback = interrupt(f"Please provide feedback: {input_query}")
    return f"{input_query} {feedback}"


@task
def step_3(input_query):
    """追加 qux。"""
    return f"{input_query} qux"
我们现在可以在入口点中组合这些任务:
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def graph(input_query):
    result_1 = step_1(input_query).result()
    result_2 = human_feedback(result_1).result()
    result_3 = step_3(result_2).result()

    return result_3
interrupt() 在任务内部被调用,使人工能够审查和编辑先前任务的输出。先前任务的结果——在这种情况下是 step_1——被持久化,因此在 interrupt 之后不会再次运行。 让我们发送一个查询字符串:
config = {"configurable": {"thread_id": "1"}}

for event in graph.stream("foo", config):
    print(event)
    print("\n")
注意我们在 step_1 之后使用 interrupt 暂停了。中断提供了恢复运行的说明。要恢复,我们发出一个 Command,其中包含 human_feedback 任务预期的数据。
# 继续执行
for event in graph.stream(Command(resume="baz"), config):
    print(event)
    print("\n")
恢复后,运行继续经过剩余步骤并按预期终止。

审查工具调用

要在执行之前审查工具调用,我们添加一个调用 interruptreview_tool_call 函数。当调用此函数时,执行将暂停,直到我们发出命令以恢复它。 给定一个工具调用,我们的函数将 interrupt 以进行人工审查。此时我们可以:
  • 接受工具调用
  • 修订工具调用并继续
  • 生成自定义工具消息(例如,指示模型重新格式化其工具调用)
from typing import Union

def review_tool_call(tool_call: ToolCall) -> Union[ToolCall, ToolMessage]:
    """审查工具调用,返回验证后的版本。"""
    human_review = interrupt(
        {
            "question": "Is this correct?",
            "tool_call": tool_call,
        }
    )
    review_action = human_review["action"]
    review_data = human_review.get("data")
    if review_action == "continue":
        return tool_call
    elif review_action == "update":
        updated_tool_call = {**tool_call, **{"args": review_data}}
        return updated_tool_call
    elif review_action == "feedback":
        return ToolMessage(
            content=review_data, name=tool_call["name"], tool_call_id=tool_call["id"]
        )
我们现在可以更新我们的入口点 以审查生成的工具调用。如果工具调用被接受或修订,我们以与之前相同的方式执行。否则,我们只是追加由人工提供的 ToolMessage。先前任务的结果——在这种情况下是初始模型调用——被持久化,因此在 interrupt 之后不会再次运行。
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph.message import add_messages
from langgraph.types import Command, interrupt


checkpointer = InMemorySaver()


@entrypoint(checkpointer=checkpointer)
def agent(messages, previous):
    if previous is not None:
        messages = add_messages(previous, messages)

    model_response = call_model(messages).result()
    while True:
        if not model_response.tool_calls:
            break

        # 审查工具调用
        tool_results = []
        tool_calls = []
        for i, tool_call in enumerate(model_response.tool_calls):
            review = review_tool_call(tool_call)
            if isinstance(review, ToolMessage):
                tool_results.append(review)
            else:  # 是验证后的工具调用
                tool_calls.append(review)
                if review != tool_call:
                    model_response.tool_calls[i] = review  # 更新消息

        # 执行剩余的工具调用
        tool_result_futures = [call_tool(tool_call) for tool_call in tool_calls]
        remaining_tool_results = [fut.result() for fut in tool_result_futures]

        # 追加到消息列表
        messages = add_messages(
            messages,
            [model_response, *tool_results, *remaining_tool_results],
        )

        # 再次调用模型
        model_response = call_model(messages).result()

    # 生成最终响应
    messages = add_messages(messages, model_response)
    return entrypoint.final(value=model_response, save=messages)

短期内存

短时内存允许在不同的调用中存储信息,这些调用使用相同的线程 ID。有关更多详细信息,请参阅短时内存

管理检查点

您可以查看和删除由检查点保存器存储的信息。

查看线程状态

config = {
    "configurable": {
        "thread_id": "1",
        # 可选地为特定检查点提供一个 ID,
        # 否则显示最新的检查点
        # "checkpoint_id": "1f029ca3-1f5b-6704-8004-820c16b69a5a"  #

    }
}
graph.get_state(config)
StateSnapshot(
    values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today?), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]}, next=(),
    config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
    metadata={
        'source': 'loop',
        'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}},
        'step': 4,
        'parents': {},
        'thread_id': '1'
    },
    created_at='2025-05-05T16:01:24.680462+00:00',
    parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
    tasks=(),
    interrupts=()
)

查看线程的历史记录

config = {
    "configurable": {
        "thread_id": "1"
    }
}
list(graph.get_state_history(config))
[
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?"), AIMessage(content='Your name is Bob.')]},
        next=(),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1f5b-6704-8004-820c16b69a5a'}},
        metadata={'source': 'loop', 'writes': {'call_model': {'messages': AIMessage(content='Your name is Bob.')}}, 'step': 4, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:24.680462+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        tasks=(),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?'), HumanMessage(content="what's my name?")]},
        next=('call_model',),
        config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        metadata={'source': 'loop', 'writes': None, 'step': 3, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.863421+00:00',
        parent_config={'configurable': {'thread_id': '1', 'checkpoint_ns': '', 'checkpoint_id': '1f029ca3-1790-6b0a-8003-baf965b6a38f'}},
        tasks=(PregelTask(id='8ab4155e-6b15-b885-9ce5-bed69a2c305c', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Your name is Bob.')}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob"), AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')]},
        next=('__start__',),
        config={...},
        metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "what's my name?"}]}}, 'step': 2, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:23.863173+00:00',
        parent_config={...}
        tasks=(PregelTask(id='24ba39d6-6db1-4c9b-f4c5-682aeaf38dcd', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "what's my name?"}]}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': [HumanMessage(content="hi! I'm bob")]},
        next=('call_model',),
        config={...},
        metadata={'source': 'loop', 'writes': None, 'step': 0, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:22.278960+00:00',
        parent_config={...}
        tasks=(PregelTask(id='8cbd75e0-3720-b056-04f7-71ac805140a0', name='call_model', path=('__pregel_pull', 'call_model'), error=None, interrupts=(), state=None, result={'messages': AIMessage(content='Hi Bob! How are you doing today? Is there anything I can help you with?')}),),
        interrupts=()
    ),
    StateSnapshot(
        values={'messages': []},
        next=('__start__',),
        config={...},
        metadata={'source': 'input', 'writes': {'__start__': {'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}}, 'step': -1, 'parents': {}, 'thread_id': '1'},
        created_at='2025-05-05T16:01:22.277497+00:00',
        parent_config=None,
        tasks=(PregelTask(id='d458367b-8265-812c-18e2-33001d199ce6', name='__start__', path=('__pregel_pull', '__start__'), error=None, interrupts=(), state=None, result={'messages': [{'role': 'user', 'content': "hi! I'm bob"}]}),),
        interrupts=()
    )
]

解耦返回值与保存的值

使用 entrypoint.final 解耦返回给调用者的内容与持久化在检查点中的内容。这在以下情况下很有用:
  • 您想要返回计算结果(例如,摘要或状态),但保存不同的内部值以在下一次调用时使用。
  • 您需要控制在下一次运行时传递给 previous 参数的内容。
from langgraph.func import entrypoint
from langgraph.checkpoint.memory import InMemorySaver

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def accumulate(n: int, *, previous: int | None) -> entrypoint.final[int, int]:
    previous = previous or 0
    total = previous + n
    # 将 *previous* 值返回给调用者,但将 *new* 总数保存到检查点。
    return entrypoint.final(value=previous, save=total)

config = {"configurable": {"thread_id": "my-thread"}}

print(accumulate.invoke(1, config=config))  # 0
print(accumulate.invoke(2, config=config))  # 1
print(accumulate.invoke(3, config=config))  # 3

聊天机器人示例

一个使用函数式 API 和 InMemorySaver 检查点保存器的简单聊天机器人示例。 该机器人能够记住之前的对话并从中断处继续。
from langchain.messages import BaseMessage
from langgraph.graph import add_messages
from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver
from langchain_anthropic import ChatAnthropic

model = ChatAnthropic(model="claude-sonnet-4-6")

@task
def call_model(messages: list[BaseMessage]):
    response = model.invoke(messages)
    return response

checkpointer = InMemorySaver()

@entrypoint(checkpointer=checkpointer)
def workflow(inputs: list[BaseMessage], *, previous: list[BaseMessage]):
    if previous:
        inputs = add_messages(previous, inputs)

    response = call_model(inputs).result()
    return entrypoint.final(value=response, save=add_messages(inputs, response))

config = {"configurable": {"thread_id": "1"}}
input_message = {"role": "user", "content": "hi! I'm bob"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

input_message = {"role": "user", "content": "what's my name?"}
for chunk in workflow.stream([input_message], config, stream_mode="values"):
    chunk.pretty_print()

长期内存

长期内存 允许在不同的线程 ID之间存储信息。这对于在一个对话中学习有关给定用户的信息并在另一个对话中使用它可能很有用。

工作流

  • 工作流和代理 指南,其中包含更多有关如何使用函数式 API 构建工作流的示例。

与其他库集成