.. _chapter-session-state: 状态与会话管理:Agent 的记忆系统 ================================================ .. contents:: :depth: 3 :local: 为什么 Agent 需要持久化 ------------------------- 无状态 Agent 的局限 ~~~~~~~~~~~~~~~~~~~~~ 最简单的 Agent 实现是无状态的——每次对话都从零开始,所有历史都存在于内存中。 一旦进程退出,一切消失。对于单次问答这没什么问题,但对于一个编程助手来说, 无状态意味着: - 用户关闭终端后,Agent 忘记了之前做了什么 - 系统崩溃或网络中断后,对话无法恢复 - 无法搜索"上周那个关于数据库迁移的对话" - 多个平台(CLI、Telegram、Discord)无法共享状态 - 无法追踪 token 消耗和费用 Hermes 选择了 **SQLite 持久化** 作为解决方案, 提供了完整的会话生命周期管理、全文搜索和跨平台状态共享。 持久化的需求层次 ~~~~~~~~~~~~~~~~~~ Hermes 的持久化需求可以分为四个层次: **1. 消息存储:** 完整保存每条消息(用户、助手、工具调用、工具结果), 包括时间戳、token 计数和推理链。 **2. 会话管理:** 创建、恢复、结束、删除会话, 支持会话标题、来源标记(CLI/gateway/Telegram)和父子关系。 **3. 全文搜索:** 在所有会话的所有消息中快速搜索关键词, 支持中英日韩等 CJK 字符。 **4. 并发安全:** 多个进程(gateway + CLI + 工作树 Agent) 同时访问同一个数据库,不丢失数据也不死锁。 SessionDB 架构概览 --------------------- 技术选型:为什么是 SQLite? ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ Hermes 选择了 SQLite 而非 PostgreSQL、Redis 或文件系统,原因如下: .. list-table:: :header-rows: 1 :widths: 20 40 40 * - 特性 - SQLite - 其他方案 * - 部署 - 零配置,单个文件 - PostgreSQL 需要服务器 * - 并发读 - WAL 模式下无限并发读 - 文件系统需要锁 * - 事务 - 完整 ACID - JSONL 无事务保证 * - 全文搜索 - 内置 FTS5 - 需要外部搜索引擎 * - 跨平台 - Python 标准库自带 - 依赖外部服务 核心设计决策记录在 ``hermes_state.py`` 的模块文档中: - WAL 模式用于并发读 + 单写(gateway 多平台场景) - FTS5 虚拟表用于快速文本搜索 - ``parent_session_id`` 链支持压缩触发的会话分割 - 批量运行器和 RL 轨迹不在这里存储(独立系统) - 会话来源标记(``'cli'``, ``'telegram'``, ``'discord'`` 等)用于过滤 WAL 模式详解 ~~~~~~~~~~~~~~ SQLite 的 **WAL(Write-Ahead Logging)** 模式是整个并发架构的基础。 在默认的 rollback journal 模式下,写操作会锁定整个数据库文件, 阻止所有读操作。而在 WAL 模式下: - **写操作** 追加到 WAL 文件末尾,不修改主数据库文件 - **读操作** 直接读取主数据库文件(如果 WAL 中有未合并的修改, 读操作会透明地合并 WAL 中的修改) - **检查点(checkpoint)** 将 WAL 中的修改合并回主数据库 这意味着读写可以并发进行——写操作不阻塞读操作,读操作也不阻塞写操作。 唯一的限制是同一时刻只能有一个写操作。 SessionDB 在初始化时启用 WAL 模式: .. code-block:: python self._conn = sqlite3.connect( str(self.db_path), check_same_thread=False, timeout=1.0, isolation_level=None, # 手动管理事务 ) self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA foreign_keys=ON") 注意 ``isolation_level=None``——Python 的默认隔离级别会在 DML 语句时 自动开启事务,这与我们手动的 ``BEGIN IMMEDIATE`` 冲突。 设置为 ``None`` 表示我们自己管理事务边界。 数据库 Schema 详解 -------------------- Schema 版本管理 ~~~~~~~~~~~~~~~~~ 当前 schema 版本为 **v6** ,通过 ``schema_version`` 表跟踪: .. code-block:: sql CREATE TABLE IF NOT EXISTS schema_version ( version INTEGER NOT NULL ); ``_init_schema()`` 在每次连接数据库时检查版本号, 如果低于当前版本则依次运行迁移脚本。 sessions 表 ~~~~~~~~~~~~~ ``sessions`` 表是会话的元数据中心: .. code-block:: sql CREATE TABLE IF NOT EXISTS sessions ( id TEXT PRIMARY KEY, -- UUID source TEXT NOT NULL, -- 'cli', 'telegram', 'discord', etc. user_id TEXT, -- 平台用户 ID model TEXT, -- 使用的模型名称 model_config TEXT, -- JSON: 模型配置快照 system_prompt TEXT, -- 完整的系统提示快照 parent_session_id TEXT, -- 父会话 ID(分支机制) started_at REAL NOT NULL, -- Unix 时间戳 ended_at REAL, -- 结束时间 end_reason TEXT, -- 结束原因 message_count INTEGER DEFAULT 0, -- 消息计数 tool_call_count INTEGER DEFAULT 0,-- 工具调用计数 -- Token 统计(v5 新增) input_tokens INTEGER DEFAULT 0, output_tokens INTEGER DEFAULT 0, cache_read_tokens INTEGER DEFAULT 0, cache_write_tokens INTEGER DEFAULT 0, reasoning_tokens INTEGER DEFAULT 0, -- 计费信息(v5 新增) billing_provider TEXT, billing_base_url TEXT, billing_mode TEXT, estimated_cost_usd REAL, actual_cost_usd REAL, cost_status TEXT, cost_source TEXT, pricing_version TEXT, -- 标题(v3 新增) title TEXT, FOREIGN KEY (parent_session_id) REFERENCES sessions(id) ); 关键字段说明: **id(TEXT PRIMARY KEY):** UUID 格式的会话标识符。 作为主键,它同时也是外部引用(消息表、子会话)的锚点。 **source(TEXT NOT NULL):** 会话来源平台。 在 gateway 多平台模式下,这个字段用于按平台过滤会话。 **parent_session_id(TEXT):** 实现会话分支机制的核心字段。 当上下文压缩触发会话分割时,新的会话通过这个字段关联到原始会话。 **model_config(TEXT):** JSON 格式的模型配置快照, 记录了会话开始时的模型参数(温度、top_p 等)。 **system_prompt(TEXT):** 完整的组装后的系统提示快照。 这使得会话可以被完整恢复。 索引设计 ~~~~~~~~~~ .. code-block:: sql CREATE INDEX idx_sessions_source ON sessions(source); CREATE INDEX idx_sessions_parent ON sessions(parent_session_id); CREATE INDEX idx_sessions_started ON sessions(started_at DESC); CREATE UNIQUE INDEX idx_sessions_title_unique ON sessions(title) WHERE title IS NOT NULL; 每个索引都有明确的目的: - ``idx_sessions_source`` :按平台过滤会话列表 - ``idx_sessions_parent`` :快速查找会话的所有子会话(分支遍历) - ``idx_sessions_started`` :按时间排序的会话列表(降序,最新的在前) - ``idx_sessions_title_unique`` :确保标题唯一性(部分索引,NULL 不参与唯一性检查) messages 表 ~~~~~~~~~~~~~ .. code-block:: sql CREATE TABLE IF NOT EXISTS messages ( id INTEGER PRIMARY KEY AUTOINCREMENT, session_id TEXT NOT NULL REFERENCES sessions(id), role TEXT NOT NULL, -- 'system', 'user', 'assistant', 'tool' content TEXT, -- 消息正文 tool_call_id TEXT, -- 工具结果引用的调用 ID tool_calls TEXT, -- JSON: 工具调用数组 tool_name TEXT, -- 工具名称 timestamp REAL NOT NULL, -- Unix 时间戳 token_count INTEGER, -- 消息的 token 数 finish_reason TEXT, -- 'stop', 'tool_calls', etc. (v2) reasoning TEXT, -- 推理文本 (v6) reasoning_details TEXT, -- JSON: 推理细节 (v6) codex_reasoning_items TEXT -- JSON: Codex 推理项 (v6) ); CREATE INDEX idx_messages_session ON messages(session_id, timestamp); ``messages`` 表的设计关注点: **AUTOINCREMENT 主键:** 使用自增整数而非 UUID,因为消息的插入频率很高, 整数主键的插入性能更好。``lastrowid`` 被用于返回新插入消息的 ID。 **tool_calls 和 tool_name 分离:** ``tool_calls`` 是完整的 JSON 数组 (包含 id、function.name、function.arguments),而 ``tool_name`` 是从中提取的工具名称,用于快速过滤而不需要解析 JSON。 **reasoning 字段(v6 新增):** 保存助手的推理链文本和结构化推理细节。 没有这些字段,推理链在会话重新加载时会丢失, 导致多轮推理的连续性被破坏。 FTS5 全文搜索 --------------- FTS5 虚拟表结构 ~~~~~~~~~~~~~~~~~ .. code-block:: sql CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5( content, content=messages, content_rowid=id ); 这里使用了 FTS5 的 **内容表模式(content table mode)** 。 ``content=messages`` 指定 FTS 索引的内容来自 ``messages`` 表, ``content_rowid=id`` 将 FTS 的 rowid 映射到消息表的自增主键。 这种模式下,FTS 虚拟表本身不存储内容副本,只存储倒排索引, 节省了磁盘空间。 同步触发器 ~~~~~~~~~~~~ FTS5 索引通过三个触发器与 ``messages`` 表保持同步: .. code-block:: sql -- 插入时同步 CREATE TRIGGER messages_fts_insert AFTER INSERT ON messages BEGIN INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content); END; -- 删除时同步 CREATE TRIGGER messages_fts_delete AFTER DELETE ON messages BEGIN INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', old.id, old.content); END; -- 更新时同步(先删旧索引,再插新索引) CREATE TRIGGER messages_fts_update AFTER UPDATE ON messages BEGIN INSERT INTO messages_fts(messages_fts, rowid, content) VALUES('delete', old.id, old.content); INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content); END; 注意 FTS5 的删除操作使用特殊的语法——向虚拟表插入一行, 但第一个列名为虚拟表自身名称(``messages_fts``),值为 ``'delete'`` 。 这是 FTS5 内容表模式的要求。 查询清理器(_sanitize_fts5_query) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ FTS5 有自己的查询语法,其中 ``"`` 、``(`` 、``)`` 、``+`` 、``*`` 、 ``{`` 、``}`` 和 ``AND`` 、``OR`` 、``NOT`` 都有特殊含义。 直接将用户输入传递给 ``MATCH`` 子句可能导致 ``OperationalError`` 。 ``_sanitize_fts5_query()`` 方法通过六个步骤处理用户输入: **步骤 1:提取并保护引号短语。** 使用占位符替换 ``"..."`` 格式的精确匹配短语, 防止后续步骤破坏它们。 **步骤 2:移除未匹配的 FTS5 特殊字符。** 去除 ``+{}()\"^`` 等字符。 **步骤 3:规范化通配符。** 将连续的 ``*`` 合并为一个, 移除前导的 ``*`` (FTS5 前缀搜索至少需要一个前导字符)。 **步骤 4:移除两端悬空的布尔运算符。** ``"hello AND"`` 或 ``"OR world"`` 会导致语法错误。 **步骤 5:包装带连字符和点号的术语。** FTS5 默认分词器会在连字符和点号处分割, 导致 ``chat-send`` 变成 ``chat AND send`` ,``P2.2`` 变成 ``p2 AND 2`` 。 将它们用引号包装可以保持短语语义。 **步骤 6:恢复步骤 1 中保护的引号短语。** CJK 回退机制 ~~~~~~~~~~~~~~ FTS5 的默认分词器(tokenizer)是 ``unicode61`` , 它按 Unicode 单词边界分词。对于 CJK 字符, 这个分词器会将每个字符视为一个独立的词元(token)。 这意味着搜索"数据库迁移"会被分解为"数"、"据"、"库"、"迁"、"移"五个独立的词元, 返回包含其中任何一个字符的所有结果。 ``search_messages()`` 方法实现了 CJK 回退: .. code-block:: python # 首先尝试 FTS5 搜索 try: cursor = self._conn.execute(sql, params) except sqlite3.OperationalError: if not self._contains_cjk(query): return [] # 非 CJK 的语法错误直接返回空 matches = [] else: matches = [dict(row) for row in cursor.fetchall()] # 如果 FTS5 无结果且查询包含 CJK,回退到 LIKE if not matches and self._contains_cjk(query): raw_query = query.strip('"').strip() like_sql = """ SELECT ... FROM messages m ... WHERE m.content LIKE ? ... """ like_params = [f"%{raw_query}%"] ``_contains_cjk()`` 通过 Unicode 范围检查判断文本是否包含 CJK 字符: .. code-block:: python @staticmethod def _contains_cjk(text): for ch in text: cp = ord(ch) if (0x4E00 <= cp <= 0x9FFF or # CJK Unified Ideographs 0x3400 <= cp <= 0x4DBF or # CJK Extension A 0x20000 <= cp <= 0x2A6DF or # CJK Extension B 0x3000 <= cp <= 0x303F or # CJK Symbols 0x3040 <= cp <= 0x309F or # Hiragana 0x30A0 <= cp <= 0x30FF or # Katakana 0xAC00 <= cp <= 0xD7AF): # Hangul Syllables return True return False 搜索结果包含上下文(每条匹配消息前后各一条), 使用 FTS5 的 ``snippet()`` 函数生成高亮摘要: .. code-block:: sql snippet(messages_fts, 0, '>>>', '<<<', '...', 40) AS snippet 这会在匹配文本前后添加 ``>>>`` 和 ``<<<`` 标记, 上下文窗口为 40 个字符。 FTS5 搜索流程(含 CJK 回退): .. mermaid:: :name: fig-fts5-search-flow :caption: FTS5 全文搜索流程(含 CJK 回退) flowchart TD A["用户输入搜索查询"] --> B["_sanitize_fts5_query()"] B --> C["构建 FTS5 MATCH 查询"] C --> D{"执行 FTS5 查询"} D -- 成功 --> E{"有结果?"} D -- OperationalError --> F{"查询含 CJK?"} F -- 否 --> G["返回空列表"] F -- 是 --> H["matches = []"] E -- 是 --> K["添加上下文消息"] E -- 否 --> I{"查询含 CJK?"} H --> I I -- 否 --> G I -- 是 --> J["LIKE 回退搜索
%query%"] J --> K K --> L["返回结果
(含 snippet + 上下文)"] class A start class L success class G fail class J warn class B,C,H,K info classDef start fill:#dbeafe,stroke:#3b82f6,color:#1e3a8a classDef success fill:#dcfce7,stroke:#16a34a,color:#166534 classDef warn fill:#fef9c3,stroke:#ca8a04,color:#854d0e classDef fail fill:#fee2e2,stroke:#dc2626,color:#991b1b classDef info fill:#f1f5f9,stroke:#64748b,color:#334155 写入并发控制 -------------- 问题:多进程写竞争 ~~~~~~~~~~~~~~~~~~~~ 在 gateway 模式下,多个 Hermes 进程同时运行: - Gateway 主进程处理来自多个平台的请求 - CLI 会话独立连接数据库 - 工作树 Agent 在并行分支中工作 所有这些进程共享同一个 ``state.db`` 文件。 SQLite 的 WAL 模式允许并发读,但写操作仍然需要排他锁。 SQLite 内置的忙等待处理器使用**确定性的** 退避策略—— 固定的时间间隔重试。在高并发场景下,多个写入者会在相同的时间点重试, 形成**护航效应(convoy effect)** ,导致 TUI 冻结和响应延迟。 解决方案:BEGIN IMMEDIATE + 随机抖动 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ SessionDB 使用**应用层重试 + 随机抖动** 来打破护航效应: .. code-block:: python _WRITE_MAX_RETRIES = 15 _WRITE_RETRY_MIN_S = 0.020 # 20ms _WRITE_RETRY_MAX_S = 0.150 # 150ms _CHECKPOINT_EVERY_N_WRITES = 50 def _execute_write(self, fn): last_err = None for attempt in range(self._WRITE_MAX_RETRIES): try: with self._lock: self._conn.execute("BEGIN IMMEDIATE") try: result = fn(self._conn) self._conn.commit() except BaseException: self._conn.rollback() raise # 成功后周期性检查点 self._write_count += 1 if self._write_count % self._CHECKPOINT_EVERY_N_WRITES == 0: self._try_wal_checkpoint() return result except sqlite3.OperationalError as exc: err_msg = str(exc).lower() if "locked" in err_msg or "busy" in err_msg: last_err = exc if attempt < self._WRITE_MAX_RETRIES - 1: jitter = random.uniform( self._WRITE_RETRY_MIN_S, self._WRITE_RETRY_MAX_S, ) time.sleep(jitter) continue raise raise last_err or sqlite3.OperationalError("database is locked") 关键设计点: **BEGIN IMMEDIATE:** 在事务开始时立即获取写锁, 而不是等到第一次写操作时。这使得锁竞争在事务开始时就显现, 而不是在事务中间——减少了死锁的风险。 **threading.Lock + SQLite 锁:** Python 的 ``threading.Lock`` 确保同一个进程内的多个线程不会同时尝试写入。 SQLite 的 WAL 写锁处理不同进程之间的竞争。 **随机抖动:** ``random.uniform(20ms, 150ms)`` 的随机等待时间 使得竞争的写入者不会在同一时刻重试。 **15 次重试上限:** 最多重试 15 次,每次最长等待 150ms, 总等待时间上限为 15 * 150ms = 2.25 秒。 如果 15 次都失败,抛出异常。 **超时设置为 1 秒:** SQLite 连接的 ``timeout=1.0`` 是短超时—— 如果 SQLite 内部的忙等待在 1 秒内无法获取锁,立即返回错误, 由应用层重试处理。 WAL 检查点 ~~~~~~~~~~~~ 每 50 次成功的写操作后,执行一次 **PASSIVE 检查点** : .. code-block:: python def _try_wal_checkpoint(self): try: with self._lock: result = self._conn.execute( "PRAGMA wal_checkpoint(PASSIVE)" ).fetchone() except Exception: pass # 尽力而为,永远不会致命 PASSIVE 模式不会阻塞——它只合并那些没有其他连接正在使用的 WAL 帧。 如果检查点无法合并某些帧(因为其他连接正在使用),它会静默跳过, 而不是等待。 这个机制防止了 WAL 文件在长时间运行中无限增长。 在 ``close()`` 方法中也执行了一次检查点, 确保退出的进程帮助保持 WAL 文件的合理大小。 写入并发时序图: .. mermaid:: :name: fig-write-concurrency :caption: 写入并发控制时序图 sequenceDiagram autonumber participant T1 as 线程 1 participant Lock as threading.Lock participant DB as SQLite (WAL) participant T2 as 线程 2 T1->>Lock: acquire() T1->>DB: BEGIN IMMEDIATE T1->>DB: INSERT INTO messages ... T1->>DB: COMMIT T1->>Lock: release() Note over T2: T2 在 T1 持有锁期间尝试写入 T2->>Lock: acquire() — 阻塞 T1->>Lock: release() T2->>Lock: acquire() — 成功 T2->>DB: BEGIN IMMEDIATE T2->>DB: INSERT INTO messages ... T2->>DB: COMMIT T2->>Lock: release() Note over T1,T2: 不同进程的竞争由 SQLite WAL 处理 rect rgb(255, 230, 230) Note over DB: Process A: BEGIN IMMEDIATE holds WAL write lock Note over DB: Process B: BEGIN IMMEDIATE - database is locked Note over DB: Process B: sleep random 20-150ms Note over DB: Process B: retry BEGIN IMMEDIATE - success end 会话生命周期 -------------- SessionDB 管理的会话有完整的状态机,从创建到删除。 创建会话(create_session) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python def create_session(self, session_id, source, model=None, model_config=None, system_prompt=None, user_id=None, parent_session_id=None): def _do(conn): conn.execute( """INSERT OR IGNORE INTO sessions (id, source, user_id, model, model_config, system_prompt, parent_session_id, started_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?)""", (session_id, source, user_id, model, json.dumps(model_config) if model_config else None, system_prompt, parent_session_id, time.time()), ) self._execute_write(_do) return session_id 使用 ``INSERT OR IGNORE`` 而非 ``INSERT`` — 如果会话 ID 已存在则静默忽略。 这使得 ``create_session`` 是幂等的,调用者不需要先检查会话是否存在。 追加消息(append_message) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python def append_message(self, session_id, role, content=None, tool_name=None, tool_calls=None, tool_call_id=None, token_count=None, finish_reason=None, reasoning=None, reasoning_details=None, codex_reasoning_items=None): # 预序列化 JSON 字段 tool_calls_json = json.dumps(tool_calls) if tool_calls else None reasoning_details_json = json.dumps(reasoning_details) if reasoning_details else None codex_items_json = json.dumps(codex_reasoning_items) if codex_reasoning_items else None def _do(conn): cursor = conn.execute( """INSERT INTO messages (...) VALUES (...)""", (session_id, role, content, ...), ) msg_id = cursor.lastrowid # 更新计数器 conn.execute( "UPDATE sessions SET message_count = message_count + 1, ...") return msg_id return self._execute_write(_do) 关键细节:JSON 序列化在进入写事务**之前** 完成。 这减少了在持有写锁时的工作量,缩短了锁持有时间,降低了竞争。 结束会话(end_session) ~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python def end_session(self, session_id, end_reason): def _do(conn): conn.execute( "UPDATE sessions SET ended_at = ?, end_reason = ? WHERE id = ?", (time.time(), end_reason, session_id), ) self._execute_write(_do) ``end_reason`` 的典型值包括:``"user_exit"`` 、``"timeout"`` 、 ``"reset"`` 、``"error"`` 等。 重新打开会话(reopen_session) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python def reopen_session(self, session_id): def _do(conn): conn.execute( "UPDATE sessions SET ended_at = NULL, end_reason = NULL WHERE id = ?", (session_id,), ) self._execute_write(_do) 清除结束时间和原因,使已结束的会话可以恢复。 确保会话存在(ensure_session) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python def ensure_session(self, session_id, source="unknown", model=None): def _do(conn): conn.execute( """INSERT OR IGNORE INTO sessions (id, source, model, started_at) VALUES (?, ?, ?, ?)""", (session_id, source, model, time.time()), ) self._execute_write(_do) 这是一个**恢复机制** 。如果在 Agent 启动时 ``create_session()`` 因为 瞬时的 SQLite 锁而失败,后续的 ``append_message()`` 可以调用 ``ensure_session()`` 来确保会话行存在。 删除会话(delete_session) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python def delete_session(self, session_id): def _do(conn): # 先孤立子会话 conn.execute( "UPDATE sessions SET parent_session_id = NULL " "WHERE parent_session_id = ?", (session_id,)) # 删除消息 conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,)) # 删除会话 conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,)) return True return self._execute_write(_do) **子会话不会被级联删除** ,而是被孤立(``parent_session_id = NULL``)。 这确保了压缩产生的分支会话不会因为原始会话被删除而消失。 修剪旧会话(prune_sessions) ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ .. code-block:: python def prune_sessions(self, older_than_days=90, source=None): cutoff = time.time() - (older_than_days * 86400) # 只删除已结束的会话 # 先孤立子会话 # 然后逐个删除 默认清理 90 天以前的已结束会话。**活跃会话不会被删除** , 无论多老。 会话生命周期状态图: .. mermaid:: :name: fig-session-lifecycle :caption: 会话生命周期状态图 stateDiagram-v2 [*] --> Created : create_session() Created --> Active : 第一条消息追加 Active --> Active : append_message() Active --> Active : update_token_counts() Active --> Active : set_session_title() Active --> Ended : end_session() Ended --> Reopened : reopen_session() Reopened --> Active : append_message() Active --> Branched : 上下文压缩触发分割 Branched --> Active : 新会话继续 Ended --> Deleted : delete_session() Active --> Deleted : delete_session() Deleted --> [*] 会话分支机制 -------------- 分支的设计动机 ~~~~~~~~~~~~~~~~ 当上下文压缩发生时,Hermes 有两种选择: 1. 在同一个会话中替换消息历史(当前方案) 2. 创建一个新会话,通过 ``parent_session_id`` 关联(可选方案) 当前实现主要使用方案 1(在同一个会话中压缩), 但 ``parent_session_id`` 基础设施已经就绪,支持以下场景: - 子 Agent 任务(通过 ``delegate_task`` 工具创建) - 压缩触发的会话分割 - 用户手动创建的分支 标题的血脉继承 ~~~~~~~~~~~~~~~~ 会话分支在标题层面有特殊的继承规则。 **``get_next_title_in_lineage()``** 生成血脉中的下一个标题: .. code-block:: python def get_next_title_in_lineage(self, base_title): # 去除已有的 #N 后缀 match = re.match(r'^(.*?) #(\d+)$', base_title) if match: base = match.group(1) else: base = base_title # 查找所有现有的编号变体 existing = [查询 base 和 base #N 的所有标题] # 找到最大编号,+1 max_num = 1 # 未编号的原始标题算 #1 for t in existing: m = re.match(r'^.* #(\d+)$', t) if m: max_num = max(max_num, int(m.group(1))) return f"{base} #{max_num + 1}" 例如:"数据库重构" → "数据库重构 #2" → "数据库重构 #3"。 **``resolve_session_by_title()``** 在查找标题时优先返回最新的编号变体: .. code-block:: python def resolve_session_by_title(self, title): exact = self.get_session_by_title(title) numbered = [查询 "title #N" 的所有变体] if numbered: return numbered[0]["id"] # 返回最新的编号变体 elif exact: return exact["id"] return None 标题验证与清理 ~~~~~~~~~~~~~~~~ ``sanitize_title()`` 方法对标题进行严格清理: - 去除 ASCII 控制字符(0x00-0x1F, 0x7F) - 去除零宽字符(U+200B-U+200F, U+FEFF) - 去除方向覆盖(U+202A-U+202E, U+2066-U+2069) - 合并内部空白为单个空格 - 长度上限 100 字符 - 空白字符串规范化为 ``None`` 标题唯一性由部分唯一索引保证: .. code-block:: sql CREATE UNIQUE INDEX idx_sessions_title_unique ON sessions(title) WHERE title IS NOT NULL Token 计数双模式 ------------------ 增量模式 vs. 绝对模式 ~~~~~~~~~~~~~~~~~~~~~~~ ``update_token_counts()`` 支持两种更新模式: **增量模式(absolute=False,默认):** 每次 API 调用后累加 delta 值。 .. code-block:: sql UPDATE sessions SET input_tokens = input_tokens + ?, output_tokens = output_tokens + ?, ... WHERE id = ? **绝对模式(absolute=True):** 直接设置累计值。 .. code-block:: sql UPDATE sessions SET input_tokens = ?, output_tokens = ?, ... WHERE id = ? 为什么需要两种模式?因为 Hermes 有两种运行模式: **CLI 模式:** 每次只看到当前 API 调用的 delta,使用增量模式。 **Gateway 模式:** 缓存的 Agent 在内存中维护累计计数, 通过 ``update_token_counts(absolute=True)`` 一次性设置总值。 计费信息 ~~~~~~~~~~ ``update_token_counts()`` 还更新计费相关字段: - ``billing_provider`` :计费提供商(如 "openrouter") - ``billing_base_url`` :API 基础 URL - ``billing_mode`` :计费模式 - ``estimated_cost_usd`` :估算费用(美元) - ``actual_cost_usd`` :实际费用(来自 API 响应) - ``cost_status`` :费用状态("estimated"、"final") - ``cost_source`` :费用来源 - ``pricing_version`` :定价版本 所有计费字段使用 ``COALESCE`` 模式——只有在提供新值时才更新: .. code-block:: sql billing_provider = COALESCE(billing_provider, ?) 这意味着首次设置后,除非显式提供新值,否则不会被覆盖。 消息加载 ---------- 两种加载格式 ~~~~~~~~~~~~~~ SessionDB 提供两种消息加载方法: **``get_messages()`` :** 返回数据库行格式的字典列表, 所有字段都保留(包括 id、session_id、timestamp 等)。 适合内部处理和数据导出。 **``get_messages_as_conversation()`` :** 返回 OpenAI 对话格式的消息列表, 只包含 ``role`` 、``content`` 和工具相关字段。 适合网关恢复对话历史。 还原时的反序列化 ~~~~~~~~~~~~~~~~~~ ``tool_calls`` 字段在数据库中以 JSON 字符串存储。 加载时需要反序列化: .. code-block:: python if msg.get("tool_calls"): try: msg["tool_calls"] = json.loads(msg["tool_calls"]) except (json.JSONDecodeError, TypeError): logger.warning("Failed to deserialize tool_calls, falling back to []") msg["tool_calls"] = [] v6 新增的 reasoning 字段同样需要反序列化: .. code-block:: python if row["role"] == "assistant": if row["reasoning"]: msg["reasoning"] = row["reasoning"] if row["reasoning_details"]: try: msg["reasoning_details"] = json.loads(row["reasoning_details"]) except (json.JSONDecodeError, TypeError): msg["reasoning_details"] = None 反序列化失败时的策略是**降级而非崩溃**——返回空列表或 None, 让对话可以继续,而不是因为一条损坏的消息而中断整个会话。 会话列表与富预览 ------------------ ``list_sessions_rich()`` 提供了带预览的会话列表。 单查询优化 ~~~~~~~~~~~~ 早期版本使用 N+2 查询(1 次查询会话列表 + N 次查询预览 + N 次查询最后活跃时间), 性能很差。当前版本使用**单个查询 + 关联子查询** : .. code-block:: sql SELECT s.*, COALESCE( (SELECT SUBSTR(REPLACE(REPLACE(m.content, X'0A', ' '), X'0D', ' '), 1, 63) FROM messages m WHERE m.session_id = s.id AND m.role = 'user' AND m.content IS NOT NULL ORDER BY m.timestamp, m.id LIMIT 1), '' ) AS _preview_raw, COALESCE( (SELECT MAX(m2.timestamp) FROM messages m2 WHERE m2.session_id = s.id), s.started_at ) AS last_active FROM sessions s ... 预览取第一条用户消息的前 63 个字符(换行替换为空格), ``last_active`` 取最后一条消息的时间戳。 子会话过滤 ~~~~~~~~~~~~ 默认情况下,子会话(子 Agent 任务、压缩分支)被排除: .. code-block:: python if not include_children: where_clauses.append("s.parent_session_id IS NULL") 这避免了会话列表被大量的子 Agent 任务淹没。 ID 解析 --------- SessionDB 支持多种 ID 解析方式: **精确 ID:** 直接通过 ``get_session()`` 查找。 **前缀匹配:** ``resolve_session_id()`` 支持使用 UUID 的前缀来查找: .. code-block:: python def resolve_session_id(self, session_id_or_prefix): exact = self.get_session(session_id_or_prefix) if exact: return exact["id"] # 模糊匹配 cursor = self._conn.execute( "SELECT id FROM sessions WHERE id LIKE ? ESCAPE '\\' " "ORDER BY started_at DESC LIMIT 2", (f"{escaped}%",), ) matches = [row["id"] for row in cursor.fetchall()] if len(matches) == 1: return matches[0] # 唯一匹配 return None # 无匹配或歧义匹配 注意 ``LIKE`` 的通配符(``%`` 、``_``)在用户输入中被转义, 防止恶意的模式匹配。 **标题解析:** ``resolve_session_by_title()`` 通过标题或标题变体查找。 Schema 迁移链 v1→v6 ---------------------- 迁移策略 ~~~~~~~~~~ SessionDB 使用**顺序迁移** 策略——从当前版本开始, 依次应用每个版本的迁移脚本: .. code-block:: python def _init_schema(self): cursor.execute("SELECT version FROM schema_version LIMIT 1") row = cursor.fetchone() if row is None: cursor.execute("INSERT INTO schema_version (version) VALUES (?)", (SCHEMA_VERSION,)) else: current_version = row["version"] if current_version < 2: # v2 迁移 if current_version < 3: # v3 迁移 if current_version < 4: # v4 迁移 ... 每个迁移都在 ``try/except`` 中执行——如果列或索引已存在,静默跳过。 这使得迁移是**幂等的** 。 迁移历史 ~~~~~~~~~~ **v1 → v2:** 为 ``messages`` 表添加 ``finish_reason TEXT`` 列。 记录模型停止的原因(``"stop"`` 、``"tool_calls"`` 等)。 **v2 → v3:** 为 ``sessions`` 表添加 ``title TEXT`` 列。 支持用户为会话设置可读的标题。 **v3 → v4:** 创建 ``title`` 的唯一索引(部分索引,``WHERE title IS NOT NULL``)。 确保标题不重复。 **v4 → v5:** 批量添加计费和 token 相关列。 .. code-block:: python new_columns = [ ("cache_read_tokens", "INTEGER DEFAULT 0"), ("cache_write_tokens", "INTEGER DEFAULT 0"), ("reasoning_tokens", "INTEGER DEFAULT 0"), ("billing_provider", "TEXT"), ("billing_base_url", "TEXT"), ("billing_mode", "TEXT"), ("estimated_cost_usd", "REAL"), ("actual_cost_usd", "REAL"), ("cost_status", "TEXT"), ("cost_source", "TEXT"), ("pricing_version", "TEXT"), ] 列名通过 ``replace('"', '""')`` 进行了双引号转义—— 虽然列名来自硬编码的元组而非用户输入,但这是防御性编程的一环。 **v5 → v6:** 为 ``messages`` 表添加推理链相关列。 .. code-block:: python for col_name, col_type in [ ("reasoning", "TEXT"), ("reasoning_details", "TEXT"), ("codex_reasoning_items", "TEXT"), ]: cursor.execute(f'ALTER TABLE messages ADD COLUMN "{safe}" {col_type}') 没有这些列,推理链在会话重新加载时会丢失, 导致 OpenRouter、OpenAI、Nous 等提供者的多轮推理连续性被破坏。 Schema 迁移流程图: .. mermaid:: :name: fig-schema-migration :caption: Schema 迁移流程图 (v1-v6) flowchart TD A["_init_schema()"] --> B["创建基础表
(sessions, messages, indexes)"] B --> C{"schema_version
表有记录?"} C -- 否 --> D["插入当前版本号"] C -- 是 --> E["读取 current_version"] E --> F{"< v2?"} F -- 是 --> G["ALTER messages
ADD finish_reason"] G --> H F -- 否 --> H{"< v3?"} H -- 是 --> I["ALTER sessions
ADD title"] I --> J H -- 否 --> J{"< v4?"} J -- 是 --> K["CREATE UNIQUE INDEX
idx_sessions_title_unique"] K --> L J -- 否 --> L{"< v5?"} L -- 是 --> M["批量添加
billing/token 列
(10 列)"] M --> N L -- 否 --> N{"< v6?"} N -- 是 --> O["添加 reasoning 列
(3 列)"] O --> P N -- 否 --> P["确保 title 唯一索引"] P --> Q["初始化 FTS5
(虚拟表 + 触发器)"] Q --> R["COMMIT"] D --> Q class A start class R success class G,I,K,M,O info class B,Q info classDef start fill:#dbeafe,stroke:#3b82f6,color:#1e3a8a classDef success fill:#dcfce7,stroke:#16a34a,color:#166534 classDef warn fill:#fef9c3,stroke:#ca8a04,color:#854d0e classDef fail fill:#fee2e2,stroke:#dc2626,color:#991b1b classDef info fill:#f1f5f9,stroke:#64748b,color:#334155 轨迹记录 ---------- 轨迹格式 ~~~~~~~~~~ ``agent/trajectory.py`` 提供了独立的轨迹保存功能, 与 SessionDB 的会话存储互不干扰。 .. code-block:: python def save_trajectory(trajectory, model, completed, filename=None): if filename is None: filename = "trajectory_samples.jsonl" if completed \ else "failed_trajectories.jsonl" entry = { "conversations": trajectory, "timestamp": datetime.now().isoformat(), "model": model, "completed": completed, } with open(filename, "a", encoding="utf-8") as f: f.write(json.dumps(entry, ensure_ascii=False) + "\n") 轨迹以 JSONL 格式追加到文件中,区分成功和失败的对话。 ``ensure_ascii=False`` 保留了 CJK 字符和 emoji。 Scratchpad 转换 ~~~~~~~~~~~~~~~~~ ``trajectory.py`` 还包含两个辅助函数: **``convert_scratchpad_to_think()`` :** 将 ```` 标签 转换为 ```` 标签——这在流式输出被截断时可能发生。 会话 ID 解析 -------------- SessionDB 支持灵活的会话 ID 解析策略: **精确 UUID 匹配:** ``get_session(session_id)`` 直接按主键查找。 **UUID 前缀匹配:** ``resolve_session_id(prefix)`` 使用 ``LIKE 'prefix%'`` 查找以给定前缀开头的唯一会话。如果前缀匹配多个会话,返回 ``None`` 。 **标题精确匹配:** ``get_session_by_title(title)`` 按标题精确查找。 **标题血脉解析:** ``resolve_session_by_title(title)`` 优先返回最新的 编号变体(例如 "项目 #3" 优先于 "项目")。 **标题安全清理:** ``sanitize_title(title)`` 去除控制字符、零宽字符等。 导出与备份 ------------ 单会话导出 ~~~~~~~~~~~~ .. code-block:: python def export_session(self, session_id): session = self.get_session(session_id) if not session: return None messages = self.get_messages(session_id) return {**session, "messages": messages} 全量导出 ~~~~~~~~~~ .. code-block:: python def export_all(self, source=None): sessions = self.search_sessions(source=source, limit=100000) results = [] for session in sessions: messages = self.get_messages(session["id"]) results.append({**session, "messages": messages}) return results 适合写入 JSONL 文件进行备份和分析。 消息清除 ~~~~~~~~~~ .. code-block:: python def clear_messages(self, session_id): def _do(conn): conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,)) conn.execute( "UPDATE sessions SET message_count = 0, tool_call_count = 0 " "WHERE id = ?", (session_id,)) self._execute_write(_do) 清除消息但保留会话记录本身——相当于"重置对话历史"。 配置参数速查表 ---------------- .. list-table:: :header-rows: 1 :widths: 35 25 40 * - 参数名 - 默认值 - 说明 * - ``SCHEMA_VERSION`` - 6 - 当前数据库 schema 版本 * - ``_WRITE_MAX_RETRIES`` - 15 - 写入重试最大次数 * - ``_WRITE_RETRY_MIN_S`` - 0.020 - 重试最小等待时间(秒) * - ``_WRITE_RETRY_MAX_S`` - 0.150 - 重试最大等待时间(秒) * - ``_CHECKPOINT_EVERY_N_WRITES`` - 50 - 每 N 次写入执行 WAL 检查点 * - ``MAX_TITLE_LENGTH`` - 100 - 标题最大长度(字符) * - SQLite timeout - 1.0 - SQLite 内部忙等待超时(秒) 总结 ------ Hermes 的会话状态系统是一个经过深思熟虑的持久化架构: - **SQLite + WAL** 提供了零部署、高并发的存储基础 - **Schema 迁移链** 确保了数据库可以平滑升级 - **FTS5 + CJK 回退** 提供了跨语言的全文搜索能力 - **BEGIN IMMEDIATE + 随机抖动** 打破了多进程写竞争的护航效应 - **会话分支和标题血脉** 支持了复杂的会话管理场景 - **双模式 Token 计数** 适配了 CLI 和 Gateway 两种运行模式 - **防御性反序列化** 确保了即使数据损坏,会话也能继续 这个系统是 Hermes 能够在多平台、多进程环境下稳定运行的基石。