概述

聊天界面主导了我们与 AI 的交互方式,但多模态 AI 的最新突破正在开辟令人兴奋的新可能性。高质量的生成模型和富有表现力的文本转语音(TTS)系统现在使构建感觉更像是对话伙伴而非工具的代理成为可能。 语音代理就是其中之一。您可以使用口语与代理交互,而不是依赖键盘和鼠标将输入输入代理。这可以是一种更自然、更吸引人的与 AI 交互的方式,在某些场景中特别有用。

什么是语音代理?

语音代理是能够与用户进行自然口语对话的代理。这些代理结合了语音识别、自然语言处理、生成式 AI 和文本转语音技术,创造无缝、自然的对话。 它们适用于各种用例,包括:
  • 客户支持
  • 个人助理
  • 免提界面
  • 辅导和培训

语音代理如何工作?

在高级别上,每个语音代理都需要处理三个任务:
  1. 聆听 - 捕获音频并转录
  2. 思考 - 解释意图、推理、规划
  3. 说话 - 生成音频并流式传输回用户
区别在于这些步骤如何排序和耦合。实际上,生产代理遵循两种主要架构之一:

1. STT > 代理 > TTS 架构(三明治架构)

三明治架构组合了三个不同的组件:语音转文本(STT)、基于文本的 LangChain 代理和文本转语音(TTS)。 优点:
  • 完全控制每个组件(根据需要交换 STT/TTS 提供商)
  • 访问现代文本模态模型的最新功能
  • 具有组件之间清晰边界的透明行为
缺点:
  • 需要编排多个服务
  • 管理管道的额外复杂性
  • 从语音到文本的转换会丢失信息(例如语气、情感)

2. 语音到语音架构(S2S)

语音到语音使用多模态模型,原生处理音频输入和生成音频输出。 优点:
  • 更简单的架构,移动部件更少
  • 对于简单交互通常延迟更低
  • 直接音频处理捕获语音的语调和其他细微差别
缺点:
  • 模型选项有限,提供商锁定风险更大
  • 功能可能落后于文本模态模型
  • 音频处理方式透明度较低
  • 可控性和定制选项减少
本指南演示了三明治架构以平衡性能、可控性和对现代模型功能的访问。三明治可以在某些 STT 和 TTS 提供商下实现低于 700ms 的延迟,同时保持对模块化组件的控制。

演示应用程序概述

我们将逐步介绍使用三明治架构构建基于语音的代理。该代理将管理三明治店的订单。应用程序将演示三明治架构的所有三个组件,使用 AssemblyAI 进行 STT,使用 Cartesia 进行 TTS(尽管可以为大多数提供商构建适配器)。 端到端参考应用程序可在 voice-sandwich-demo 仓库中找到。我们将在此逐步介绍该应用程序。 该演示使用 WebSocket 在浏览器和服务器之间进行实时双向通信。相同的架构可以适应其他传输,如电话系统(Twilio、Vonage)或 WebRTC 连接。

架构

该演示实现了一个流式管道,其中每个阶段异步处理数据: 客户端(浏览器)
  • 捕获麦克风音频并编码为 PCM
  • 建立到后端服务器的 WebSocket 连接
  • 实时将音频块流式传输到服务器
  • 接收并播放合成的语音音频
服务器(Python)
  • 接受来自客户端的 WebSocket 连接
  • 编排三步管道:
    • 语音转文本(STT):将音频转发到 STT 提供商(例如 AssemblyAI),接收转录事件
    • 代理:使用 LangChain 代理处理转录本,流式传输响应令牌
    • 文本转语音(TTS):将代理响应发送到 TTS 提供商(例如 Cartesia),接收音频块
  • 返回合成的音频到客户端进行播放
管道使用异步生成器在每个阶段启用流式传输。这允许下游组件在上游阶段完成之前开始处理,从而最小化端到端延迟。

设置

有关详细的安装说明和设置,请参阅仓库 README

1. 语音转文本

STT 阶段将传入的音频流转换为文本转录本。实现使用生产者-消费者模式来并发处理音频流和转录接收。

关键概念

生产者-消费者模式:音频块在接收转录事件的同时发送到 STT 服务。这允许在所有音频到达之前开始转录。 事件类型
  • stt_chunk:当 STT 服务处理音频时提供的部分转录
  • stt_output:触发代理处理的最终格式化转录
WebSocket 连接:维护到 AssemblyAI 实时 STT API 的持久连接,配置为 16kHz PCM 音频和自动轮次格式化。

实现

from typing import AsyncIterator
import asyncio
from assemblyai_stt import AssemblyAISTT
from events import VoiceAgentEvent

async def stt_stream(
    audio_stream: AsyncIterator[bytes],
) -> AsyncIterator[VoiceAgentEvent]:
    """
    转换流:音频(字节)→ 语音事件(VoiceAgentEvent)

    使用生产者-消费者模式:
    - 生产者:读取音频块并发送到 AssemblyAI
    - 消费者:从 AssemblyAI 接收转录事件
    """
    stt = AssemblyAISTT(sample_rate=16000)

    async def send_audio():
        """将音频块泵送到 AssemblyAI 的后台任务。"""
        try:
            async for audio_chunk in audio_stream:
                await stt.send_audio(audio_chunk)
        finally:
            # 当音频流结束时发出完成信号
            await stt.close()

    # 在后台启动音频发送
    send_task = asyncio.create_task(send_audio())

    try:
        # 接收并在事件到达时产生转录事件
        async for event in stt.receive_events():
            yield event
    finally:
        # 清理
        with contextlib.suppress(asyncio.CancelledError):
            send_task.cancel()
            await send_task
        await stt.close()
应用程序实现了一个 AssemblyAI 客户端来管理 WebSocket 连接和消息解析。请参阅下面的实现;可以为其他 STT 提供商构建类似的适配器。
class AssemblyAISTT:
    def __init__(self, api_key: str | None = None, sample_rate: int = 16000):
        self.api_key = api_key or os.getenv("ASSEMBLYAI_API_KEY")
        self.sample_rate = sample_rate
        self._ws: WebSocketClientProtocol | None = None

    async def send_audio(self, audio_chunk: bytes) -> None:
        """将 PCM 音频字节发送到 AssemblyAI。"""
        ws = await self._ensure_connection()
        await ws.send(audio_chunk)

    async def receive_events(self) -> AsyncIterator[STTEvent]:
        """从 AssemblyAI 到达时产生 STT 事件。"""
        async for raw_message in self._ws:
            message = json.loads(raw_message)

            if message["type"] == "Turn":
                # 最终格式化转录
                if message.get("turn_is_formatted"):
                    yield STTOutputEvent.create(message["transcript"])
                # 部分转录
                else:
                    yield STTChunkEvent.create(message["transcript"])

    async def _ensure_connection(self) -> WebSocketClientProtocol:
        """如果尚未连接则建立 WebSocket 连接。"""
        if self._ws is None:
            url = f"wss://streaming.assemblyai.com/v3/ws?sample_rate={self.sample_rate}&format_turns=true"
            self._ws = await websockets.connect(
                url,
                additional_headers={"Authorization": self.api_key}
            )
        return self._ws

2. LangChain 代理

代理阶段通过 LangChain 代理 处理文本转录本并流式传输响应令牌。在这种情况下,我们流式传输代理生成的 所有文本内容块

关键概念

流式响应:代理使用 stream_mode="messages" 在生成响应令牌时发出它们,而不是等待完整响应。这使 TTS 阶段能够立即开始合成。 对话记忆检查点 使用唯一线程 ID 在轮次之间维护对话状态。这允许代理引用对话中的先前交换。

实现

from langchain_core.utils.uuid import uuid7
from langchain.agents import create_agent
from langchain.messages import HumanMessage
from langgraph.checkpoint.memory import InMemorySaver

# 定义代理工具
def add_to_order(item: str, quantity: int) -> str:
    """将商品添加到客户的三明治订单。"""
    return f"Added {quantity} x {item} to order."

def confirm_order(order_summary: str) -> str:
    """与客户确认最终订单。"""
    return f"Order confirmed: {order_summary}. Sending to kitchen."

# 使用工具和记忆创建代理
agent = create_agent(
    model="anthropic:claude-haiku-4-5",  # 选择您的模型
    tools=[add_to_order, confirm_order],
    system_prompt="""你是一个有用的三明治店助理。
    你的目标是接受用户的订单。要简洁且友好。
    不要使用表情符号、特殊字符或 markdown。
    你的响应将被文本转语音引擎朗读。""",
    checkpointer=InMemorySaver(),
)

async def agent_stream(
    event_stream: AsyncIterator[VoiceAgentEvent],
) -> AsyncIterator[VoiceAgentEvent]:
    """
    转换流:语音事件 → 语音事件(带代理响应)

    传递所有上游事件,并在处理 STT 转录本时添加 agent_chunk 事件。
    """
    # 生成唯一线程 ID 用于对话记忆
    thread_id = str(uuid7())

    async for event in event_stream:
        # 传递所有上游事件
        yield event

        # 通过代理处理最终转录本
        if event.type == "stt_output":
            # 使用对话上下文流式传输代理响应
            stream = agent.astream(
                {"messages": [HumanMessage(content=event.transcript)]},
                {"configurable": {"thread_id": thread_id}},
                stream_mode="messages",
            )

            # 在到达时产生代理响应块
            async for message, _ in stream:
                if message.text:
                    yield AgentChunkEvent.create(message.text)

3. 文本转语音

TTS 阶段将文本转换为音频并流式传输回用户。实现使用流式音频生成来最小化感知延迟。

关键概念

流式音频生成:TTS 提供商实时生成音频块,而不是等待完整文本。这允许在生成完整响应之前开始播放。 音频编码:音频块编码为适当格式(如 MP3 或 Opus)以进行网络传输。

实现

from typing import AsyncIterator
from cartesia_tts import CartesiaTTS
from events import VoiceAgentEvent

async def tts_stream(
    event_stream: AsyncIterator[VoiceAgentEvent],
) -> AsyncIterator[bytes]:
    """
    转换流:语音事件 → 音频字节

    将代理响应转换为流式音频。
    """
    tts = CartesiaTTS()

    async for event in event_stream:
        # 处理代理响应块
        if event.type == "agent_chunk":
            # 将文本发送到 TTS 并流式传输音频
            async for audio_chunk in tts.stream(event.text):
                yield audio_chunk

完整管道

async def voice_agent_pipeline(audio_stream: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
    """
    完整语音代理管道:

    音频 → STT → 代理 → TTS → 音频
    """
    # 创建事件流
    stt_events = stt_stream(audio_stream)
    agent_events = agent_stream(stt_events)
    audio_output = tts_stream(agent_events)

    # 流式传输最终音频
    async for audio_chunk in audio_output:
        yield audio_chunk

Cartesia TTS 客户端

应用程序实现了一个 Cartesia 客户端来管理 WebSocket 连接和音频合成。以下是实现;可以为其他 TTS 提供商构建类似的适配器:
class CartesiaTTS:
    def __init__(
        self,
        api_key: str | None = None,
        voice_id: str = "f6ff7c0c-e396-40a9-a70b-f7607edb6937",
        model_id: str = "sonic-3",
        sample_rate: int = 24000,
        encoding: str = "pcm_s16le",
    ):
        self.api_key = api_key or os.getenv("CARTESIA_API_KEY")
        self.voice_id = voice_id
        self.model_id = model_id
        self.sample_rate = sample_rate
        self.encoding = encoding
        self._ws: WebSocketClientProtocol | None = None

    def _generate_context_id(self) -> str:
        """为 Cartesia 生成有效的 context_id。"""
        timestamp = int(time.time() * 1000)
        counter = self._context_counter
        self._context_counter += 1
        return f"ctx_{timestamp}_{counter}"

    async def send_text(self, text: str | None) -> None:
        """发送文本到 Cartesia 进行合成。"""
        if not text or not text.strip():
            return

        ws = await self._ensure_connection()
        payload = {
            "model_id": self.model_id,
            "transcript": text,
            "voice": {
                "mode": "id",
                "id": self.voice_id,
            },
            "output_format": {
                "container": "raw",
                "encoding": self.encoding,
                "sample_rate": self.sample_rate,
            },
            "language": self.language,
            "context_id": self._generate_context_id(),
        }
        await ws.send(json.dumps(payload))

    async def receive_events(self) -> AsyncIterator[TTSChunkEvent]:
        """从 Cartesia 到达时产生音频块。"""
        async for raw_message in self._ws:
            message = json.loads(raw_message)

            # 解码并产生音频块
            if "data" in message and message["data"]:
                audio_chunk = base64.b64decode(message["data"])
                if audio_chunk:
                    yield TTSChunkEvent.create(audio_chunk)

    async def _ensure_connection(self) -> WebSocketClientProtocol:
        """如果尚未连接则建立 WebSocket 连接。"""
        if self._ws is None:
            url = (
                f"wss://api.cartesia.ai/tts/websocket"
                f"?api_key={self.api_key}&cartesia_version={self.cartesia_version}"
            )
            self._ws = await websockets.connect(url)

        return self._ws

LangSmith

您使用 LangChain 构建的许多应用程序都包含多个步骤和多次 LLM 调用。随着这些应用程序变得越来越复杂,能够检查链或代理内部发生的事情变得至关重要。做到这一点的最佳方法是使用 LangSmith 在上述链接注册后,确保设置您的环境变量以开始记录追踪:
export LANGSMITH_TRACING="true"
export LANGSMITH_API_KEY="..."
或者,在 Python 中设置:
import getpass
import os

os.environ["LANGSMITH_TRACING"] = "true"
os.environ["LANGSMITH_API_KEY"] = getpass.getpass()

将所有内容整合在一起

完整的管道将三个阶段链接在一起:
from langchain_core.runnables import RunnableGenerator

pipeline = (
    RunnableGenerator(stt_stream)      # 音频 → STT 事件
    | RunnableGenerator(agent_stream)  # STT 事件 → 代理事件
    | RunnableGenerator(tts_stream)    # 代理事件 → TTS 音频
)

# 在 WebSocket 端点中使用
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()

    async def websocket_audio_stream():
        """从 WebSocket 产生音频字节。"""
        while True:
            data = await websocket.receive_bytes()
            yield data

    # 通过管道转换音频
    output_stream = pipeline.atransform(websocket_audio_stream())

    # 将 TTS 音频发送回客户端
    async for event in output_stream:
        if event.type == "tts_chunk":
            await websocket.send_bytes(event.audio)
我们使用 RunnableGenerators 来组合管道的每个步骤。这是 LangChain 在内部用于管理跨组件流式传输的抽象。 每个阶段独立且并发地处理事件:音频转录在音频到达时立即开始,代理在转录本可用时立即开始推理,语音合成在生成代理文本时立即开始。该架构可以实现低于 700ms 的延迟以支持自然对话。 有关使用 LangChain 构建代理的更多信息,请参阅代理指南

性能优化

最小化延迟

  1. 早期流式传输:每个阶段在前一阶段仍在处理时开始处理
  2. 并行处理:STT 和 TTS 可以并行运行
  3. 优化的 STT/TTS 提供商:选择低延迟提供商

错误处理

async def voice_agent_pipeline(audio_stream: AsyncIterator[bytes]) -> AsyncIterator[bytes]:
    try:
        async for audio_chunk in pipeline(audio_stream):
            yield audio_chunk
    except Exception as e:
        # 记录错误并向用户返回错误消息
        logger.error(f"语音代理错误:{e}")
        yield generate_error_audio("抱歉,出现了问题。请重试。")

下一步