通过实现代理执行流程中特定点的钩子来构建自定义中间件。

钩子

中间件提供两种样式的钩子来拦截代理执行:

节点风格钩子

在特定执行点按顺序运行。

包装风格钩子

在每个模型或工具调用周围运行。

节点风格钩子

在特定执行点按顺序运行。用于日志记录、验证和状态更新。 选择您的中间件需要的钩子。您可以在节点风格钩子和包装风格钩子之间进行选择。 节点风格钩子在特定执行点运行:
钩子运行时机
before_agent代理开始之前(每次调用一次)
before_model每次模型调用之前
after_model每次模型响应之后
after_agent代理完成之后(每次调用一次)
包装风格钩子在每次调用周围运行,让您控制执行:
钩子运行时机
wrap_model_call在每次模型调用周围
wrap_tool_call在每次工具调用周围
示例:
from langchain.agents.middleware import before_model, after_model, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@before_model(can_jump_to=["end"])
def check_message_limit(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    if len(state["messages"]) >= 50:
        return {
            "messages": [AIMessage("已达到对话限制。")],
            "jump_to": "end"
        }
    return None

@after_model
def log_response(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"模型返回:{state['messages'][-1].content}")
    return None

包装风格钩子

拦截执行并控制何时调用处理程序。用于重试、缓存和转换。 您可以决定处理程序被调用零次(短路)、一次(正常流程)或多次(重试逻辑)。 可用的钩子:
  • wrap_model_call - 在每次模型调用周围
  • wrap_tool_call - 在每次工具调用周围
示例:
from langchain.agents.middleware import wrap_model_call, ModelRequest, ModelResponse
from typing import Callable


@wrap_model_call
def retry_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"重试 {attempt + 1}/3,错误:{e}")

状态更新

节点风格和包装风格钩子都可以更新代理状态。机制有所不同:
  • 节点风格钩子before_agentbefore_modelafter_modelafter_agent):直接返回一个字典。该字典使用图的 reducer 合并到代理状态中。
  • 包装风格钩子wrap_model_callwrap_tool_call):对于模型调用,返回带有 CommandExtendedModelResponse,以在模型响应旁边注入状态更新。对于工具调用,直接返回一个 Command。当您需要根据在模型或工具调用期间运行的逻辑来跟踪或更新状态时使用这些,例如摘要触发点、使用量元数据或从请求或响应计算的自定义字段。

节点风格钩子

从节点风格钩子返回一个字典,以将更新合并到代理状态中。字典键映射到状态字段。
from langchain.agents.middleware import after_model, AgentState
from langgraph.runtime import Runtime
from typing import Any
from typing_extensions import NotRequired


class TrackingState(AgentState):
    model_call_count: NotRequired[int]


@after_model(state_schema=TrackingState)
def increment_after_model(state: TrackingState, runtime: Runtime) -> dict[str, Any] | None:
    return {"model_call_count": state.get("model_call_count", 0) + 1}

包装风格钩子

wrap_model_call 返回带有 CommandExtendedModelResponse,以从模型调用层注入状态更新:
from typing import Callable
from langchain.agents.middleware import (
    wrap_model_call,
    ModelRequest,
    ModelResponse,
    AgentState,
    ExtendedModelResponse
)
from langgraph.types import Command
from typing_extensions import NotRequired

class UsageTrackingState(AgentState):
    """带有 token 使用量跟踪的代理状态。"""

    last_model_call_tokens: NotRequired[int]


@wrap_model_call(state_schema=UsageTrackingState)
def track_usage(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ExtendedModelResponse:
    response = handler(request)
    return ExtendedModelResponse(
        model_response=response,
        command=Command(update={"last_model_call_tokens": 150}),
    )
Command 通过图的 reducer 流动,因此更新被正确应用,消息是附加的而不是替换现有状态。

与多个中间件的组合

当多个中间件层返回 ExtendedModelResponse 时,它们的命令会组合:
  • 命令通过 reducer 应用: 每个 Command 成为单独的状态更新。对于消息,这意味着它们是附加的。
  • 冲突时外层获胜: 对于非 reducer 状态字段,命令从内到外应用。最外层中间件的值在冲突键上优先。
  • 重试安全: 如果外层中间件实现的逻辑可能导致多次调用 handler()(例如重试逻辑),则丢弃较早调用的命令。
from typing import Annotated, Callable

from langchain.agents.middleware import (
    AgentMiddleware,
    AgentState,
    ExtendedModelResponse,
    ModelRequest,
    ModelResponse,
)
from langchain.messages import SystemMessage
from langgraph.types import Command
from typing_extensions import NotRequired


def _last_wins(_a: str, b: str) -> str:
    """Reducer:最后写入者获胜(外层覆盖内层)。"""
    return b


class CustomMiddlewareState(AgentState):
    """代理状态:trace_layer 使用最后获胜(外层获胜),messages 使用附加 reducer。"""

    # 非 reducer 字段,最后获胜:两个中间件都写入;最外层值获胜
    trace_layer: NotRequired[Annotated[str, _last_wins]]


class OuterMiddleware(AgentMiddleware):
    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ) -> ExtendedModelResponse:
        response = handler(request)
        return ExtendedModelResponse(
            model_response=response,
            command=Command(update={
                "trace_layer": "outer",
                "messages": [SystemMessage(content="[Outer 运行时]")],
            }),
        )


class InnerMiddleware(AgentMiddleware):
    """添加 trace_layer 和 message。外层添加到相同键;trace_layer:外层获胜,messages:附加。"""

    def wrap_model_call(
        self,
        request: ModelRequest,
        handler: Callable[[ModelRequest], ModelResponse],
    ):
        response = handler(request)
        return ExtendedModelResponse(
            model_response=response,
            command=Command(update={
                "trace_layer": "inner",
                "messages": [SystemMessage(content="[Inner 运行时]")],
            }),
        )

创建中间件

您可以通过两种方式创建中间件:

基于装饰器的中间件

对于单钩子中间件快速简单。使用装饰器包装各个函数。

基于类的中间件

对于具有多个钩子或配置的复杂中间件更强大。

基于装饰器的中间件

对于单钩子中间件快速简单。使用装饰器包装各个函数。 可用的装饰器: 节点风格: 包装风格: 便捷功能: 示例:
from langchain.agents.middleware import (
    before_model,
    wrap_model_call,
    AgentState,
    ModelRequest,
    ModelResponse,
)
from langchain.agents import create_agent
from langgraph.runtime import Runtime
from typing import Any, Callable


@before_model
def log_before_model(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    print(f"即将使用 {len(state['messages'])} 条消息调用模型")
    return None

@wrap_model_call
def retry_model(
    request: ModelRequest,
    handler: Callable[[ModelRequest], ModelResponse],
) -> ModelResponse:
    for attempt in range(3):
        try:
            return handler(request)
        except Exception as e:
            if attempt == 2:
                raise
            print(f"重试 {attempt + 1}/3,错误:{e}")

agent = create_agent(
    model="gpt-4.1",
    middleware=[log_before_model, retry_model],
    tools=[...],
)
何时使用装饰器:
  • 需要单个钩子
  • 无需复杂配置
  • 快速原型开发

基于类的中间件

对于具有多个钩子或配置的复杂中间件更强大。当您需要为同一钩子定义同步和异步实现,或者想要在单个中间件中组合多个钩子时使用类。 示例:
from langchain.agents.middleware import (
    AgentMiddleware,
    AgentState,
    ModelRequest,
    ModelResponse,
)
from langgraph.runtime import Runtime
from typing import Any, Callable

class LoggingMiddleware(AgentMiddleware):
    def before_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"即将使用 {len(state['messages'])} 条消息调用模型")
        return None

    def after_model(self, state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
        print(f"模型返回:{state['messages'][-1].content}")
        return None

    async def abefore_model(
        self, state: AgentState, runtime: Runtime
    ) -> dict[str, Any] | None:
        # before_model 的异步版本
        return None

    async def aafter_model(
        self, state: AgentState, runtime: Runtime
    ) -> dict[str, Any] | None:
        # after_model 的异步版本
        print(f"模型返回:{state['messages'][-1].content}")
        return None


agent = create_agent(
    model="gpt-4.1",
    middleware=[LoggingMiddleware()],
    tools=[...],
)
何时使用类:
  • 为同一钩子定义同步和异步实现
  • 单个中间件中需要多个钩子
  • 需要复杂配置(例如可配置阈值、自定义模型)
  • 跨项目重用,配置在初始化时进行

自定义状态模式

如果您的中间件需要跨钩子跟踪状态,中间件可以用自定义属性扩展代理的状态。这使中间件能够:
  • 跨执行跟踪状态: 维护在整个代理执行生命周期中持续存在的计数器、标志或其他值
  • 在钩子之间共享数据:before_model 传递信息到 after_model,或在不同的中间件实例之间传递
  • 实现横切关注点: 添加功能,如速率限制、使用量跟踪、用户上下文或审计日志,而无需修改核心代理逻辑
  • 做出条件决策: 使用累积状态来决定是否继续执行、跳转到不同节点或动态修改行为
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langchain.agents.middleware import AgentState, before_model, after_model
from typing_extensions import NotRequired
from typing import Any
from langgraph.runtime import Runtime


class CustomState(AgentState):
    model_call_count: NotRequired[int]
    user_id: NotRequired[str]


@before_model(state_schema=CustomState, can_jump_to=["end"])
def check_call_limit(state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
    count = state.get("model_call_count", 0)
    if count > 10:
        return {"jump_to": "end"}
    return None


@after_model(state_schema=CustomState)
def increment_counter(state: CustomState, runtime: Runtime) -> dict[str, Any] | None:
    return {"model_call_count": state.get("model_call_count", 0) + 1}


agent = create_agent(
    model="gpt-4.1",
    middleware=[check_call_limit, increment_counter],
    tools=[],
)

# 使用自定义状态调用
result = agent.invoke({
    "messages": [HumanMessage("你好")],
    "model_call_count": 0,
    "user_id": "user-123",
})

执行顺序

使用多个中间件时,了解它们的执行方式:
agent = create_agent(
    model="gpt-4.1",
    middleware=[middleware1, middleware2, middleware3],
    tools=[...],
)
Before 钩子按顺序运行:
  1. middleware1.before_agent()
  2. middleware2.before_agent()
  3. middleware3.before_agent()
代理循环开始
  1. middleware1.before_model()
  2. middleware2.before_model()
  3. middleware3.before_model()
包装钩子像函数调用一样嵌套:
  1. middleware1.wrap_model_call()middleware2.wrap_model_call()middleware3.wrap_model_call() → model
After 钩子按相反顺序运行:
  1. middleware3.after_model()
  2. middleware2.after_model()
  3. middleware1.after_model()
代理循环结束
  1. middleware3.after_agent()
  2. middleware2.after_agent()
  3. middleware1.after_agent()
关键规则:
  • before_* 钩子:从第一个到最后一个
  • after_* 钩子:从最后一个到第一个(反向)
  • wrap_* 钩子:嵌套(第一个中间件包装所有其他)

代理跳转

要从中间件提前退出,返回带有 jump_to 的字典: 可用的跳转目标:
  • 'end':跳转到代理执行的末尾(或第一个 after_agent 钩子)
  • 'tools':跳转到工具节点
  • 'model':跳转到模型节点(或第一个 before_model 钩子)
from langchain.agents.middleware import after_model, hook_config, AgentState
from langchain.messages import AIMessage
from langgraph.runtime import Runtime
from typing import Any


@after_model
@hook_config(can_jump_to=["end"])
def check_for_blocked(state: AgentState, runtime: Runtime) -> dict[str, Any] | None:
    last_message = state["messages"][-1]
    if "BLOCKED" in last_message.content:
        return {
            "messages": [AIMessage("我无法回应该请求。")],
            "jump_to": "end"
        }
    return None