本指南回顾了常见的工作流和 Agent 模式。
  • 工作流具有预定的代码路径,旨在按特定顺序运行。
  • Agent 是动态的,定义自己的流程和工具使用。
Agent 工作流 LangGraph 在构建 Agent 和工作流时提供了多项优势,包括持久化流式传输,以及调试和部署支持。

设置

要构建工作流或 Agent,您可以使用支持结构化输出和工具调用的任何聊天模型。以下示例使用 Anthropic:
  1. 安装依赖项:
pip install langchain_core langchain-anthropic langgraph
  1. 初始化 LLM:
import os
import getpass

from langchain_anthropic import ChatAnthropic

def _set_env(var: str):
    if not os.environ.get(var):
        os.environ[var] = getpass.getpass(f"{var}:")


_set_env("ANTHROPIC_API_KEY")

llm = ChatAnthropic(model="claude-sonnet-4-6")

LLM 和增强

工作流和 Agent 系统基于 LLM 以及您添加的各种增强。工具调用结构化输出短期记忆是定制 LLM 以满足您需求的几个选项。 LLM 增强
# 结构化输出的模式
from pydantic import BaseModel, Field


class SearchQuery(BaseModel):
    search_query: str = Field(None, description="针对网络搜索优化的查询。")
    justification: str = Field(
        None, description="为什么此查询与用户请求相关。"
    )


# 使用结构化输出模式增强 LLM
structured_llm = llm.with_structured_output(SearchQuery)

# 调用增强后的 LLM
output = structured_llm.invoke("钙 CT 评分与高胆固醇有什么关系?")

# 定义一个工具
def multiply(a: int, b: int) -> int:
    return a * b

# 使用工具增强 LLM
llm_with_tools = llm.bind_tools([multiply])

# 使用触发工具调用的输入调用 LLM
msg = llm_with_tools.invoke("2 乘以 3 是多少?")

# 获取工具调用
msg.tool_calls

提示链

提示链是每个 LLM 调用处理前一个调用输出的时候。它通常用于执行可以分解为更小、可验证步骤的定义良好的任务。一些示例包括:
  • 将文档翻译成不同语言
  • 验证生成内容的一致性
提示链
from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END
from IPython.display import Image, display


# 图状态
class State(TypedDict):
    topic: str
    joke: str
    improved_joke: str
    final_joke: str


# 节点
def generate_joke(state: State):
    """第一个 LLM 调用生成初始笑话"""

    msg = llm.invoke(f"写一个关于 {state['topic']} 的简短笑话")
    return {"joke": msg.content}


def check_punchline(state: State):
    """门函数检查笑话是否有包袱"""

    # 简单检查 — 笑话是否包含 "?" 或 "!"
    if "?" in state["joke"] or "!" in state["joke"]:
        return "Pass"
    return "Fail"


def improve_joke(state: State):
    """第二个 LLM 调用来改进笑话"""

    msg = llm.invoke(f"通过添加双关语使这个笑话更有趣:{state['joke']}")
    return {"improved_joke": msg.content}


def polish_joke(state: State):
    """第三个 LLM 调用进行最终润色"""
    msg = llm.invoke(f"给这个笑话添加一个出人意料的转折:{state['improved_joke']}")
    return {"final_joke": msg.content}


# 构建工作流
workflow = StateGraph(State)

# 添加节点
workflow.add_node("generate_joke", generate_joke)
workflow.add_node("improve_joke", improve_joke)
workflow.add_node("polish_joke", polish_joke)

# 添加边连接节点
workflow.add_edge(START, "generate_joke")
workflow.add_conditional_edges(
    "generate_joke", check_punchline, {"Fail": "improve_joke", "Pass": END}
)
workflow.add_edge("improve_joke", "polish_joke")
workflow.add_edge("polish_joke", END)

# 编译
chain = workflow.compile()

# 显示工作流
display(Image(chain.get_graph().draw_mermaid_png()))

# 调用
state = chain.invoke({"topic": "猫"})
print("初始笑话:")
print(state["joke"])
print("\n--- --- ---\n")
if "improved_joke" in state:
    print("改进的笑话:")
    print(state["improved_joke"])
    print("\n--- --- ---\n")

    print("最终笑话:")
    print(state["final_joke"])
else:
    print("最终笑话:")
    print(state["joke"])

并行化

通过并行化,LLM 同时处理一个任务。这可以通过同时运行多个独立的子任务,或多次运行同一任务以检查不同的输出来实现。并行化通常用于:
  • 拆分子任务并并行运行,从而提高速度
  • 多次运行任务以检查不同的输出,从而提高置信度
一些示例包括:
  • 运行一个处理文档关键字的子任务,以及第二个检查格式错误的子任务
  • 多次运行一个根据不同标准对文档准确性进行评分的任务,例如引用数量、使用的来源数量以及来源质量
parallelization.png
# 图状态
class State(TypedDict):
    topic: str
    joke: str
    story: str
    poem: str
    combined_output: str


# 节点
def call_llm_1(state: State):
    """第一个 LLM 调用生成初始笑话"""

    msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
    return {"joke": msg.content}


def call_llm_2(state: State):
    """第二个 LLM 调用生成故事"""

    msg = llm.invoke(f"写一个关于 {state['topic']} 的故事")
    return {"story": msg.content}


def call_llm_3(state: State):
    """第三个 LLM 调用生成诗歌"""

    msg = llm.invoke(f"写一首关于 {state['topic']} 的诗")
    return {"poem": msg.content}


def aggregator(state: State):
    """将笑话、故事和诗歌组合成单个输出"""

    combined = f"这是一个关于 {state['topic']} 的故事、笑话和诗歌!\n\n"
    combined += f"故事:\n{state['story']}\n\n"
    combined += f"笑话:\n{state['joke']}\n\n"
    combined += f"诗歌:\n{state['poem']}"
    return {"combined_output": combined}


# 构建工作流
parallel_builder = StateGraph(State)

# 添加节点
parallel_builder.add_node("call_llm_1", call_llm_1)
parallel_builder.add_node("call_llm_2", call_llm_2)
parallel_builder.add_node("call_llm_3", call_llm_3)
parallel_builder.add_node("aggregator", aggregator)

# 添加边连接节点
parallel_builder.add_edge(START, "call_llm_1")
parallel_builder.add_edge(START, "call_llm_2")
parallel_builder.add_edge(START, "call_llm_3")
parallel_builder.add_edge("call_llm_1", "aggregator")
parallel_builder.add_edge("call_llm_2", "aggregator")
parallel_builder.add_edge("call_llm_3", "aggregator")
parallel_builder.add_edge("aggregator", END)
parallel_workflow = parallel_builder.compile()

# 显示工作流
display(Image(parallel_workflow.get_graph().draw_mermaid_png()))

# 调用
state = parallel_workflow.invoke({"topic": "猫"})
print(state["combined_output"])

路由

路由工作流处理输入,然后将它们定向到上下文相关的任务。这允许您为复杂任务定义专门的流程。例如,一个用于回答产品相关问题的工作流可能首先处理问题类型,然后将请求路由到定价、退款、退货等特定流程。 routing.png
from typing_extensions import Literal
from langchain.messages import HumanMessage, SystemMessage


# 用于路由逻辑的结构化输出模式
class Route(BaseModel):
    step: Literal["poem", "story", "joke"] = Field(
        None, description="路由过程中的下一步"
    )


# 使用结构化输出模式增强 LLM
router = llm.with_structured_output(Route)


# 状态
class State(TypedDict):
    input: str
    decision: str
    output: str


# 节点
def llm_call_1(state: State):
    """写一个故事"""

    result = llm.invoke(state["input"])
    return {"output": result.content}


def llm_call_2(state: State):
    """写一个笑话"""

    result = llm.invoke(state["input"])
    return {"output": result.content}


def llm_call_3(state: State):
    """写一首诗"""

    result = llm.invoke(state["input"])
    return {"output": result.content}


def llm_call_router(state: State):
    """将输入路由到适当的节点"""

    # 运行增强后的 LLM,使用结构化输出作为路由逻辑
    decision = router.invoke(
        [
            SystemMessage(
                content="根据用户请求将输入路由到故事、笑话或诗歌。"
            ),
            HumanMessage(content=state["input"]),
        ]
    )

    return {"decision": decision.step}


# 条件边函数,路由到适当的节点
def route_decision(state: State):
    # 返回您想访问的下一个节点的名称
    if state["decision"] == "story":
        return "llm_call_1"
    elif state["decision"] == "joke":
        return "llm_call_2"
    elif state["decision"] == "poem":
        return "llm_call_3"


# 构建工作流
router_builder = StateGraph(State)

# 添加节点
router_builder.add_node("llm_call_1", llm_call_1)
router_builder.add_node("llm_call_2", llm_call_2)
router_builder.add_node("llm_call_3", llm_call_3)
router_builder.add_node("llm_call_router", llm_call_router)

# 添加边连接节点
router_builder.add_edge(START, "llm_call_router")
router_builder.add_conditional_edges(
    "llm_call_router",
    route_decision,
    {  # route_decision 返回的名称 : 要访问的下一个节点的名称
        "llm_call_1": "llm_call_1",
        "llm_call_2": "llm_call_2",
        "llm_call_3": "llm_call_3",
    },
)
router_builder.add_edge("llm_call_1", END)
router_builder.add_edge("llm_call_2", END)
router_builder.add_edge("llm_call_3", END)

# 编译工作流
router_workflow = router_builder.compile()

# 显示工作流
display(Image(router_workflow.get_graph().draw_mermaid_png()))

# 调用
state = router_workflow.invoke({"input": "写一个关于猫的笑话"})
print(state["output"])

协调器-工作器

在协调器-工作器配置中,协调器:
  • 将任务分解为子任务
  • 将子任务委托给工作器
  • 将工作器输出合成为最终结果
worker.png 协调器-工作器工作流提供更多灵活性,通常在子任务无法像并行化那样预定义时使用。这在编写代码或需要在多个文件中更新内容的工作流中很常见。例如,一个需要在未知数量的文档中更新多个 Python 库的安装说明的工作流可能会使用此模式。
from typing import Annotated, List
import operator


# 用于规划的结构化输出模式
class Section(BaseModel):
    name: str = Field(
        description="报告此部分的名称。",
    )
    description: str = Field(
        description="此部分涵盖的主要主题和概念的简要概述。",
    )


class Sections(BaseModel):
    sections: List[Section] = Field(
        description="报告的各个部分。",
    )


# 使用结构化输出模式增强 LLM
planner = llm.with_structured_output(Sections)

在 LangGraph 中创建工作器

协调器-工作器工作流很常见,LangGraph 对它们有内置支持。Send API 允许您动态创建工作器节点并向它们发送特定的输入。每个工作器都有自己的状态,所有工作器输出都写入到协调器图可以访问的共享状态键。这使协调器能够访问所有工作器输出,并允许它们将输出合成为最终输出。下面的示例迭代部分列表,并使用 Send API 将部分发送到每个工作器。
from langgraph.types import Send


# 图状态
class State(TypedDict):
    topic: str  # 报告主题
    sections: list[Section]  # 报告部分列表
    completed_sections: Annotated[
        list, operator.add
    ]  # 所有工作器并行写入此键
    final_report: str  # 最终报告


# 工作器状态
class WorkerState(TypedDict):
    section: Section
    completed_sections: Annotated[list, operator.add]


# 节点
def orchestrator(state: State):
    """生成报告计划的协调器"""

    # 生成查询
    report_sections = planner.invoke(
        [
            SystemMessage(content="生成报告计划。"),
            HumanMessage(content=f"这是报告主题:{state['topic']}"),
        ]
    )

    return {"sections": report_sections.sections}


def llm_call(state: WorkerState):
    """编写报告一部分的工作器"""

    # 生成部分
    section = llm.invoke(
        [
            SystemMessage(
                content="按照提供的名称和描述编写报告部分。每个部分不包含前言。使用 markdown 格式。"
            ),
            HumanMessage(
                content=f"这是部分名称:{state['section'].name} 和描述:{state['section'].description}"
            ),
        ]
    )

    # 将更新的部分写入已完成的部分
    return {"completed_sections": [section.content]}


def synthesizer(state: State):
    """从部分合成完整报告"""

    # 已完成部分列表
    completed_sections = state["completed_sections"]

    # 将已完成的部分格式化为字符串,用作最终部分的上下文
    completed_report_sections = "\n\n---\n\n".join(completed_sections)

    return {"final_report": completed_report_sections}


# 创建 llm_call 工作器的条件边函数,每个工作器编写报告的一部分
def assign_workers(state: State):
    """为计划中的每个部分分配一个工作器"""

    # 通过 Send() API 并行启动部分编写
    return [Send("llm_call", {"section": s}) for s in state["sections"]]


# 构建工作流
orchestrator_worker_builder = StateGraph(State)

# 添加节点
orchestrator_worker_builder.add_node("orchestrator", orchestrator)
orchestrator_worker_builder.add_node("llm_call", llm_call)
orchestrator_worker_builder.add_node("synthesizer", synthesizer)

# 添加边连接节点
orchestrator_worker_builder.add_edge(START, "orchestrator")
orchestrator_worker_builder.add_conditional_edges(
    "orchestrator", assign_workers, ["llm_call"]
)
orchestrator_worker_builder.add_edge("llm_call", "synthesizer")
orchestrator_worker_builder.add_edge("synthesizer", END)

# 编译工作流
orchestrator_worker = orchestrator_worker_builder.compile()

# 显示工作流
display(Image(orchestrator_worker.get_graph().draw_mermaid_png()))

# 调用
state = orchestrator_worker.invoke({"topic": "创建关于 LLM 缩放定律的报告"})

from IPython.display import Markdown
Markdown(state["final_report"])

评估器-优化器

在评估器-优化器工作流中,一个 LLM 调用创建响应,另一个评估该响应。如果评估器或人工干预确定响应需要改进,则提供反馈并重新创建响应。此循环继续,直到生成可接受的响应。 评估器-优化器工作流通常在任务有特定的成功标准,但需要迭代才能满足该标准时使用。例如,在两种语言之间翻译文本时,并不总是有完美的匹配。可能需要几次迭代才能在两种语言中生成具有相同含义的翻译。 evaluator_optimizer.png
# 图状态
class State(TypedDict):
    joke: str
    topic: str
    feedback: str
    funny_or_not: str


# 用于评估的结构化输出模式
class Feedback(BaseModel):
    grade: Literal["funny", "not funny"] = Field(
        description="判断笑话是否有趣。",
    )
    feedback: str = Field(
        description="如果笑话不有趣,提供如何改进的反馈。",
    )


# 使用结构化输出模式增强 LLM
evaluator = llm.with_structured_output(Feedback)


# 节点
def llm_call_generator(state: State):
    """LLM 生成笑话"""

    if state.get("feedback"):
        msg = llm.invoke(
            f"写一个关于 {state['topic']} 的笑话,但要考虑反馈:{state['feedback']}"
        )
    else:
        msg = llm.invoke(f"写一个关于 {state['topic']} 的笑话")
    return {"joke": msg.content}


def llm_call_evaluator(state: State):
    """LLM 评估笑话"""

    grade = evaluator.invoke(f"为笑话 {state['joke']} 打分")
    return {"funny_or_not": grade.grade, "feedback": grade.feedback}


# 根据评估器的反馈路由回笑话生成器或结束的条件边函数
def route_joke(state: State):
    """根据评估器的反馈路由回笑话生成器或结束"""

    if state["funny_or_not"] == "funny":
        return "Accepted"
    elif state["funny_or_not"] == "not funny":
        return "Rejected + Feedback"


# 构建工作流
optimizer_builder = StateGraph(State)

# 添加节点
optimizer_builder.add_node("llm_call_generator", llm_call_generator)
optimizer_builder.add_node("llm_call_evaluator", llm_call_evaluator)

# 添加边连接节点
optimizer_builder.add_edge(START, "llm_call_generator")
optimizer_builder.add_edge("llm_call_generator", "llm_call_evaluator")
optimizer_builder.add_conditional_edges(
    "llm_call_evaluator",
    route_joke,
    {  # route_joke 返回的名称 : 要访问的下一个节点的名称
        "Accepted": END,
        "Rejected + Feedback": "llm_call_generator",
    },
)

# 编译工作流
optimizer_workflow = optimizer_builder.compile()

# 显示工作流
display(Image(optimizer_workflow.get_graph().draw_mermaid_png()))

# 调用
state = optimizer_workflow.invoke({"topic": "猫"})
print(state["joke"])

Agent

Agent 通常实现为使用工具执行操作的 LLM。它们在连续的反馈循环中运行,用于问题和解决方案不可预测的情况。Agent 比工作流具有更多的自主权,可以决定它们使用的工具以及如何解决问题。您仍然可以为 Agent 定义可用的工具集和行为准则。 agent.png
要开始使用 Agent,请参阅快速入门或在 LangChain 中阅读更多关于它们如何工作的内容。
使用工具
from langchain.tools import tool


# 定义工具
@tool
def multiply(a: int, b: int) -> int:
    """将 `a` 和 `b` 相乘。

    Args:
        a: 第一个整数
        b: 第二个整数
    """
    return a * b


@tool
def add(a: int, b: int) -> int:
    """将 `a` 和 `b` 相加。

    Args:
        a: 第一个整数
        b: 第二个整数
    """
    return a + b


@tool
def divide(a: int, b: int) -> float:
    """将 `a` 和 `b` 相除。

    Args:
        a: 第一个整数
        b: 第二个整数
    """
    return a / b


# 使用工具增强 LLM
tools = [add, multiply, divide]
tools_by_name = {tool.name: tool for tool in tools}
llm_with_tools = llm.bind_tools(tools)
from langgraph.graph import MessagesState
from langchain.messages import SystemMessage, HumanMessage, ToolMessage


# 节点
def llm_call(state: MessagesState):
    """LLM 决定是否调用工具"""

    return {
        "messages": [
            llm_with_tools.invoke(
                [
                    SystemMessage(
                        content="您是一个有用的助手,负责对一组输入执行算术运算。"
                    )
                ]
                + state["messages"]
            )
        ]
    }


def tool_node(state: dict):
    """执行工具调用"""

    result = []
    for tool_call in state["messages"][-1].tool_calls:
        tool = tools_by_name[tool_call["name"]]
        observation = tool.invoke(tool_call["args"])
        result.append(ToolMessage(content=observation, tool_call_id=tool_call["id"]))
    return {"messages": result}


# 根据是否进行工具调用路由到工具节点或结束的条件边函数
def should_continue(state: MessagesState) -> Literal["tool_node", END]:
    """根据是否进行工具调用决定是否继续循环或停止"""

    messages = state["messages"]
    last_message = messages[-1]

    # 如果 LLM 进行工具调用,则执行操作
    if last_message.tool_calls:
        return "tool_node"

    # 否则,我们停止(回复用户)
    return END


# 构建工作流
agent_builder = StateGraph(MessagesState)

# 添加节点
agent_builder.add_node("llm_call", llm_call)
agent_builder.add_node("tool_node", tool_node)

# 添加边连接节点
agent_builder.add_edge(START, "llm_call")
agent_builder.add_conditional_edges(
    "llm_call",
    should_continue,
    ["tool_node", END]
)
agent_builder.add_edge("tool_node", "llm_call")

# 编译 Agent
agent = agent_builder.compile()

# 显示 Agent
display(Image(agent.get_graph(xray=True).draw_mermaid_png()))

# 调用
messages = [HumanMessage(content="将 3 和 4 相加。")]
messages = agent.invoke({"messages": messages})
for m in messages["messages"]:
    m.pretty_print()