加入和重新加入让您可以断开与正在运行的代理流的连接而无需停止代理,然后稍后重新连接。代理在客户端离开时继续在服务器端执行,您可以从上次中断的地方继续接收流。
为什么需要加入和重新加入?
传统的流式传输 API 将客户端和服务器紧密耦合:如果客户端断开连接,流就会丢失。加入和重新加入打破了这种耦合,实现了几个重要的模式:
- 网络中断:在移动网络基站或 Wi-Fi 网络之间切换的移动用户可以无缝恢复
- 页面导航:用户从聊天页面导航离开,稍后返回而不丢失进度
- 移动后台处理:被操作系统挂起的应用可以在恢复前台时重新加入流
- 长时间运行的任务:执行多分钟操作的代理(研究、代码生成、数据分析),用户不需要保持页面打开
- 多设备切换:在手机上开始对话,在桌面上重新加入
核心概念
加入/重新加入模式涉及三个关键机制:
| 方法 / 选项 | 用途 |
|---|
stream.stop() | 断开客户端与流的连接而不停止代理 |
stream.joinStream(runId) | 通过运行 ID 重新连接到现有流 |
onDisconnect: "continue" | 提交选项,告诉服务器在客户端断开后继续运行 |
streamResumable: true | 提交选项,启用稍后可以重新加入的流 |
stream.stop() 与取消运行有根本区别。停止仅断开客户端。代理继续在服务器端处理。要实际取消代理的执行,您需要使用中断或取消机制。
设置 useStream
关键设置步骤是从 onCreated 回调中捕获 run_id,以便稍后重新加入。
定义一个与代理状态模式匹配的 TypeScript 接口,并将其作为类型参数传递给 useStream,以便对状态值进行类型安全访问。在下面的示例中,将 typeof myAgent 替换为您的接口名称:
import type { BaseMessage } from "@langchain/core/messages";
interface AgentState {
messages: BaseMessage[];
}
import { useStream } from "@langchain/react";
import { useState } from "react";
function Chat() {
const [savedRunId, setSavedRunId] = useState<string | null>(null);
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
setSavedRunId(run.run_id);
},
});
const isConnected = stream.isLoading;
return (
<div>
<ConnectionStatus connected={isConnected} />
<MessageList messages={stream.messages} />
<ChatControls
stream={stream}
savedRunId={savedRunId}
isConnected={isConnected}
/>
</div>
);
}
使用可恢复选项提交
当您提交消息时,传递 onDisconnect: "continue" 和 streamResumable: true 以启用加入/重新加入流程:
stream.submit(
{ messages: [{ type: "human", content: text }] },
{
onDisconnect: "continue",
streamResumable: true,
}
);
| 选项 | 默认 | 描述 |
|---|
onDisconnect | "cancel" | 客户端断开连接时发生什么。"continue" 保持代理运行;"cancel" 停止它。 |
streamResumable | false | 当为 true 时,服务器保留流状态,以便客户端稍后可以重新加入。 |
始终同时使用两个选项。如果没有 streamResumable: true,仅设置 onDisconnect: "continue" 意味着代理继续运行,但您无法重新加入流来查看其输出。
断开流连接
调用 stream.stop() 断开客户端连接。代理继续在服务器端处理。
调用 stop() 后:
stream.isLoading 变为 false
- 消息列表保留断开点之前接收的所有消息
- 代理继续在服务器上运行
- 在重新加入之前不会收到新消息
重新加入流
使用保存的运行 ID 调用 stream.joinStream(runId) 重新连接:
stream.joinStream(savedRunId);
重新加入后:
stream.isLoading 再次变为 true
- 断开连接期间生成的任何消息都会被传递
- 新的流式消息实时恢复
- 如果代理已经完成,您会立即收到最终状态
构建连接状态指示器
视觉指示器帮助用户了解他们是否正在从代理接收实时更新。
function ConnectionStatus({ connected }: { connected: boolean }) {
return (
<div className="connection-status">
<span
className={`status-dot ${connected ? "connected" : "disconnected"}`}
/>
<span className="status-text">
{connected ? "Connected" : "Disconnected"}
</span>
</div>
);
}
使用绿色/红色圆点为指示器添加样式:
.status-dot {
width: 8px;
height: 8px;
border-radius: 50%;
display: inline-block;
margin-right: 6px;
}
.status-dot.connected {
background-color: #22c55e;
box-shadow: 0 0 4px #22c55e;
}
.status-dot.disconnected {
background-color: #ef4444;
box-shadow: 0 0 4px #ef4444;
}
断开和重新加入控件
提供显式按钮来断开连接和重新加入,以便用户完全控制:
function ChatControls({ stream, savedRunId, isConnected }) {
const [input, setInput] = useState("");
const handleSend = () => {
if (!input.trim()) return;
stream.submit(
{ messages: [{ type: "human", content: input.trim() }] },
{ onDisconnect: "continue", streamResumable: true }
);
setInput("");
};
return (
<div className="controls">
<div className="input-row">
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Type a message..."
onKeyDown={(e) => e.key === "Enter" && handleSend()}
/>
<button onClick={handleSend}>Send</button>
</div>
<div className="stream-controls">
{isConnected ? (
<button onClick={() => stream.stop()} className="disconnect-btn">
Disconnect
</button>
) : (
savedRunId && (
<button
onClick={() => stream.joinStream(savedRunId)}
className="rejoin-btn"
>
Rejoin stream
</button>
)
)}
</div>
</div>
);
}
持久化运行 ID
对于跨会话重新加入(例如,用户关闭浏览器并稍后返回),将运行 ID 持久化到存储:
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
localStorage.setItem("activeRunId", run.run_id);
},
});
// On page load, check for an active run
const existingRunId = localStorage.getItem("activeRunId");
if (existingRunId) {
stream.joinStream(existingRunId);
}
持久化的运行 ID 应在运行完成时清理。监听流完成并移除存储的 ID,以避免尝试重新加入已完成的运行。
错误处理
如果运行已过期、被删除或服务器已重启,重新加入可能会失败。优雅地处理这些情况:
try {
stream.joinStream(savedRunId);
} catch (error) {
console.error("Failed to rejoin stream:", error);
// Clear stale run ID and inform the user
setSavedRunId(null);
localStorage.removeItem("activeRunId");
}
完整示例
function JoinRejoinChat() {
const [savedRunId, setSavedRunId] = useState<string | null>(null);
const [input, setInput] = useState("");
const stream = useStream<typeof myAgent>({
apiUrl: "http://localhost:2024",
assistantId: "join_rejoin",
onCreated(run) {
setSavedRunId(run.run_id);
},
});
const isConnected = stream.isLoading;
const handleSend = () => {
if (!input.trim()) return;
stream.submit(
{ messages: [{ type: "human", content: input.trim() }] },
{ onDisconnect: "continue", streamResumable: true }
);
setInput("");
};
return (
<div className="chat-container">
<header>
<h2>Join & Rejoin Demo</h2>
<ConnectionStatus connected={isConnected} />
</header>
<div className="messages">
{stream.messages.map((msg, i) => (
<MessageBubble key={i} message={msg} />
))}
</div>
<div className="controls">
<form onSubmit={(e) => { e.preventDefault(); handleSend(); }}>
<input
value={input}
onChange={(e) => setInput(e.target.value)}
placeholder="Type a message..."
/>
<button type="submit">Send</button>
</form>
<div className="stream-actions">
{isConnected ? (
<button onClick={() => stream.stop()}>
Disconnect
</button>
) : (
savedRunId && (
<button onClick={() => stream.joinStream(savedRunId)}>
Rejoin stream
</button>
)
)}
</div>
</div>
</div>
);
}
最佳实践
- 始终保存运行 ID:没有它,重新加入是不可能的。同时使用组件状态和持久化存储以提高弹性。
- 显示清晰的连接状态:用户应始终知道他们是在接收实时更新还是查看快照。
- 页面可见性变化时自动重新加入:使用页面可见性 API 在用户返回标签页时自动重新加入。
- 设置合理的超时:如果重新加入尝试花费时间过长,则回退到获取线程历史记录。
- 清理已完成的运行:在代理完成时移除持久化的运行 ID,以避免过时的重新加入尝试。