.. _chapter-gateway-rpc: ************************************** TUI Gateway:跨进程 JSON-RPC 通信 ************************************** .. contents:: :local: :depth: 2 为什么需要 Gateway ==================== Hermes Agent 有两种主要的前端界面: 1. **CLI 模式** (``hermes chat``):Python REPL,直接在终端中运行,Agent 和 UI 在同一个进程中 2. **TUI 模式** (TypeScript/React 前端):一个独立的富 GUI 应用,用 Electron 或 Web 技术构建 问题在于:Agent 核心是 Python 写的(``AIAgent`` 类),而 TUI 前端是 TypeScript 写的。 它们运行在 **不同的进程中** ,甚至可能在不同的机器上(远程部署场景)。 Gateway 就是连接这两者的桥梁: .. mermaid:: :name: gateway-architecture :caption: Gateway 架构总览 flowchart TD subgraph TUI["TUI 前端 (TypeScript/React)"] UI1["消息输入框"] UI2["工具状态面板"] UI3["对话历史"] UI4["设置面板"] end subgraph GW["TUI Gateway (Python 进程)"] GW1["JSON-RPC 调度器"] GW2["会话管理器"] GW3["Agent 工厂"] GW4["SlashWorker 子进程"] end subgraph AGENT["Agent 核心"] A1["AIAgent"] A2["工具执行"] A3["流式输出"] A4["上下文压缩"] end TUI -- "stdin/stdout JSON Lines" --> GW GW -- "线程池 + 回调" --> AGENT GW -- "stdin/stdout JSON Lines" --> GW4 AGENT -- "事件流" --> GW GW -- "事件推送" --> TUI Gateway 作为一个独立的 Python 进程运行,通过 **stdin/stdout 管道** 与 TUI 前端通信。 TUI 前端启动 Gateway 进程,将 JSON-RPC 请求写入 Gateway 的 stdin,从 Gateway 的 stdout 读取响应和事件。 这种架构的优势: - **语言无关性** :前端可以用任何语言实现,只需遵循 JSON-RPC 协议 - **进程隔离** :Agent 崩溃不会拖垮前端,反之亦然 - **安全性** :前端无法直接访问 Python 运行时,所有操作通过 RPC 方法暴露 - **可扩展性** :Gateway 可以同时管理多个会话,每个会话有独立的 Agent 实例 通信协议 ========== Gateway 使用 **JSON-RPC 2.0 over stdin/stdout (JSON Lines)** 协议。 请求格式 ---------- 每个请求是一个单行 JSON 对象: .. code-block:: json {"jsonrpc": "2.0", "id": 1, "method": "prompt.submit", "params": {"session_id": "abc123", "text": "Hello"}} 响应格式 ---------- 成功响应: .. code-block:: json {"jsonrpc": "2.0", "id": 1, "result": {"status": "streaming"}} 错误响应: .. code-block:: json {"jsonrpc": "2.0", "id": 1, "error": {"code": 4009, "message": "session busy"}} 事件推送(无 id,单向通知): .. code-block:: json {"jsonrpc": "2.0", "method": "event", "params": {"type": "message.delta", "session_id": "abc123", "payload": {"text": "Hello"}}} Stdout 保护 ------------- Gateway 将 Python 的 ``sys.stdout`` 重定向到 ``sys.stderr`` , 确保所有 Python 的 ``print()`` 调用不会污染 JSON 协议通道: .. code-block:: python _real_stdout = sys.stdout sys.stdout = sys.stderr 所有 JSON 输出通过 ``write_json()`` 函数,使用 ``_stdout_lock`` 保证线程安全: .. code-block:: python def write_json(obj: dict) -> bool: line = json.dumps(obj, ensure_ascii=False) + "\n" try: with _stdout_lock: _real_stdout.write(line) _real_stdout.flush() return True except BrokenPipeError: return False 当 ``write_json()`` 返回 ``False`` 时(管道断裂),Gateway 进程优雅退出。 这在 TUI 前端关闭时自动触发。 错误码体系 ------------ Gateway 定义了一组语义化的错误码: .. list-table:: Gateway 错误码 :header-rows: 1 :widths: 15 40 45 * - 错误码 - 含义 - 典型场景 * - -32700 - JSON 解析错误 - 格式错误的请求行 * - -32601 - 未知方法 - 调用不存在的 RPC 方法 * - -32000 - 处理器内部错误 - 未预期的异常 * - 4001 - 会话未找到 - 使用无效的 session_id * - 4002 - 参数无效 - 缺少必需参数 * - 4009 - 会话繁忙 - 在 Agent 运行时发送新请求 * - 5032 - Agent 初始化超时 - 模型提供者不可用 双调度架构 ============ Gateway 的 RPC 调度器采用 **双轨设计** ,根据方法名将请求路由到不同的执行通道: .. mermaid:: :name: gateway-dual-dispatch :caption: 双调度架构 flowchart LR INPUT["stdin JSON-RPC"] --> PARSER["JSON 解析"] PARSER --> ROUTER{"dispatch() 路由"} ROUTER -->|"快速方法
(prompt.submit,
clarify.respond 等)"| INLINE["主线程
handle_request()"] ROUTER -->|"长时方法
(slash.exec,
session.resume 等)"| POOL["ThreadPoolExecutor
线程池"] INLINE --> RESP1["直接返回响应"] POOL --> RESP2["write_json()
异步写入响应"] subgraph "长时处理器列表" L1["cli.exec"] L2["session.branch"] L3["session.resume"] L4["shell.exec"] L5["slash.exec"] end 内联处理器(主线程) ---------------------- 大多数 RPC 方法在主线程中执行。这些是 **快速操作**——读取配置、返回会话信息、 响应交互请求等。它们的响应直接从 ``dispatch()`` 返回。 长时处理器(线程池) ---------------------- 部分 RPC 方法可能阻塞数秒到数分钟。如果在主线程中执行它们,会阻塞 stdin 读取循环, 导致其他 RPC 请求(特别是 ``approval.respond`` 和 ``session.interrupt``)无法被处理。 .. code-block:: python _LONG_HANDLERS = frozenset({ "cli.exec", "session.branch", "session.resume", "shell.exec", "slash.exec" }) _pool = concurrent.futures.ThreadPoolExecutor( max_workers=max(2, int(os.environ.get("HERMES_TUI_RPC_POOL_WORKERS", "4") or 4)), thread_name_prefix="tui-rpc", ) 长时处理器被提交到线程池执行。响应通过 ``write_json()`` 异步写入—— 这个函数已经被 ``_stdout_lock`` 保护,所以并发写入是安全的。 初始化握手 ============ Gateway 启动时,会进行一次 **初始化握手** : .. code-block:: python # entry.py main() if not write_json({ "jsonrpc": "2.0", "method": "event", "params": {"type": "gateway.ready", "payload": {"skin": resolve_skin()}}, }): sys.exit(0) 这条消息告诉 TUI 前端: 1. Gateway 已成功启动 2. 当前活跃的皮肤配置(颜色、品牌、工具前缀等) TUI 前端收到 ``gateway.ready`` 事件后,才能开始发送 RPC 请求。 如果 stdout 已关闭(TUI 前端已退出),``write_json()`` 返回 ``False`` , Gateway 立即退出。 信号处理 ---------- Gateway 注册了两个信号处理器: .. code-block:: python signal.signal(signal.SIGPIPE, signal.SIG_DFL) # 管道断裂时优雅退出 signal.signal(signal.SIGINT, signal.SIG_IGN) # 忽略 Ctrl+C - **SIGPIPE** :当 TUI 前端关闭 stdout 管道时触发,使用默认处理器让进程退出 - **SIGINT** :忽略终端的 Ctrl+C,防止意外中断。Agent 的中断通过 RPC 方法 ``session.interrupt`` 触发 prompt.submit 完整流程 ======================== ``prompt.submit`` 是 Gateway 最核心的 RPC 方法,它处理用户发送的每一条消息。 以下是完整的执行流程: .. mermaid:: :name: gateway-prompt-submit :caption: prompt.submit 完整时序 sequenceDiagram autonumber participant TUI as TUI 前端 participant GW as Gateway participant Session as 会话管理器 participant Agent as AIAgent participant Stream as 流式渲染器 TUI->>GW: prompt.submit {session_id, text} GW->>Session: 查找会话 Session-->>GW: session dict alt 会话繁忙 GW-->>TUI: error 4009 "session busy" end Note over Session: 设置 running = True
快照 history_version GW->>GW: 附加图片预处理 GW->>GW: @上下文引用展开 GW-->>TUI: {status: "streaming"}
(立即返回) Note over GW: 后台线程开始执行 GW->>Agent: run_conversation(text, history) loop 流式输出 Agent-->>GW: delta text GW->>Stream: feed(delta) → 渲染 GW-->>TUI: event message.delta {text, rendered} end alt 工具调用 Agent-->>GW: tool_start_callback GW-->>TUI: event tool.start {name, context} Agent-->>GW: tool_complete_callback GW-->>TUI: event tool.complete {name, summary, inline_diff} end alt 思维过程 Agent-->>GW: thinking_callback GW-->>TUI: event thinking.delta {text} end Agent-->>GW: 最终结果 GW->>Session: 写入历史(版本检查) Note over Session: history_version 匹配?
匹配 → 写入
不匹配 → 丢弃 GW->>Session: 设置 running = False GW-->>TUI: event message.complete {text, usage, status} 时序分析 ---------- 1. **请求验证** (< 1ms):检查 session_id 是否存在,会话是否繁忙 2. **状态锁定** (< 1ms):设置 ``running = True`` ,快照 ``history_version`` 3. **预处理** (0-5s):图片附件通过 vision 预分析,``@`` 上下文引用展开 4. **立即响应** (< 1ms):返回 ``{status: "streaming"}`` ,不等待 Agent 完成 5. **Agent 执行** (1s - 数分钟):后台线程运行 ``run_conversation()`` 6. **流式输出** (实时):每个 token delta 立即推送到 TUI 7. **工具反馈** (实时):工具开始/完成事件 8. **结果写入** (< 1ms):带版本检查的历史写入 9. **状态释放** (< 1ms):设置 ``running = False`` 关键设计决策:**prompt.submit 立即返回 ``{status: "streaming"}``** 。 这意味着 TUI 前端不需要等待 Agent 完成,可以继续处理用户的其他操作(如发送 ``/interrupt``)。 56 个 RPC 方法分类表 ====================== ``tui_gateway/server.py``(3086 行)通过 ``@method`` 装饰器定义了 56 个 RPC 方法, 按功能分为十二大类。 Session(会话管理)— 13 个方法 ------------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - session.create - 创建新会话(异步构建 Agent,返回 session_id) - 快速 * - session.close - 关闭会话,释放 Agent 和 SlashWorker - 快速 * - session.list - 列出历史会话(TUI + CLI,合并排序) - 快速 * - session.resume - 恢复之前的会话(按 session_id 或标题查找) - **长时** * - session.title - 获取/设置会话标题 - 快速 * - session.usage - 获取 token 使用量 - 快速 * - session.history - 获取对话历史(转换为 messages 数组) - 快速 * - session.undo - 撤销最后一轮对话(运行中拒绝执行) - 快速 * - session.compress - 手动压缩上下文(运行中拒绝执行) - 快速 * - session.save - 保存对话到 JSON 文件 - 快速 * - session.branch - 从当前会话分支新会话(加载完整历史) - **长时** * - session.interrupt - 中断当前 Agent 执行(设置 interrupted 标志) - 快速 * - session.steer - 注入消息到下一个工具结果(不中断,不违反 role 交替规则) - 快速 Prompt(提示处理)— 4 个方法 ----------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - prompt.submit - 提交用户消息(流式响应,立即返回 streaming 状态) - 快速(返回后异步执行) * - prompt.background - 后台执行提示(独立 Agent 实例,返回 task_id) - 快速 * - prompt.btw - 旁注式提问(不持久化,无工具,max_iterations=8) - 快速 * - clipboard.paste - 从系统剪贴板粘贴图片到附件列表 - 快速 Media(媒体附件)— 2 个方法 ---------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - image.attach - 附加本地图片(验证格式,读取元数据,估算 token) - 快速 * - input.detect_drop - 检测文件拖放(区分图片和普通文件) - 快速 Interaction(交互响应)— 4 个方法 ----------------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - clarify.respond - 响应 clarify 请求(唤醒阻塞的 Agent 线程) - 快速 * - sudo.respond - 响应 sudo 密码请求(唤醒阻塞的 Agent 线程) - 快速 * - secret.respond - 响应 secret 输入请求(唤醒阻塞的 Agent 线程) - 快速 * - approval.respond - 响应命令审批请求(approve/deny,支持 resolve_all) - 快速 Config(配置管理)— 3 个方法 ------------------------------ .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - config.set - 设置配置项(model/reasoning/skin/personality/voice 等) - 快速 * - config.get - 读取配置项(provider/full/prompt/skin/reasoning/compact 等) - 快速 * - config.show - 返回结构化的配置概览(Model/Agent/Environment 分节) - 快速 Completion(补全)— 2 个方法 ------------------------------ .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - complete.path - 文件路径补全(支持 ``@file:`` / ``@folder:`` 上下文引用) - 快速 * - complete.slash - Slash 命令补全(合并 COMMAND_REGISTRY + skill commands + TUI extras) - 快速 Command(命令)— 4 个方法 --------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - command.resolve - 解析命令名/别名(返回规范名称和元数据) - 快速 * - command.dispatch - 调度 slash 命令(skill/plugin/quick command 前置处理) - 快速 * - commands.catalog - 获取完整命令目录(COMMAND_REGISTRY + skills + quick_commands) - 快速 * - cli.exec - 执行 hermes CLI 子命令(subprocess,最长 600s 超时) - **长时** Voice(语音)— 3 个方法 -------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - voice.toggle - 开关语音模式(status/on/off,持久化到 config) - 快速 * - voice.record - 控制语音录制(start/stop,stop 时自动转录返回文本) - 快速 * - voice.tts - 文字转语音播放(后台线程执行 speak_text) - 快速 Model & Tools(模型与工具)— 6 个方法 --------------------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - model.options - 列出所有已认证 Provider 及其可用模型 - 快速 * - tools.list - 列出所有 toolset 及其工具(含启用状态) - 快速 * - tools.show - 列出所有工具定义(按 toolset 分组,含描述) - 快速 * - tools.configure - 启用/禁用 toolset 或 MCP server(重置 Agent 刷新工具) - 快速 * - toolsets.list - 列出所有 toolset 概览(不含工具详情) - 快速 * - reload.mcp - 重新加载所有 MCP server(shutdown → discover → refresh_tools) - 快速 System(系统管理)— 8 个方法 ------------------------------ .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - terminal.resize - 更新终端宽度(影响流式渲染器列宽) - 快速 * - setup.status - 检查是否有 Provider 已配置 - 快速 * - process.stop - 终止所有后台进程(process_registry.kill_all) - 快速 * - paste.collapse - 折叠粘贴的长文本(保存到 ~/.hermes/pastes/,返回占位符) - 快速 * - shell.exec - 执行 shell 命令(subprocess,30s 超时,含危险命令检测) - 快速 * - agents.list - 列出所有后台 Agent 进程 - 快速 * - plugins.list - 列出所有已加载插件 - 快速 * - insights.get - 获取使用洞察(按天数统计会话数和消息数) - 快速 Rollback(快照回滚)— 3 个方法 -------------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - rollback.list - 列出所有 Git 快照 checkpoint - 快速 * - rollback.restore - 恢复到指定 checkpoint(支持 file_path 单文件回滚) - 快速 * - rollback.diff - 查看 checkpoint 的 diff(含渲染后的 inline diff) - 快速 Advanced(高级功能)— 4 个方法 -------------------------------- .. list-table:: :header-rows: 1 :widths: 28 52 20 * - 方法名 - 功能 - 阻塞性 * - browser.manage - 管理浏览器 CDP 连接(status/connect/disconnect) - 快速 * - cron.manage - 管理定时任务(list/add/remove/pause/resume) - 快速 * - skills.manage - 管理 Skill(list/search/install/browse/inspect) - 快速 * - slash.exec - 在 SlashWorker 子进程中执行 slash 命令(含副作用镜像) - **长时** 阻塞式提示机制 ================ Gateway 的阻塞式提示机制是其最精妙的设计之一。当 Agent 需要用户交互时(clarify、sudo、secret), 它会 **阻塞 Agent 线程** ,等待 TUI 前端的响应。 _block() 函数 --------------- .. code-block:: python def _block(event: str, sid: str, payload: dict, timeout: int = 300) -> str: rid = uuid.uuid4().hex[:8] # 生成唯一请求 ID ev = threading.Event() # 创建同步事件 _pending[rid] = (sid, ev) # 注册到全局等待表 payload["request_id"] = rid _emit(event, sid, payload) # 推送事件到 TUI ev.wait(timeout=timeout) # 阻塞等待 _pending.pop(rid, None) # 清理 return _answers.pop(rid, "") # 返回答案 这个函数的工作原理: 1. 生成一个唯一的 ``request_id`` (8 字符十六进制) 2. 创建一个 ``threading.Event`` 作为同步原语 3. 将 ``(session_id, Event)`` 对注册到 ``_pending`` 字典 4. 通过 ``_emit()`` 向 TUI 前端推送交互请求 5. **阻塞当前线程** ,等待 Event 被设置 6. 从 ``_answers`` 字典获取 TUI 前端的响应 .. mermaid:: :name: gateway-block-mechanism :caption: _block() 阻塞提示机制 sequenceDiagram autonumber participant Agent as Agent 线程 participant Block as _block() participant TUI as TUI 前端 participant Pending as _pending 字典 participant Answers as _answers 字典 Agent->>Block: 需要用户输入
(clarify/sudo/secret) Block->>Pending: 注册 (rid, sid, Event) Block->>TUI: 推送事件
clarify.request {question, choices, request_id} Note over Block: ev.wait() 阻塞 TUI->>TUI: 显示交互 UI TUI->>TUI: 用户做出选择 TUI->>Answers: clarify.respond {request_id, answer} Note over Answers: _answers[rid] = answer Answers->>Pending: ev.set() 唤醒 Note over Block: 阻塞解除 Block->>Answers: _answers.pop(rid) → answer Block-->>Agent: 返回用户选择 超时处理 ---------- 如果用户在超时时间内没有响应,``ev.wait()`` 会自然返回, ``_answers.pop(rid, "")`` 返回空字符串。 Agent 的回调函数会根据空字符串做出合理的默认行为: - **Clarify** :返回超时消息,Agent 自行决定 - **Sudo** :返回空密码,命令通常因权限不足而失败 - **Secret** :返回空值,标记为 "skipped" _pending 字典结构 ------------------- ``_pending`` 字典存储所有等待中的交互请求: .. code-block:: python _pending: dict[str, tuple[str, threading.Event]] = {} # request_id → (session_id, Event) ``_answers`` 字典存储已到达的响应: .. code-block:: python _answers: dict[str, str] = {} # request_id → answer_text 这两个字典是线程安全的(GIL 保护下的原子操作)。 响应路由 ---------- 当 TUI 前端发送 ``clarify.respond`` 时: .. code-block:: python def _respond(rid, params, key): r = params.get("request_id", "") entry = _pending.get(r) if not entry: return _err(rid, 4009, f"no pending {key} request") _, ev = entry _answers[r] = params.get(key, "") ev.set() # 唤醒阻塞的 Agent 线程 return _ok(rid, {"status": "ok"}) 三种 respond 方法共享同一个 ``_respond()`` 辅助函数: - ``clarify.respond`` → key = "answer" - ``sudo.respond`` → key = "password" - ``secret.respond`` → key = "value" ``approval.respond`` 是独立实现的——它不使用 ``_pending/_answers`` 机制, 而是调用 ``tools.approval.resolve_gateway_approval()`` 直接解析审批结果。 会话内存管理 ============== _sessions 字典结构 -------------------- Gateway 的所有会话状态存储在 ``_sessions`` 字典中: .. code-block:: python _sessions: dict[str, dict] = {} # session_id → session dict 每个会话字典包含: .. list-table:: 会话字典字段 :header-rows: 1 :widths: 25 25 50 * - 字段 - 类型 - 说明 * - agent - AIAgent | None - Agent 实例(延迟创建) * - agent_ready - threading.Event - Agent 初始化完成信号 * - agent_error - str | None - Agent 初始化错误消息 * - session_key - str - 数据库会话键(格式:YYYYMMDD_HHMMSS_XXXXXX) * - history - list[dict] - 对话历史(messages 数组) * - history_lock - threading.Lock - 历史读写互斥锁 * - history_version - int - 乐观并发控制版本号 * - running - bool - Agent 是否正在执行 * - cols - int - 终端宽度(用于渲染) * - slash_worker - _SlashWorker | None - Slash 命令子进程 * - attached_images - list[str] - 待附加的图片路径列表 * - image_counter - int - 图片计数器 * - edit_snapshots - dict - 工具调用 ID → LocalEditSnapshot * - tool_started_at - dict - 工具调用 ID → 开始时间戳 * - show_reasoning - bool - 是否显示推理过程 * - tool_progress_mode - str - 工具进度显示模式 Agent 实例生命周期 -------------------- Agent 的创建是异步的——``session.create`` 立即返回,Agent 在后台线程中构建: .. code-block:: python def _build() -> None: try: agent = _make_agent(sid, key) _get_db().create_session(key, source="tui", model=_resolve_model()) session["agent"] = agent worker = _SlashWorker(key, getattr(agent, "model", _resolve_model())) session["slash_worker"] = worker _wire_callbacks(sid) _emit("session.info", sid, _session_info(agent)) except Exception as e: session["agent_error"] = str(e) _emit("error", sid, {"message": f"agent init failed: {e}"}) finally: ready.set() # 通知主线程:初始化完成(无论成功还是失败) 后续的 RPC 方法(如 ``prompt.submit``)通过 ``_sess()`` 辅助函数等待 Agent 就绪: .. code-block:: python def _sess(params, rid): s, err = _sess_nowait(params, rid) return (None, err) if err else (s, _wait_agent(s, rid)) def _wait_agent(session, rid, timeout=30.0): ready = session.get("agent_ready") if ready is not None and not ready.wait(timeout=timeout): return _err(rid, 5032, "agent initialization timed out") err = session.get("agent_error") return _err(rid, 5032, err) if err else None 孤儿会话清理 -------------- 存在一个竞态条件:``session.close`` 可能在 Agent 构建线程完成之前被调用。 ``_build()`` 的 ``finally`` 块检测这种情况: .. code-block:: python finally: if _sessions.get(sid) is not session: # session.close 已经移除了这个会话 # 我们是新构建的 worker 和 notify 注册的孤儿 if worker is not None: worker.close() if notify_registered: unregister_gateway_notify(key) ready.set() 乐观并发控制 ============== history_version 机制 ---------------------- 当用户在 Agent 运行时执行 ``/undo`` 、``/compress`` 或 ``/retry`` 时, 会话历史会被外部修改。为了防止 Agent 完成后的历史写入覆盖这些修改, Gateway 使用 ``history_version`` 实现乐观并发控制: .. code-block:: python # prompt.submit 开始时 history_version = int(session.get("history_version", 0)) # Agent 完成后 with session["history_lock"]: current_version = int(session.get("history_version", 0)) if current_version == history_version: session["history"] = result["messages"] session["history_version"] = history_version + 1 else: # 历史在外部被修改了,丢弃 Agent 的输出 print("history_version mismatch — agent output NOT written", file=sys.stderr) 这个机制确保: 1. 正常流程:版本匹配,Agent 输出正常写入 2. 并发修改:版本不匹配,Agent 输出被丢弃(但仍然在 TUI 中显示),并附带警告 busy-state 守卫 ----------------- ``running`` 标志防止并发请求: .. code-block:: python with session["history_lock"]: if session.get("running"): return _err(rid, 4009, "session busy") session["running"] = True # ... Agent 执行 ... finally: session["running"] = False # 无论成功还是失败都释放 以下方法额外检查 ``running`` 状态: - ``session.undo`` :运行中拒绝执行("session busy — /interrupt first") - ``session.compress`` :同上 - ``config.set("model")`` :运行中拒绝切换模型 - ``rollback.restore`` (完整历史回滚时):运行中拒绝执行 - ``slash.exec`` 中的 ``/model`` / ``/personality`` / ``/prompt`` / ``/compress`` :运行中拒绝执行 SlashWorker 子进程 ==================== Gateway 中许多 slash 命令需要访问完整的 CLI 环境(配置、工具定义、会话状态), 但它们在 Gateway 进程中执行可能会与 Agent 的运行冲突。 解决方案是 ``_SlashWorker``——一个持久的 ``HermesCLI`` 子进程: .. code-block:: python class _SlashWorker: def __init__(self, session_key: str, model: str): argv = [sys.executable, "-m", "tui_gateway.slash_worker", "--session-key", session_key] if model: argv += ["--model", model] self.proc = subprocess.Popen( argv, stdin=subprocess.PIPE, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, bufsize=1, ) ``slash_worker.py`` 的入口逻辑非常简洁——它创建一个 ``HermesCLI`` 实例, 然后进入 stdin 循环逐行读取 JSON 请求,调用 ``cli.process_command()`` 处理, 将输出捕获为 JSON 响应写入 stdout: .. code-block:: python # slash_worker.py 核心循环 cli = HermesCLI(model=args.model or None, compact=True, resume=args.session_key, verbose=False) for raw in sys.stdin: req = json.loads(raw.strip()) out = _run(cli, req.get("command", "")) sys.stdout.write(json.dumps({"id": rid, "ok": True, "output": out}) + "\n") 通信协议 ---------- SlashWorker 使用简单的 JSON Lines 协议: **请求** (Gateway → Worker): .. code-block:: json {"id": 1, "command": "/help"} **响应** (Worker → Gateway): .. code-block:: json {"id": 1, "ok": true, "output": "Available Commands:\n..."} 错误响应: .. code-block:: json {"id": 1, "ok": false, "error": "command failed"} 序列号管理 ------------ ``_lock`` 保护的序列号确保请求-响应匹配: .. code-block:: python with self._lock: self._seq += 1 rid = self._seq self.proc.stdin.write(json.dumps({"id": rid, "command": command}) + "\n") self.proc.stdin.flush() while True: msg = self.stdout_queue.get(timeout=_SLASH_WORKER_TIMEOUT_S) if msg.get("id") != rid: continue # 忽略旧请求的响应 if not msg.get("ok"): raise RuntimeError(msg.get("error", "slash worker failed")) return str(msg.get("output", "")) 超时控制 ---------- 默认超时为 45 秒(可通过 ``HERMES_TUI_SLASH_TIMEOUT_S`` 环境变量调整)。 超时后抛出 ``RuntimeError("slash worker timed out")`` 。 Stderr 尾部追踪 ----------------- Worker 维护最近 80 行 stderr 输出: .. code-block:: python def _drain_stderr(self): for line in (self.proc.stderr or []): if text := line.rstrip("\n"): self.stderr_tail = (self.stderr_tail + [text])[-80:] 当 Worker 异常退出时,这些尾部日志会包含在错误消息中,方便调试。 生命周期管理 -------------- - **创建** :``session.create`` 时创建,或 ``_restart_slash_worker()`` 重建 - **重建** :模型切换后需要重启 Worker(因为模型参数变了) - **关闭** :``session.close`` 时调用 ``worker.close()`` (terminate + wait) - **atexit** :进程退出时自动关闭所有 Worker Agent 回调系统 ================ 当 Agent 执行工具调用、生成思维过程或需要用户交互时, 它通过一组 **回调函数** 通知 Gateway。 回调注册 ---------- ``_agent_cbs()`` 函数为每个会话创建一组回调闭包: .. code-block:: python def _agent_cbs(sid: str) -> dict: return dict( tool_start_callback=lambda tc_id, name, args: _on_tool_start(sid, tc_id, name, args), tool_complete_callback=lambda tc_id, name, args, result: _on_tool_complete(sid, tc_id, name, args, result), tool_progress_callback=lambda event_type, name=None, preview=None, args=None, **kwargs: _on_tool_progress( sid, event_type, name, preview, args, **kwargs ), tool_gen_callback=lambda name: _emit("tool.generating", sid, {"name": name}), thinking_callback=lambda text: _emit("thinking.delta", sid, {"text": text}), reasoning_callback=lambda text: _emit("reasoning.delta", sid, {"text": text}), status_callback=lambda kind, text=None: _status_update(sid, str(kind), str(text) if text else None), clarify_callback=lambda q, c: _block("clarify.request", sid, {"question": q, "choices": c}), ) 这些回调在 Agent 创建时注入: .. code-block:: python AIAgent( model=_resolve_model(), quiet_mode=True, **_agent_cbs(sid), ) tool_start / tool_complete ---------------------------- **tool_start** : 1. 捕获 ``LocalEditSnapshot`` (用于内联 diff) 2. 记录工具开始时间(用于持续时长计算) 3. 如果工具进度模式开启,推送 ``tool.start`` 事件 **tool_complete** : 1. 计算 Duration(从 ``tool_started_at`` 到当前时间) 2. 生成工具摘要(如 "Did 3 searches in 2.1s") 3. 渲染内联 diff(如果有 ``LocalEditSnapshot``) 4. 推送 ``tool.complete`` 事件,携带 duration、summary、inline_diff tool_progress 回调 -------------------- ``tool_progress_callback`` 处理多种进度事件: - ``tool.started`` :推送 ``tool.progress`` 事件(工具名称 + 预览) - ``reasoning.available`` :推送推理可用性通知 - ``subagent.*`` :推送子 Agent 的状态更新(goal、task_count、task_index、summary、duration) thinking / reasoning ---------------------- Agent 的思维过程通过两种回调推送: - ``thinking_callback`` :一般性思考文本 - ``reasoning_callback`` :推理过程的增量文本 两者都推送为 ``thinking.delta`` / ``reasoning.delta`` 事件。 TUI 前端可以选择折叠或展开这些内容。 clarify 回调 -------------- ``clarify_callback`` 通过 ``_block()`` 实现阻塞式交互: .. code-block:: python clarify_callback=lambda q, c: _block("clarify.request", sid, { "question": q, "choices": c }) Agent 线程被阻塞,直到 TUI 前端发送 ``clarify.respond`` 。 额外的回调通过 ``_wire_callbacks()`` 注册: - **sudo 密码** :通过 ``_block("sudo.request", sid, {}, timeout=120)`` - **secret 捕获** :通过 ``_block("secret.request", sid, pl)`` ,存储后返回确认 消息队列与 busy→false 转换 ============================= Agent 完成一次对话后,``running`` 标志从 ``True`` 变为 ``False`` 。 在这个转换时刻,Gateway 需要检查是否有排队的消息等待发送。 Drain 机制 ------------ 在 ``prompt.submit`` 的 ``finally`` 块中: .. code-block:: python finally: with session["history_lock"]: session["running"] = False 当 ``running`` 变为 ``False`` 时,TUI 前端可以安全地发送下一个请求。 这个设计确保: 1. Agent 运行期间的请求被拒绝(``session busy``) 2. Agent 完成后立即可以接受新请求 3. 历史写入和 running 标志更新在同一个锁下完成,保证一致性 事件顺序保证 -------------- Gateway 的事件推送遵循以下顺序: 1. ``message.start`` — 对话开始 2. ``thinking.delta`` / ``reasoning.delta`` — 思维过程(多个) 3. ``tool.start`` → ``tool.complete`` — 工具调用(多轮) 4. ``tool.progress`` — 工具进度更新(可选) 5. ``message.delta`` — 响应文本(多个,流式) 6. ``message.complete`` — 对话完成(包含 usage、status、rendered) TUI 前端可以依赖这个顺序来正确渲染对话界面。 内存与资源管理 ================ 配置缓存 ---------- ``_load_cfg()`` 使用文件修改时间(mtime)缓存配置: .. code-block:: python def _load_cfg() -> dict: global _cfg_cache, _cfg_mtime p = _hermes_home / "config.yaml" mtime = p.stat().st_mtime if p.exists() else None with _cfg_lock: if _cfg_cache is not None and _cfg_mtime == mtime: return copy.deepcopy(_cfg_cache) # ... 读取文件 ... _cfg_cache = copy.deepcopy(data) _cfg_mtime = mtime return data 这避免了每次 RPC 调用都读取和解析 YAML 文件的性能开销。 会话关闭 ---------- ``session.close`` 清理所有资源: 1. 从 ``_sessions`` 字典中移除会话 2. 注销 approval 通知(``unregister_gateway_notify``) 3. 关闭 SlashWorker 子进程 4. Agent 的 ``atexit`` 钩子负责清理终端和浏览器会话 进程退出 ---------- Gateway 注册了 atexit 处理器来清理所有 SlashWorker: .. code-block:: python atexit.register(lambda: [ s.get("slash_worker") and s["slash_worker"].close() for s in _sessions.values() ]) 同时,线程池在进程退出时被关闭: .. code-block:: python atexit.register(lambda: _pool.shutdown(wait=False, cancel_futures=True)) Platform 适配器基类 ==================== Gateway 支持多种前端平台(Telegram、Discord、Slack、Signal、Matrix、微信、飞书、 钉钉、WhatsApp、Home Assistant 等 20+ 个),每个平台通过继承 ``BasePlatformAdapter``(``gateway/platforms/base.py``)实现适配。 BasePlatformAdapter 接口 -------------------------- 基类定义了所有平台必须实现的核心接口: .. code-block:: python class BasePlatformAdapter(ABC): @abstractmethod async def connect(self) -> bool: ... @abstractmethod async def disconnect(self) -> None: ... @abstractmethod async def send(self, chat_id: str, content: str, ...) -> SendResult: ... @abstractmethod async def get_chat_info(self, chat_id: str) -> Dict[str, Any]: ... 并提供了丰富的默认实现: - **媒体发送** :``send_image()`` / ``send_voice()`` / ``send_video()`` / ``send_document()`` — 默认以文本降级,子类覆盖原生发送 - **消息编辑** :``edit_message()`` — 用于流式更新,支持 ``finalize`` 标志 - **消息分块** :``truncate_message()`` — 自动按平台长度限制分块,保持代码块完整性 - **媒体提取** :``extract_images()`` / ``extract_media()`` / ``extract_local_files()`` — 从响应文本中提取多媒体附件 - **重试机制** :``_send_with_retry()`` — 指数退避重试 + 纯文本降级 - **Typing 指示器** :``_keep_typing()`` — 持续刷新 typing 状态,支持暂停(审批等待时) - **图片缓存** :``cache_image_from_url()`` / ``cache_image_from_bytes()`` — SSRF 安全的图片下载与本地缓存 - **音频缓存** :``cache_audio_from_url()`` / ``cache_audio_from_bytes()`` — 语音消息的本地缓存 - **文档缓存** :``cache_document_from_url()`` — 支持 PDF/DOCX/XLSX 等格式 - **代理支持** :``resolve_proxy_url()`` — 自动检测 HTTPS_PROXY/HTTP_PROXY 及 macOS 系统代理 消息处理流水线 ----------------- ``handle_message()`` 是平台适配器的核心方法,它实现了完整的异步消息处理生命周期: .. mermaid:: :name: platform-handle-message :caption: 平台适配器消息处理流水线 sequenceDiagram autonumber participant P as 平台适配器 participant HM as handle_message() participant PM as _process_message_background() participant Agent as AIAgent P->>HM: MessageEvent HM->>HM: 检查 _active_sessions alt 已有活跃会话 alt 紧急命令 (/approve, /stop) HM->>PM: 直接调度 else 图片突发 HM->>HM: 合并到 _pending_messages else 普通消息 HM->>HM: 触发 interrupt HM->>HM: 排入 _pending_messages end else 无活跃会话 HM->>HM: 设置 _active_sessions guard HM->>PM: 创建后台 asyncio.Task end PM->>PM: 启动 _keep_typing PM->>Agent: _message_handler(event) Agent-->>PM: response text PM->>PM: extract_media / extract_images PM->>P: _send_with_retry (text + media) PM->>PM: 检查 _pending_messages PM->>PM: 清理 _active_sessions 平台注册与生命周期 -------------------- 所有平台适配器位于 ``gateway/platforms/`` 目录中。当前支持的平台: - ``telegram.py`` — Telegram DM / Group / Topic - ``discord.py`` — Discord(线程式对话) - ``slack.py`` — Slack(Assistant API + thread replies) - ``signal.py`` — Signal - ``matrix.py`` — Matrix - ``whatsapp.py`` — WhatsApp - ``weixin.py`` — 微信/元宝 - ``wecom.py`` — 企业微信 - ``feishu.py`` — 飞书 - ``dingtalk.py`` — 钉钉 - ``homeassistant.py`` — Home Assistant - ``email.py`` — Email - ``sms.py`` — SMS - ``webhook.py`` — 通用 Webhook - ``bluebubbles.py`` — BlueBubbles (iMessage) - ``mattermost.py`` — Mattermost - ``qqbot/`` — QQ Bot - ``api_server.py`` — HTTP/SSE API Server(Open WebUI 兼容) Stream Consumer:Agent 输出流处理 =================================== ``gateway/stream_consumer.py`` 负责处理 Agent 的输出流——将 Agent 产生的 原始 token 流转换为结构化的事件,推送给前端平台。 .. mermaid:: :name: stream-consumer-pipeline :caption: Stream Consumer 处理流水线 flowchart LR AGENT["AIAgent
输出流"] --> SC["GatewayStreamConsumer"] SC --> PARSE["Token 解析"] PARSE --> BATCH["批次聚合"] BATCH --> DISPATCH["平台分发"] DISPATCH --> TUI["TUI 前端"] DISPATCH --> API["API Server
(SSE)"] DISPATCH --> TG["Telegram"] DISPATCH --> DISC["Discord"] DISPATCH --> SLACK["Slack"] 核心设计 --------- ``GatewayStreamConsumer`` 使用 **edit transport** 模式: 先发送一条初始消息,然后通过 ``editMessageText`` 持续更新内容。 这在 Telegram、Discord、Slack 等平台上是普遍支持的。 Stream Consumer 的核心配置: .. code-block:: python @dataclass class StreamConsumerConfig: edit_interval: float = 1.0 # 编辑间隔(秒) buffer_threshold: int = 40 # 缓冲区触发阈值(字符数) cursor: str = " ▉" # 流式光标字符 buffer_only: bool = False # 仅缓冲模式 核心职责 --------- Stream Consumer 承担以下核心职责: 1. **Token 流消费** :从 Agent 的输出迭代器中逐个读取 token 2. **事件分类** :区分文本 token、工具调用、思维过程、错误等不同类型 3. **批次聚合** :将小 token 合并为适合网络传输的批次 4. **平台分发** :将处理后的事件推送到目标平台适配器 5. **分段处理** :工具调用边界触发新消息段(``_NEW_SEGMENT``),确保后续文本出现在工具进度消息下方 Stream Consumer 的设计将 Agent 的输出逻辑与平台的推送逻辑解耦——Agent 只需产生 token 流, Stream Consumer 负责将其适配到不同平台的推送协议。 API Server:HTTP/SSE 接口 =========================== API Server(``gateway/platforms/api_server.py``)是 Gateway 的 HTTP/SSE 接口层, 为 Open WebUI 等第三方前端提供 Agent 访问能力。 X-Hermes-Session-Key:长期记忆作用域 -------------------------------------- ``X-Hermes-Session-Key`` 是一个自定义 HTTP 请求头,允许外部客户端指定一个 **长期会话键** 来控制 Agent 的记忆范围: .. code-block:: http POST /v1/chat/completions HTTP/1.1 X-Hermes-Session-Key: user_20240101_primary Content-Type: application/json 其工作原理: 1. API Server 检查请求头中的 ``X-Hermes-Session-Key`` 值 2. 如果存在,使用该键查找或创建对应的持久化会话 3. Agent 加载该会话的历史记录,实现跨请求的长期记忆 4. 如果不存在,API Server 生成临时会话键(行为与之前一致) 这个特性使得 Open WebUI 等前端可以为每个用户维护独立的长期对话上下文, 而不仅仅是单次请求-响应的无状态交互。 SSE 流式输出 -------------- 针对 Open WebUI 的 SSE(Server-Sent Events)流式输出进行了优化: - **Token 批处理** :将连续的小 token 合并为较大的批次再发送,减少 SSE 事件数量和网络开销 - **错误处理** :在 SSE 流中增加结构化的错误事件,使前端能优雅地处理 Agent 异常 .. code-block:: data: {"error": {"message": "Agent timeout", "type": "agent_error"}} data: [DONE] 源码文件索引 ============== 本章涉及的主要源文件: - ``tui_gateway/entry.py`` — Gateway 入口点,stdin 读取循环,信号处理,初始化握手 - ``tui_gateway/server.py`` — 56 个 RPC 方法定义(3086 行),会话管理,阻塞提示,SlashWorker,回调系统 - ``tui_gateway/render.py`` — 流式渲染器,消息渲染,diff 渲染 - ``tui_gateway/slash_worker.py`` — SlashWorker 子进程入口(77 行) - ``gateway/platforms/base.py`` — 平台适配器基类(2343 行),消息处理流水线,媒体缓存,重试机制 - ``gateway/stream_consumer.py`` — Agent 输出流消费者,edit transport,token 批处理与平台分发 - ``gateway/platforms/api_server.py`` — HTTP/SSE API Server,X-Hermes-Session-Key 支持 - ``gateway/platforms/telegram.py`` — Telegram 平台适配器 - ``gateway/platforms/discord.py`` — Discord 平台适配器 - ``gateway/platforms/slack.py`` — Slack 平台适配器 - ``gateway/config.py`` — Gateway 配置管理,Platform/PlatformConfig 数据类 - ``gateway/session.py`` — 会话键构建,SessionSource 数据类 - ``gateway/status.py`` — 运行时状态写入,作用域锁管理