Deep Agents 构建在 LangGraph 的流式处理基础设施之上,具有对子代理流的一流支持。当深层代理将工作委托给子代理时,您可以独立流式传输每个子代理的更新——实时跟踪进度、LLM 令牌和工具调用。 深层代理流式处理可以实现的功能:

启用子图流式处理

Deep Agents 使用 LangGraph 的子图流式处理来呈现子代理执行中的事件。要接收子代理事件,请在流式传输时启用 stream_subgraphs
from deepagents import create_deep_agent

agent = create_deep_agent(
    system_prompt="您是一个乐于助人的研究助手",
    subagents=[
        {
            "name": "researcher",
            "description": "深入研究一个主题",
            "system_prompt": "您是一位细致的研究员。",
        },
    ],
)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "研究量子计算进展"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        if chunk["ns"]:
            # 子代理事件 - 命名空间标识来源
            print(f"[子代理: {chunk['ns']}]")
        else:
            # 主代理事件
            print("[主代理]")
        print(chunk["data"])

命名空间

当启用 subgraphs 时,每个流式事件都包含一个命名空间,标识产生它的代理。命名空间是表示代理层次结构的节点名称和任务 ID 的路径。
命名空间来源
() (空)主代理
("tools:abc123",)由主代理的 task 工具调用 abc123 生成的子代理
("tools:abc123", "model_request:def456")子代理内的模型请求节点
使用命名空间将事件路由到正确的 UI 组件:
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "计划我的假期"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        # 检查此事件是否来自子代理
        is_subagent = any(
            segment.startswith("tools:") for segment in chunk["ns"]
        )

        if is_subagent:
            # 从命名空间提取工具调用 ID
            tool_call_id = next(
                s.split(":")[1] for s in chunk["ns"] if s.startswith("tools:")
            )
            print(f"子代理 {tool_call_id}: {chunk['data']}")
        else:
            print(f"主代理: {chunk['data']}")

子代理进度

使用 stream_mode="updates" 跟踪每个子代理在每个步骤完成时的进度。这对显示哪些子代理处于活动状态以及它们完成了什么工作很有用。
from deepagents import create_deep_agent

agent = create_deep_agent(
    system_prompt=(
        "您是一个项目协调员。始终使用 task 工具将研究任务委托给您的研究员子代理。"
        "将您的最终回复保持在一句话内。"
    ),
    subagents=[
        {
            "name": "researcher",
            "description": "彻底研究主题",
            "system_prompt": (
                "您是一位细致的研究员。研究给定主题,"
                "并用 2-3 句话提供简洁摘要。"
            ),
        },
    ],
)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "写一个关于 AI 安全的简短摘要"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        # 主代理更新(空命名空间)
        if not chunk["ns"]:
            for node_name, data in chunk["data"].items():
                if node_name == "tools":
                    # 返回给主代理的子代理结果
                    for msg in data.get("messages", []):
                        if msg.type == "tool":
                            print(f"\n子代理完成: {msg.name}")
                            print(f"  结果: {str(msg.content)[:200]}...")
                else:
                    print(f"[主代理] 步骤: {node_name}")

        # 子代理更新(非空命名空间)
        else:
            for node_name, data in chunk["data"].items():
                print(f"  [{chunk['ns'][0]}] 步骤: {node_name}")
[主代理] 步骤: model_request
  [tools:call_abc123] 步骤: model_request
  [tools:call_abc123] 步骤: tools
  [tools:call_abc123] 步骤: model_request

子代理完成: task
  结果: ## AI 安全报告...
[主代理] 步骤: model_request

LLM 令牌

使用 stream_mode="messages" 从主代理和子代理流式传输单个令牌。每个消息事件都包含标识来源代理的元数据。
current_source = ""

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "研究量子计算进展"}]},
    stream_mode="messages",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]

        # 检查此事件是否来自子代理(命名空间包含 "tools:")
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])

        if is_subagent:
            # 来自子代理的令牌
            subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
            if subagent_ns != current_source:
                print(f"\n\n--- [子代理: {subagent_ns}] ---")
                current_source = subagent_ns
            if token.content:
                print(token.content, end="", flush=True)
        else:
            # 来自主代理的令牌
            if "main" != current_source:
                print("\n\n--- [主代理] ---")
                current_source = "main"
            if token.content:
                print(token.content, end="", flush=True)

print()

工具调用

当子代理使用工具时,您可以流式传输工具调用事件以显示每个子代理正在做什么。工具调用块出现在 messages 流模式中。
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "研究最近的量子计算进展"}]},
    stream_mode="messages",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "messages":
        token, metadata = chunk["data"]

        # 识别来源:"main" 或子代理命名空间段
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
        source = next((s for s in chunk["ns"] if s.startswith("tools:")), "main") if is_subagent else "main"

        # 工具调用块(流式工具调用)
        if token.tool_call_chunks:
            for tc in token.tool_call_chunks:
                if tc.get("name"):
                    print(f"\n[{source}] 工具调用: {tc['name']}")
                # 参数以块形式流式传输 - 逐步写入它们
                if tc.get("args"):
                    print(tc["args"], end="", flush=True)

        # 工具结果
        if token.type == "tool":
            print(f"\n[{source}] 工具结果 [{token.name}]: {str(token.content)[:150]}")

        # 常规 AI 内容(跳过工具调用消息)
        if token.type == "ai" and token.content and not token.tool_call_chunks:
            print(token.content, end="", flush=True)

print()

自定义更新

在子代理工具内部使用 get_stream_writer 发出自定义进度事件:
import time
from langchain.tools import tool
from langgraph.config import get_stream_writer
from deepagents import create_deep_agent


@tool
def analyze_data(topic: str) -> str:
    """对给定主题运行数据分析。

    此工具执行实际分析并发出进度更新。
    您必须为此分析请求调用此工具。
    """
    writer = get_stream_writer()

    writer({"status": "starting", "topic": topic, "progress": 0})
    time.sleep(0.5)

    writer({"status": "analyzing", "progress": 50})
    time.sleep(0.5)

    writer({"status": "complete", "progress": 100})
    return (
        f'主题 "{topic}" 的分析:客户情绪正面占 85%,'
        "由产品质量和支持响应时间驱动。"
    )


agent = create_deep_agent(
    system_prompt=(
        "您是一个协调员。对于任何分析请求,您必须使用 task 工具委托给分析师子代理。"
        "永远不要直接回答。收到结果后,用一句话总结。"
    ),
    subagents=[
        {
            "name": "analyst",
            "description": "使用实时进度跟踪执行数据分析",
            "system_prompt": (
                "您是一个数据分析师。您必须为每个分析请求调用 analyze_data 工具。"
                "不要使用任何其他工具。分析完成后,报告结果。"
            ),
            "tools": [analyze_data],
        },
    ],
)

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "分析客户满意度趋势"}]},
    stream_mode="custom",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "custom":
        is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
        if is_subagent:
            subagent_ns = next(s for s in chunk["ns"] if s.startswith("tools:"))
            print(f"[{subagent_ns}]", chunk["data"])
        else:
            print("[main]", chunk["data"])
[tools:call_abc123] {'status': 'starting', 'topic': '客户满意度趋势', 'progress': 0}
[tools:call_abc123] {'status': 'analyzing', 'progress': 50}
[tools:call_abc123] {'status': 'complete', 'progress': 100}

流式传输多种模式

结合多种流模式以获取代理执行的完整图景:
# 跳过内部中间件步骤 - 只显示有意义的节点名称
INTERESTING_NODES = {"model_request", "tools"}

last_source = ""
mid_line = False  # 当我们已经写入令牌而没有尾随换行时为 True

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "分析远程工作对团队生产力的影响"}]},
    stream_mode=["updates", "messages", "custom"],
    subgraphs=True,
    version="v2",
):
    is_subagent = any(s.startswith("tools:") for s in chunk["ns"])
    source = "子代理" if is_subagent else "主代理"

    if chunk["type"] == "updates":
        for node_name in chunk["data"]:
            if node_name not in INTERESTING_NODES:
                continue
            if mid_line:
                print()
                mid_line = False
            print(f"[{source}] 步骤: {node_name}")

    elif chunk["type"] == "messages":
        token, metadata = chunk["data"]
        if token.content:
            # 来源改变时打印标题
            if source != last_source:
                if mid_line:
                    print()
                    mid_line = False
                print(f"\n[{source}] ", end="")
                last_source = source
            print(token.content, end="", flush=True)
            mid_line = True

    elif chunk["type"] == "custom":
        if mid_line:
            print()
            mid_line = False
        print(f"[{source}] 自定义事件:", chunk["data"])

print()

常见模式

跟踪子代理生命周期

监控子代理何时开始、运行和完成:
active_subagents = {}

for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "研究最新的 AI 安全发展"}]},
    stream_mode="updates",
    subgraphs=True,
    version="v2",
):
    if chunk["type"] == "updates":
        for node_name, data in chunk["data"].items():
            # ─── 阶段 1:检测子代理启动 ────────────────────────
            # 当主代理的 model_request 包含 task 工具调用时,
            # 子代理已被生成。
            if not chunk["ns"] and node_name == "model_request":
                for msg in data.get("messages", []):
                    for tc in getattr(msg, "tool_calls", []):
                        if tc["name"] == "task":
                            active_subagents[tc["id"]] = {
                                "type": tc["args"].get("subagent_type"),
                                "description": tc["args"].get("description", "")[:80],
                                "status": "pending",
                            }
                            print(
                                f'[生命周期] 等待中  → 子代理 "{tc["args"].get("subagent_type")}" '
                                f'({tc["id"]})'
                            )

            # ─── 阶段 2:检测子代理运行 ─────────────────────────
            # 当我们从 tools:UUID 命名空间收到事件时,
            # 那个子代理正在积极执行。
            if chunk["ns"] and chunk["ns"][0].startswith("tools:"):
                pregel_id = chunk["ns"][0].split(":")[1]
                # 检查是否有任何等待中的子代理需要标记为运行中。
                # 注意:pregel 任务 ID 与 tool_call_id 不同,
                # 所以我们在第一个子代理事件时将任何等待中的子代理标记为运行中。
                for sub_id, sub in active_subagents.items():
                    if sub["status"] == "pending":
                        sub["status"] = "running"
                        print(
                            f'[生命周期] 运行中  → 子代理 "{sub["type"]}" '
                            f"(pregel: {pregel_id})"
                        )
                        break

            # ─── 阶段 3:检测子代理完成 ──────────────────────
            # 当主代理的 tools 节点返回工具消息时,
            # 子代理已完成并返回其结果。
            if not chunk["ns"] and node_name == "tools":
                for msg in data.get("messages", []):
                    if msg.type == "tool":
                        sub = active_subagents.get(msg.tool_call_id)
                        if sub:
                            sub["status"] = "complete"
                            print(
                                f'[生命周期] 已完成 → 子代理 "{sub["type"]}" '
                                f"({msg.tool_call_id})"
                            )
                            print(f"  结果预览: {str(msg.content)[:120]}...")

# 打印最终状态
print("\n--- 最终子代理状态 ---")
for sub_id, sub in active_subagents.items():
    print(f"  {sub['type']}: {sub['status']}")

v2 流式处理格式

需要 LangGraph >= 1.1。
本页上的所有示例都使用 v2 流式处理格式(version="v2"),这是推荐的方法。每个块都是一个带有 typensdata 键的 StreamPart 字典——无论流模式、流模式数量或子图设置如何,形状都相同。 v2 格式消除了嵌套元组解包,使在 Deep Agents 中处理子图流式处理变得简单。比较两种格式:
# 统一格式 - 无嵌套元组解包
for chunk in agent.stream(
    {"messages": [{"role": "user", "content": "研究量子计算"}]},
    stream_mode=["updates", "messages", "custom"],
    subgraphs=True,
    version="v2",
):
    print(chunk["type"])  # "updates"、"messages" 或 "custom"
    print(chunk["ns"])    # () 表示主代理,("tools:<id>",) 表示子代理
    print(chunk["data"])  # 有效载荷
请参阅 LangGraph 流式处理文档了解更多关于 v2 格式的详细信息,包括类型收窄和 Pydantic/dataclass 强制转换。

相关