LangGraph 实现了一个流式传输系统,用于呈现实时更新。流式传输对于增强基于 LLM 构建的应用程序的响应能力至关重要。通过在完整响应准备好之前逐步显示输出,流式传输显著改善了用户体验(UX),特别是在处理 LLM 延迟时。

开始使用

基本用法

LangGraph 图公开了 stream(同步)和 astream(异步)方法,作为迭代器产生流式输出。传递一个或多个流模式来控制您接收的数据。
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["updates", "custom"],
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, state in chunk["data"].items():
            print(f"节点 {node_name} 已更新:{state}")
    elif chunk["type"] == "custom":
        print(f"状态:{chunk['data']['status']}")
输出
状态:正在构思笑话...
节点 generate_joke 已更新:{'joke': '为什么冰淇淋要去学校?为了接受圣代教育!'}
from typing import TypedDict
from langgraph.graph import StateGraph, START, END
from langgraph.config import get_stream_writer


class State(TypedDict):
    topic: str
    joke: str


def generate_joke(state: State):
    writer = get_stream_writer()
    writer({"status": "正在构思笑话..."})
    return {"joke": f"为什么 {state['topic']} 要去学校?为了接受圣代教育!"}

graph = (
    StateGraph(State)
    .add_node(generate_joke)
    .add_edge(START, "generate_joke")
    .add_edge("generate_joke", END)
    .compile()
)

for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["updates", "custom"],
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, state in chunk["data"].items():
            print(f"节点 {node_name} 已更新:{state}")
    elif chunk["type"] == "custom":
        print(f"状态:{chunk['data']['status']}")
输出
状态:正在构思笑话...
节点 generate_joke 已更新:{'joke': '为什么冰淇淋要去学校?为了接受圣代教育!'}

流输出格式 (v2)

需要 LangGraph >= 1.1。本页所有示例都使用 version="v2"
version="v2" 传递给 stream()astream() 以获得统一的输出格式。每个块都是一个具有一致形状的 StreamPart 字典——无论流模式、数量或子图设置如何:
{
    "type": "values" | "updates" | "messages" | "custom" | "checkpoints" | "tasks" | "debug",
    "ns": (),           # 命名空间元组,为子图事件填充
    "data": ...,        # 实际有效载荷(类型因流模式而异)
}
每种流模式都有对应的 TypedDict,包含 ValuesStreamPartUpdatesStreamPartMessagesStreamPartCustomStreamPartCheckpointStreamPartTasksStreamPartDebugStreamPart。您可以从 langgraph.types 导入这些类型。联合类型 StreamPartpart["type"] 的不相交联合,支持编辑器和类型检查器中的完整类型收窄。 使用 v1(默认),输出格式会根据您的流式传输选项而变化(单模式返回原始数据,多模式返回 (mode, data) 元组,子图返回 (namespace, data) 元组)。使用 v2,格式始终相同:
for chunk in graph.stream(inputs, stream_mode="updates", version="v2"):
    print(chunk["type"])  # "updates"
    print(chunk["ns"])    # ()
    print(chunk["data"])  # {"node_name": {"key": "value"}}
v2 格式还支持类型收窄,这意味着您可以按 chunk["type"] 过滤块,并获得该模式的正确有效载荷类型。每个分支将 part["data"] 收窄到该模式的特定类型:
for part in graph.stream(
    {"topic": "ice cream"},
    stream_mode=["values", "updates", "messages", "custom"],
    version="v2",
):
    if part["type"] == "values":
        # ValuesStreamPart — 每个步骤后的完整状态快照
        print(f"状态:topic={part['data']['topic']}")
    elif part["type"] == "updates":
        # UpdatesStreamPart — 每个节点的仅更改的键
        for node_name, state in part["data"].items():
            print(f"节点 `{node_name}` 已更新:{state}")
    elif part["type"] == "messages":
        # MessagesStreamPart — LLM 调用的 (LLM token, metadata) 元组
        msg, metadata = part["data"]
        print(msg.content, end="", flush=True)
    elif part["type"] == "custom":
        # CustomStreamPart — 通过 get_stream_writer() 发出的任意数据
        print(f"进度:{part['data']['progress']}%")

流模式

将以下一个或多个流模式作为列表传递给 streamastream 方法:
模式类型描述
valuesValuesStreamPart每个步骤后的完整状态。
updatesUpdatesStreamPart每个步骤后的状态更新。同一步骤中的多个更新单独流式传输。
messagesMessagesStreamPartLLM 调用返回的 (LLM token, metadata) 元组。
customCustomStreamPart通过 get_stream_writer 从节点发出的自定义数据。
checkpointsCheckpointStreamPart检查点事件(与 get_state() 格式相同)。需要检查点保存器。
tasksTasksStreamPart带结果和错误的任务开始/完成事件。需要检查点保存器。
debugDebugStreamPart所有可用信息——结合 checkpointstasks 以及额外元数据。

图状态

使用流模式 updatesvalues 来流式传输图执行时的状态。
  • updates 流式传输每个步骤后节点的状态更新
  • values 流式传输每个步骤后状态的完整值
from typing import TypedDict
from langgraph.graph import StateGraph, START, END


class State(TypedDict):
  topic: str
  joke: str


def refine_topic(state: State):
    return {"topic": state["topic"] + " 和猫"}


def generate_joke(state: State):
    return {"joke": f"这是一个关于 {state['topic']} 的笑话"}

graph = (
  StateGraph(State)
  .add_node(refine_topic)
  .add_node(generate_joke)
  .add_edge(START, "refine_topic")
  .add_edge("refine_topic", "generate_joke")
  .add_edge("generate_joke", END)
  .compile()
)
使用此模式仅流式传输每个步骤后节点返回的状态更新。流式输出包括节点名称以及更新内容。
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="updates",
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, state in chunk["data"].items():
            print(f"节点 `{node_name}` 已更新:{state}")
输出
节点 `refine_topic` 已更新:{'topic': 'ice cream and cats'}
节点 `generate_joke` 已更新:{'joke': 'This is a joke about ice cream and cats'}

LLM tokens

使用 messages 流模式从图的任何部分(包括节点、工具、子图或任务)逐 token 流式传输大语言模型(LLM)的输出。 来自 messages 模式的流式输出是一个元组 (message_chunk, metadata),其中:
  • message_chunk:来自 LLM 的 token 或消息段。
  • metadata:包含图节点和 LLM 调用详细信息的字典。
如果您的 LLM 不作为 LangChain 集成提供,您可以使用 custom 模式流式传输其输出。有关详细信息,请参阅与任何 LLM 一起使用
在 Python < 3.11 中使用异步需要手动配置 在 Python < 3.11 中使用异步代码时,必须显式地将 RunnableConfig 传递给 ainvoke() 以启用正确的流式传输。有关详细信息,请参阅在 Python < 3.11 中使用异步,或升级到 Python 3.11+。
from dataclasses import dataclass

from langchain.chat_models import init_chat_model
from langgraph.graph import StateGraph, START


@dataclass
class MyState:
    topic: str
    joke: str = ""


model = init_chat_model(model="gpt-4.1-mini")

def call_model(state: MyState):
    """调用 LLM 生成关于主题的笑话"""
    # 注意,即使 LLM 使用 .invoke 而不是 .stream 运行,也会发出消息事件
    model_response = model.invoke(
        [
            {"role": "user", "content": f"Generate a joke about {state.topic}"}
        ]
    )
    return {"joke": model_response.content}

graph = (
    StateGraph(MyState)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# "messages" 流模式流式传输带有元数据的 LLM tokens
# 使用 version="v2" 获得统一的 StreamPart 格式
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        message_chunk, metadata = chunk["data"]
        if message_chunk.content:
            print(message_chunk.content, end="|", flush=True)

Filter by LLM invocation

您可以将 tags 与 LLM 调用关联起来,以按 LLM 调用过滤流式传输的 tokens。
from langchain.chat_models import init_chat_model

# model_1 标记为 "joke"
model_1 = init_chat_model(model="gpt-4.1-mini", tags=['joke'])
# model_2 标记为 "poem"
model_2 = init_chat_model(model="gpt-4.1-mini", tags=['poem'])

graph = ... # 定义使用这些 LLM 的图

# stream_mode 设置为 "messages" 以流式传输 LLM tokens
# 元数据包含有关 LLM 调用的信息,包括 tags
async for chunk in graph.astream(
    {"topic": "cats"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        # 按元数据中的 tags 字段过滤流式传输的 tokens,以仅包含
        # 带有 "joke" 标签的 LLM 调用的 tokens
        if metadata["tags"] == ["joke"]:
            print(msg.content, end="|", flush=True)
from typing import TypedDict

from langchain.chat_models import init_chat_model
from langgraph.graph import START, StateGraph

# joke_model 标记为 "joke"
joke_model = init_chat_model(model="gpt-4.1-mini", tags=["joke"])
# poem_model 标记为 "poem"
poem_model = init_chat_model(model="gpt-4.1-mini", tags=["poem"])


class State(TypedDict):
      topic: str
      joke: str
      poem: str


async def call_model(state, config):
      topic = state["topic"]
      print("Writing joke...")
      # 注意:显式传递 config 对于 python < 3.11 是必需的
      # 因为在此之前没有添加上下文变量支持:https://docs.python.org/3/library/asyncio-task.html#creating-tasks
      # 显式传递 config 以确保上下文变量正确传播
      # 在使用异步代码时,Python < 3.11 需要此操作。有关更多详细信息,请参阅异步部分
      joke_response = await joke_model.ainvoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}],
            config,
      )
      print("\n\nWriting poem...")
      poem_response = await poem_model.ainvoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}],
            config,
      )
      return {"joke": joke_response.content, "poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(call_model)
      .add_edge(START, "call_model")
      .compile()
)

# stream_mode 设置为 "messages" 以流式传输 LLM tokens
# 元数据包含有关 LLM 调用的信息,包括 tags
async for chunk in graph.astream(
      {"topic": "cats"},
      stream_mode="messages",
      version="v2",
):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        if metadata["tags"] == ["joke"]:
            print(msg.content, end="|", flush=True)

Omit messages from the stream

使用 nostream 标签完全从流中排除 LLM 输出。标记为 nostream 的调用仍然运行并产生输出;它们的 tokens 只是不在 messages 模式下发出。 这在以下情况下很有用:
  • 您需要 LLM 输出进行内部处理(例如结构化输出),但不想将其流式传输到客户端
  • 您通过不同的通道(例如自定义 UI 消息)流式传输相同的内容,并希望避免在 messages 流中出现重复输出
from typing import Any, TypedDict

from langchain_anthropic import ChatAnthropic
from langgraph.graph import START, StateGraph

stream_model = ChatAnthropic(model_name="claude-3-haiku-20240307")
internal_model = ChatAnthropic(model_name="claude-3-haiku-20240307").with_config(
    {"tags": ["nostream"]}
)


class State(TypedDict):
    topic: str
    answer: str
    notes: str


def answer(state: State) -> dict[str, Any]:
    r = stream_model.invoke(
        [{"role": "user", "content": f"Reply briefly about {state['topic']}"}]
    )
    return {"answer": r.content}


def internal_notes(state: State) -> dict[str, Any]:
    # Tokens from this model are omitted from stream_mode="messages" because of nostream
    r = internal_model.invoke(
        [{"role": "user", "content": f"Private notes on {state['topic']}"}]
    )
    return {"notes": r.content}


graph = (
    StateGraph(State)
    .add_node("write_answer", answer)
    .add_node("internal_notes", internal_notes)
    .add_edge(START, "write_answer")
    .add_edge("write_answer", "internal_notes")
    .compile()
)

initial_state: State = {"topic": "AI", "answer": "", "notes": ""}
stream = graph.stream(initial_state, stream_mode="messages")

Filter by node

要仅从特定节点流式传输 tokens,请使用 stream_mode="messages" 并通过流式传输元数据中的 langgraph_node 字段过滤输出:
# "messages" 流模式流式传输带有元数据的 LLM tokens
# 使用 version="v2" 获得统一的 StreamPart 格式
for chunk in graph.stream(
    inputs,
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        # 按元数据中的 langgraph_node 字段过滤流式传输的 tokens
        # 以仅包含来自指定节点的 tokens
        if msg.content and metadata["langgraph_node"] == "some_node_name":
            ...
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain_openai import ChatOpenAI

model = ChatOpenAI(model="gpt-4.1-mini")


class State(TypedDict):
      topic: str
      joke: str
      poem: str


def write_joke(state: State):
      topic = state["topic"]
      joke_response = model.invoke(
            [{"role": "user", "content": f"Write a joke about {topic}"}]
      )
      return {"joke": joke_response.content}


def write_poem(state: State):
      topic = state["topic"]
      poem_response = model.invoke(
            [{"role": "user", "content": f"Write a short poem about {topic}"}]
      )
      return {"poem": poem_response.content}


graph = (
      StateGraph(State)
      .add_node(write_joke)
      .add_node(write_poem)
      # 同时编写笑话和诗歌
      .add_edge(START, "write_joke")
      .add_edge(START, "write_poem")
      .compile()
)

# "messages" 流模式流式传输带有元数据的 LLM tokens
# 使用 version="v2" 获得统一的 StreamPart 格式
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        msg, metadata = chunk["data"]
        # 按元数据中的 langgraph_node 字段过滤流式传输的 tokens
        # 以仅包含来自 write_poem 节点的 tokens
        if msg.content and metadata["langgraph_node"] == "write_poem":
            print(msg.content, end="|", flush=True)

Custom data

要从 LangGraph 节点或工具内部发送自定义用户定义数据,请按照以下步骤操作:
  1. 使用 get_stream_writer 访问流编写器并发送自定义数据。
  2. 调用 .stream().astream() 时设置 stream_mode="custom" 以在流中获取自定义数据。您可以组合多种模式(例如 ["updates", "custom"]),但至少必须有一个是 "custom"
在 Python < 3.11 的异步中没有 get_stream_writer 在 Python < 3.11 上运行的异步代码中,get_stream_writer 将不起作用。 相反,请向您的节点或工具添加 writer 参数并手动传递它。 有关使用示例,请参阅在 Python < 3.11 中使用异步
from typing import TypedDict
from langgraph.config import get_stream_writer
from langgraph.graph import StateGraph, START

class State(TypedDict):
    query: str
    answer: str

def node(state: State):
    # 获取流编写器以发送自定义数据
    writer = get_stream_writer()
    # 发出自定义键值对(例如进度更新)
    writer({"custom_key": "在节点内生成自定义数据"})
    return {"answer": "some data"}

graph = (
    StateGraph(State)
    .add_node(node)
    .add_edge(START, "node")
    .compile()
)

inputs = {"query": "example"}

# 设置 stream_mode="custom" 以在流中接收自定义数据
for chunk in graph.stream(inputs, stream_mode="custom", version="v2"):
    if chunk["type"] == "custom":
        print(f"Custom event: {chunk['data']['custom_key']}")

Subgraph outputs

要在流式输出中包含子图的输出,您可以在父图的 .stream() 方法中设置 subgraphs=True。这将流式传输父图和任何子图的输出。 输出将作为元组 (namespace, data) 流式传输,其中 namespace 是一个包含调用子图的节点路径的元组,例如 ("parent_node:<task_id>", "child_node:<task_id>")
使用 version="v2",子图事件使用相同的 StreamPart 格式。ns 字段标识来源:
for chunk in graph.stream(
    {"foo": "foo"},
    subgraphs=True,
    stream_mode="updates",
    version="v2",
):
    print(chunk["type"])  # "updates"
    print(chunk["ns"])    # () for root, ("node_name:<task_id>",) for subgraph
    print(chunk["data"])  # {"node_name": {"key": "value"}}
from langgraph.graph import START, StateGraph
from typing import TypedDict

# 定义子图
class SubgraphState(TypedDict):
    foo: str  # 注意此键与父图状态共享
    bar: str

def subgraph_node_1(state: SubgraphState):
    return {"bar": "bar"}

def subgraph_node_2(state: SubgraphState):
    return {"foo": state["foo"] + state["bar"]}

subgraph_builder = StateGraph(SubgraphState)
subgraph_builder.add_node(subgraph_node_1)
subgraph_builder.add_node(subgraph_node_2)
subgraph_builder.add_edge(START, "subgraph_node_1")
subgraph_builder.add_edge("subgraph_node_1", "subgraph_node_2")
subgraph = subgraph_builder.compile()

# 定义父图
class ParentState(TypedDict):
    foo: str

def node_1(state: ParentState):
    return {"foo": "hi! " + state["foo"]}

builder = StateGraph(ParentState)
builder.add_node("node_1", node_1)
builder.add_node("node_2", subgraph)
builder.add_edge(START, "node_1")
builder.add_edge("node_1", "node_2")
graph = builder.compile()

for chunk in graph.stream(
    {"foo": "foo"},
    stream_mode="updates",
    # 设置 subgraphs=True 以从子图流式传输输出
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        if chunk["ns"]:
            print(f"Subgraph {chunk['ns']}: {chunk['data']}")
        else:
            print(f"Root: {chunk['data']}")
Root: {'node_1': {'foo': 'hi! foo'}}
Subgraph ('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',): {'subgraph_node_1': {'bar': 'bar'}}
Subgraph ('node_2:dfddc4ba-c3c5-6887-5012-a243b5b377c2',): {'subgraph_node_2': {'foo': 'hi! foobar'}}
Root: {'node_2': {'foo': 'hi! foobar'}}
注意,我们不仅接收节点更新,还接收命名空间,这些命名空间告诉我们正在从哪个图(或子图)进行流式传输。

Checkpoints

使用 checkpoints 流模式在图执行时接收检查点事件。每个检查点事件具有与 get_state() 输出相同的格式。需要检查点保存器
from langgraph.checkpoint.memory import MemorySaver

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile(checkpointer=MemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

for chunk in graph.stream(
    {"topic": "ice cream"},
    config=config,
    stream_mode="checkpoints",
    version="v2",
):
    if chunk["type"] == "checkpoints":
        print(chunk["data"])

Tasks

使用 tasks 流模式在图执行时接收任务开始和完成事件。任务事件包括有关正在运行的节点、其结果以及任何错误的信息。需要检查点保存器
from langgraph.checkpoint.memory import MemorySaver

graph = (
    StateGraph(State)
    .add_node(refine_topic)
    .add_node(generate_joke)
    .add_edge(START, "refine_topic")
    .add_edge("refine_topic", "generate_joke")
    .add_edge("generate_joke", END)
    .compile(checkpointer=MemorySaver())
)

config = {"configurable": {"thread_id": "1"}}

for chunk in graph.stream(
    {"topic": "ice cream"},
    config=config,
    stream_mode="tasks",
    version="v2",
):
    if chunk["type"] == "tasks":
        print(chunk["data"])

Debug

使用 debug 流模式在图执行期间尽可能多地流式传输信息。流式输出包括节点名称以及完整状态。
for chunk in graph.stream(
    {"topic": "ice cream"},
    stream_mode="debug",
    version="v2",
):
    if chunk["type"] == "debug":
        print(chunk["data"])
debug 模式结合 checkpointstasks 事件以及额外元数据。如果您只需要调试信息的子集,请直接使用 checkpointstasks

Multiple modes at once

您可以将列表作为 stream_mode 参数传递,以同时流式传输多种模式。 使用 version="v2",每个块都是一个 StreamPart 字典。使用 chunk["type"] 来区分模式:
for chunk in graph.stream(inputs, stream_mode=["updates", "custom"], version="v2"):
    if chunk["type"] == "updates":
        for node_name, state in chunk["data"].items():
            print(f"Node `{node_name}` updated: {state}")
    elif chunk["type"] == "custom":
        print(f"Custom event: {chunk['data']}")

Advanced

Use with any LLM

您可以使用 stream_mode="custom"任何 LLM API流式传输数据——即使该 API 实现 LangChain 聊天模型接口。 这使您能够集成原始 LLM 客户端或提供自己的流式接口的外部服务,使 LangGraph 对于自定义设置非常灵活。
from langgraph.config import get_stream_writer

def call_arbitrary_model(state):
    """调用任意模型并流式传输输出的示例节点"""
    # 获取流编写器以发送自定义数据
    writer = get_stream_writer()
    # 假设您有一个产生块的流式客户端
    # 使用您的自定义流式客户端生成 LLM tokens
    for chunk in your_custom_streaming_client(state["topic"]):
        # 使用 writer 将自定义数据发送到流
        writer({"custom_llm_chunk": chunk})
    return {"result": "completed"}

graph = (
    StateGraph(State)
    .add_node(call_arbitrary_model)
    # 根据需要添加其他节点和边
    .compile()
)
# 设置 stream_mode="custom" 以在流中接收自定义数据
for chunk in graph.stream(
    {"topic": "cats"},
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        # 块数据将包含从 llm 流式传输的自定义数据
        print(chunk["data"])
import operator
import json

from typing import TypedDict
from typing_extensions import Annotated
from langgraph.graph import StateGraph, START

from openai import AsyncOpenAI

openai_client = AsyncOpenAI()
model_name = "gpt-4.1-mini"


async def stream_tokens(model_name: str, messages: list[dict]):
    response = await openai_client.chat.completions.create(
        messages=messages, model=model_name, stream=True
    )
    role = None
    async for chunk in response:
        delta = chunk.choices[0].delta

        if delta.role is not None:
            role = delta.role

        if delta.content:
            yield {"role": role, "content": delta.content}


# 这是我们的工具
async def get_items(place: str) -> str:
    """使用此工具列出在您被询问的地方可能找到的物品。"""
    writer = get_stream_writer()
    response = ""
    async for msg_chunk in stream_tokens(
        model_name,
        [
            {
                "role": "user",
                "content": (
                    "Can you tell me what kind of items "
                    f"i might find in the following place: '{place}'. "
                    "List at least 3 such items separating them by a comma. "
                    "And include a brief description of each item."
                ),
            }
        ],
    ):
        response += msg_chunk["content"]
        writer(msg_chunk)

    return response


class State(TypedDict):
    messages: Annotated[list[dict], operator.add]


# 这是工具调用图节点
async def call_tool(state: State):
    ai_message = state["messages"][-1]
    tool_call = ai_message["tool_calls"][-1]

    function_name = tool_call["function"]["name"]
    if function_name != "get_items":
        raise ValueError(f"Tool {function_name} not supported")

    function_arguments = tool_call["function"]["arguments"]
    arguments = json.loads(function_arguments)

    function_response = await get_items(**arguments)
    tool_message = {
        "tool_call_id": tool_call["id"],
        "role": "tool",
        "name": function_name,
        "content": function_response,
    }
    return {"messages": [tool_message]}


graph = (
    StateGraph(State)
    .add_node(call_tool)
    .add_edge(START, "call_tool")
    .compile()
)
让我们使用包含工具调用的 AIMessage 调用图:
inputs = {
    "messages": [
        {
            "content": None,
            "role": "assistant",
            "tool_calls": [
                {
                    "id": "1",
                    "function": {
                        "arguments": '{"place":"bedroom"}',
                        "name": "get_items",
                    },
                    "type": "function",
                }
            ],
        }
    ]
}

async for chunk in graph.astream(
    inputs,
    stream_mode="custom",
    version="v2",
):
    if chunk["type"] == "custom":
        print(chunk["data"]["content"], end="|", flush=True)

Disable streaming for specific chat models

如果您的应用程序混合使用支持流式传输的模型和不支持流式传输的模型,您可能需要为不支持流式传输的模型显式禁用流式传输。 在初始化模型时设置 streaming=False
from langchain.chat_models import init_chat_model

model = init_chat_model(
    "claude-sonnet-4-6",
    # 设置 streaming=False 以禁用聊天模型的流式传输
    streaming=False
)
并非所有聊天模型集成都支持 streaming 参数。如果您的模型不支持它,请使用 disable_streaming=True 代替。此参数通过基类在所有聊天模型上都可用。

Migrate to v2

v2 流式格式(本页通篇使用)提供统一的输出格式。以下是主要差异的摘要以及如何迁移:
场景v1 (default)v2 (version="v2")
单一流模式原始数据(字典)带有 typensdataStreamPart 字典
多个流模式(mode, data) 元组相同的 StreamPart 字典,按 chunk["type"] 过滤
子图流式传输(namespace, data) 元组相同的 StreamPart 字典,检查 chunk["ns"]
多个模式 + 子图(namespace, mode, data) 三元组相同的 StreamPart 字典
invoke() 返回类型普通字典(状态)带有 .value.interruptsGraphOutput
中断位置(流)状态字典中的 __interrupt__values 流部分上的 interrupts 字段
中断位置(invoke)结果字典中的 __interrupt__GraphOutput 上的 .interrupts 属性
Pydantic/dataclass 输出返回普通字典强制转换为模型/dataclass 实例

v2 invoke format

当您将 version="v2" 传递给 invoke()ainvoke() 时,它返回一个带有 .value.interrupts 属性的 GraphOutput 对象:
from langgraph.types import GraphOutput

result = graph.invoke(inputs, version="v2")

assert isinstance(result, GraphOutput)
result.value       # 您的输出 — 字典、Pydantic 模型或 dataclass
result.interrupts  # tuple[Interrupt, ...],如果没有发生则为空
对于默认 "values" 以外的任何流模式,invoke(..., stream_mode="updates", version="v2") 返回 list[StreamPart] 而不是 list[tuple]
GraphOutput 上的字典样式访问(result["key"]"key" in resultresult["__interrupt__"])仍然为了向后兼容而工作,但已弃用,将在未来版本中删除。迁移到 result.valueresult.interrupts
这将状态与中断元数据分开。使用 v1 时,中断嵌入在返回字典的 __interrupt__ 下:
config = {"configurable": {"thread_id": "thread-1"}}
result = graph.invoke(inputs, config=config, version="v2")

if result.interrupts:
    print(result.interrupts[0].value)
    graph.invoke(Command(resume=True), config=config, version="v2")

Pydantic and dataclass state coercion

当您的图状态是 Pydantic 模型或 dataclass 时,v2 values 模式会自动将输出强制转换为正确的类型:
from pydantic import BaseModel
from typing import Annotated
import operator

class MyState(BaseModel):
    value: str
    items: Annotated[list[str], operator.add]

# 使用 version="v2",chunk["data"] 是一个 MyState 实例
for chunk in graph.stream(
    {"value": "x", "items": []}, stream_mode="values", version="v2"
):
    print(type(chunk["data"]))  # <class 'MyState'>

Async with Python < 3.11

在 Python 版本 < 3.11 中,asyncio tasks 不支持 context 参数。 这限制了 LangGraph 自动传播上下文的能力,并在两个关键方面影响 LangGraph 的流式传输机制:
  1. 必须RunnableConfig 显式传递到异步 LLM 调用中(例如 ainvoke()),因为回调不会自动传播。
  2. 不能在异步节点或工具中使用 get_stream_writer——您必须直接传递 writer 参数。
from typing import TypedDict
from langgraph.graph import START, StateGraph
from langchain.chat_models import init_chat_model

model = init_chat_model(model="gpt-4.1-mini")

class State(TypedDict):
    topic: str
    joke: str

# 在异步节点函数中接受 config 作为参数
async def call_model(state, config):
    topic = state["topic"]
    print("Generating joke...")
    # 将 config 传递给 model.ainvoke() 以确保正确的上下文传播
    joke_response = await model.ainvoke(
        [{"role": "user", "content": f"Write a joke about {topic}"}],
        config,
    )
    return {"joke": joke_response.content}

graph = (
    StateGraph(State)
    .add_node(call_model)
    .add_edge(START, "call_model")
    .compile()
)

# 设置 stream_mode="messages" 以流式传输 LLM tokens
async for chunk in graph.astream(
    {"topic": "ice cream"},
    stream_mode="messages",
    version="v2",
):
    if chunk["type"] == "messages":
        message_chunk, metadata = chunk["data"]
        if message_chunk.content:
            print(message_chunk.content, end="|", flush=True)
from typing import TypedDict
from langgraph.types import StreamWriter

class State(TypedDict):
      topic: str
      joke: str

# 在异步节点或工具的函数签名中添加 writer 作为参数
# LangGraph 将自动将流编写器传递给函数
async def generate_joke(state: State, writer: StreamWriter):
      writer({"custom_key": "Streaming custom data while generating a joke"})
      return {"joke": f"This is a joke about {state['topic']}"}

graph = (
      StateGraph(State)
      .add_node(generate_joke)
      .add_edge(START, "generate_joke")
      .compile()
)

# 设置 stream_mode="custom" 以在流中接收自定义数据  #
async for chunk in graph.astream(
      {"topic": "ice cream"},
      stream_mode="custom",
      version="v2",
):
      if chunk["type"] == "custom":
          print(chunk["data"])