7. 状态与会话管理:Agent 的记忆系统

7.1. 为什么 Agent 需要持久化

7.1.1. 无状态 Agent 的局限

最简单的 Agent 实现是无状态的——每次对话都从零开始,所有历史都存在于内存中。 一旦进程退出,一切消失。对于单次问答这没什么问题,但对于一个编程助手来说, 无状态意味着:

  • 用户关闭终端后,Agent 忘记了之前做了什么

  • 系统崩溃或网络中断后,对话无法恢复

  • 无法搜索"上周那个关于数据库迁移的对话"

  • 多个平台(CLI、Telegram、Discord)无法共享状态

  • 无法追踪 token 消耗和费用

Hermes 选择了 SQLite 持久化 作为解决方案, 提供了完整的会话生命周期管理、全文搜索和跨平台状态共享。

7.1.2. 持久化的需求层次

Hermes 的持久化需求可以分为四个层次:

1. 消息存储: 完整保存每条消息(用户、助手、工具调用、工具结果), 包括时间戳、token 计数和推理链。

2. 会话管理: 创建、恢复、结束、删除会话, 支持会话标题、来源标记(CLI/gateway/Telegram)和父子关系。

3. 全文搜索: 在所有会话的所有消息中快速搜索关键词, 支持中英日韩等 CJK 字符。

4. 并发安全: 多个进程(gateway + CLI + 工作树 Agent) 同时访问同一个数据库,不丢失数据也不死锁。

7.2. SessionDB 架构概览

7.2.1. 技术选型:为什么是 SQLite?

Hermes 选择了 SQLite 而非 PostgreSQL、Redis 或文件系统,原因如下:

特性

SQLite

其他方案

部署

零配置,单个文件

PostgreSQL 需要服务器

并发读

WAL 模式下无限并发读

文件系统需要锁

事务

完整 ACID

JSONL 无事务保证

全文搜索

内置 FTS5

需要外部搜索引擎

跨平台

Python 标准库自带

依赖外部服务

核心设计决策记录在 hermes_state.py 的模块文档中:

  • WAL 模式用于并发读 + 单写(gateway 多平台场景)

  • FTS5 虚拟表用于快速文本搜索

  • parent_session_id 链支持压缩触发的会话分割

  • 批量运行器和 RL 轨迹不在这里存储(独立系统)

  • 会话来源标记('cli', 'telegram', 'discord' 等)用于过滤

7.2.2. WAL 模式详解

SQLite 的 WAL(Write-Ahead Logging) 模式是整个并发架构的基础。

在默认的 rollback journal 模式下,写操作会锁定整个数据库文件, 阻止所有读操作。而在 WAL 模式下:

  • 写操作 追加到 WAL 文件末尾,不修改主数据库文件

  • 读操作 直接读取主数据库文件(如果 WAL 中有未合并的修改, 读操作会透明地合并 WAL 中的修改)

  • 检查点(checkpoint) 将 WAL 中的修改合并回主数据库

这意味着读写可以并发进行——写操作不阻塞读操作,读操作也不阻塞写操作。 唯一的限制是同一时刻只能有一个写操作。

SessionDB 在初始化时启用 WAL 模式:

self._conn = sqlite3.connect(
    str(self.db_path),
    check_same_thread=False,
    timeout=1.0,
    isolation_level=None,  # 手动管理事务
)
self._conn.execute("PRAGMA journal_mode=WAL")
self._conn.execute("PRAGMA foreign_keys=ON")

注意 isolation_level=None——Python 的默认隔离级别会在 DML 语句时 自动开启事务,这与我们手动的 BEGIN IMMEDIATE 冲突。 设置为 None 表示我们自己管理事务边界。

7.3. 数据库 Schema 详解

7.3.1. Schema 版本管理

当前 schema 版本为 v6 ,通过 schema_version 表跟踪:

CREATE TABLE IF NOT EXISTS schema_version (
    version INTEGER NOT NULL
);

_init_schema() 在每次连接数据库时检查版本号, 如果低于当前版本则依次运行迁移脚本。

7.3.2. sessions 表

sessions 表是会话的元数据中心:

CREATE TABLE IF NOT EXISTS sessions (
    id TEXT PRIMARY KEY,              -- UUID
    source TEXT NOT NULL,             -- 'cli', 'telegram', 'discord', etc.
    user_id TEXT,                     -- 平台用户 ID
    model TEXT,                       -- 使用的模型名称
    model_config TEXT,                -- JSON: 模型配置快照
    system_prompt TEXT,               -- 完整的系统提示快照
    parent_session_id TEXT,           -- 父会话 ID(分支机制)
    started_at REAL NOT NULL,         -- Unix 时间戳
    ended_at REAL,                    -- 结束时间
    end_reason TEXT,                  -- 结束原因
    message_count INTEGER DEFAULT 0,  -- 消息计数
    tool_call_count INTEGER DEFAULT 0,-- 工具调用计数
    -- Token 统计(v5 新增)
    input_tokens INTEGER DEFAULT 0,
    output_tokens INTEGER DEFAULT 0,
    cache_read_tokens INTEGER DEFAULT 0,
    cache_write_tokens INTEGER DEFAULT 0,
    reasoning_tokens INTEGER DEFAULT 0,
    -- 计费信息(v5 新增)
    billing_provider TEXT,
    billing_base_url TEXT,
    billing_mode TEXT,
    estimated_cost_usd REAL,
    actual_cost_usd REAL,
    cost_status TEXT,
    cost_source TEXT,
    pricing_version TEXT,
    -- 标题(v3 新增)
    title TEXT,
    FOREIGN KEY (parent_session_id) REFERENCES sessions(id)
);

关键字段说明:

id(TEXT PRIMARY KEY): UUID 格式的会话标识符。 作为主键,它同时也是外部引用(消息表、子会话)的锚点。

source(TEXT NOT NULL): 会话来源平台。 在 gateway 多平台模式下,这个字段用于按平台过滤会话。

parent_session_id(TEXT): 实现会话分支机制的核心字段。 当上下文压缩触发会话分割时,新的会话通过这个字段关联到原始会话。

model_config(TEXT): JSON 格式的模型配置快照, 记录了会话开始时的模型参数(温度、top_p 等)。

system_prompt(TEXT): 完整的组装后的系统提示快照。 这使得会话可以被完整恢复。

7.3.3. 索引设计

CREATE INDEX idx_sessions_source ON sessions(source);
CREATE INDEX idx_sessions_parent ON sessions(parent_session_id);
CREATE INDEX idx_sessions_started ON sessions(started_at DESC);
CREATE UNIQUE INDEX idx_sessions_title_unique
    ON sessions(title) WHERE title IS NOT NULL;

每个索引都有明确的目的:

  • idx_sessions_source :按平台过滤会话列表

  • idx_sessions_parent :快速查找会话的所有子会话(分支遍历)

  • idx_sessions_started :按时间排序的会话列表(降序,最新的在前)

  • idx_sessions_title_unique :确保标题唯一性(部分索引,NULL 不参与唯一性检查)

7.3.4. messages 表

CREATE TABLE IF NOT EXISTS messages (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT NOT NULL REFERENCES sessions(id),
    role TEXT NOT NULL,                -- 'system', 'user', 'assistant', 'tool'
    content TEXT,                      -- 消息正文
    tool_call_id TEXT,                 -- 工具结果引用的调用 ID
    tool_calls TEXT,                   -- JSON: 工具调用数组
    tool_name TEXT,                    -- 工具名称
    timestamp REAL NOT NULL,           -- Unix 时间戳
    token_count INTEGER,               -- 消息的 token 数
    finish_reason TEXT,                -- 'stop', 'tool_calls', etc. (v2)
    reasoning TEXT,                    -- 推理文本 (v6)
    reasoning_details TEXT,            -- JSON: 推理细节 (v6)
    codex_reasoning_items TEXT         -- JSON: Codex 推理项 (v6)
);

CREATE INDEX idx_messages_session
    ON messages(session_id, timestamp);

messages 表的设计关注点:

AUTOINCREMENT 主键: 使用自增整数而非 UUID,因为消息的插入频率很高, 整数主键的插入性能更好。lastrowid 被用于返回新插入消息的 ID。

tool_calls 和 tool_name 分离: tool_calls 是完整的 JSON 数组 (包含 id、function.name、function.arguments),而 tool_name 是从中提取的工具名称,用于快速过滤而不需要解析 JSON。

reasoning 字段(v6 新增): 保存助手的推理链文本和结构化推理细节。 没有这些字段,推理链在会话重新加载时会丢失, 导致多轮推理的连续性被破坏。

7.4. FTS5 全文搜索

7.4.1. FTS5 虚拟表结构

CREATE VIRTUAL TABLE IF NOT EXISTS messages_fts USING fts5(
    content,
    content=messages,
    content_rowid=id
);

这里使用了 FTS5 的 内容表模式(content table mode)content=messages 指定 FTS 索引的内容来自 messages 表, content_rowid=id 将 FTS 的 rowid 映射到消息表的自增主键。 这种模式下,FTS 虚拟表本身不存储内容副本,只存储倒排索引, 节省了磁盘空间。

7.4.2. 同步触发器

FTS5 索引通过三个触发器与 messages 表保持同步:

-- 插入时同步
CREATE TRIGGER messages_fts_insert AFTER INSERT ON messages BEGIN
    INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content);
END;

-- 删除时同步
CREATE TRIGGER messages_fts_delete AFTER DELETE ON messages BEGIN
    INSERT INTO messages_fts(messages_fts, rowid, content)
        VALUES('delete', old.id, old.content);
END;

-- 更新时同步(先删旧索引,再插新索引)
CREATE TRIGGER messages_fts_update AFTER UPDATE ON messages BEGIN
    INSERT INTO messages_fts(messages_fts, rowid, content)
        VALUES('delete', old.id, old.content);
    INSERT INTO messages_fts(rowid, content) VALUES (new.id, new.content);
END;

注意 FTS5 的删除操作使用特殊的语法——向虚拟表插入一行, 但第一个列名为虚拟表自身名称(messages_fts),值为 'delete' 。 这是 FTS5 内容表模式的要求。

7.4.3. 查询清理器(_sanitize_fts5_query)

FTS5 有自己的查询语法,其中 "()+*{}ANDORNOT 都有特殊含义。 直接将用户输入传递给 MATCH 子句可能导致 OperationalError

_sanitize_fts5_query() 方法通过六个步骤处理用户输入:

步骤 1:提取并保护引号短语。 使用占位符替换 "..." 格式的精确匹配短语, 防止后续步骤破坏它们。

步骤 2:移除未匹配的 FTS5 特殊字符。 去除 +{}()\"^ 等字符。

步骤 3:规范化通配符。 将连续的 * 合并为一个, 移除前导的 * (FTS5 前缀搜索至少需要一个前导字符)。

步骤 4:移除两端悬空的布尔运算符。 "hello AND""OR world" 会导致语法错误。

步骤 5:包装带连字符和点号的术语。 FTS5 默认分词器会在连字符和点号处分割, 导致 chat-send 变成 chat AND sendP2.2 变成 p2 AND 2 。 将它们用引号包装可以保持短语语义。

步骤 6:恢复步骤 1 中保护的引号短语。

7.4.4. CJK 回退机制

FTS5 的默认分词器(tokenizer)是 unicode61 , 它按 Unicode 单词边界分词。对于 CJK 字符, 这个分词器会将每个字符视为一个独立的词元(token)。 这意味着搜索"数据库迁移"会被分解为"数"、"据"、"库"、"迁"、"移"五个独立的词元, 返回包含其中任何一个字符的所有结果。

search_messages() 方法实现了 CJK 回退:

# 首先尝试 FTS5 搜索
try:
    cursor = self._conn.execute(sql, params)
except sqlite3.OperationalError:
    if not self._contains_cjk(query):
        return []  # 非 CJK 的语法错误直接返回空
    matches = []
else:
    matches = [dict(row) for row in cursor.fetchall()]

# 如果 FTS5 无结果且查询包含 CJK,回退到 LIKE
if not matches and self._contains_cjk(query):
    raw_query = query.strip('"').strip()
    like_sql = """
        SELECT ... FROM messages m ...
        WHERE m.content LIKE ?
        ...
    """
    like_params = [f"%{raw_query}%"]

_contains_cjk() 通过 Unicode 范围检查判断文本是否包含 CJK 字符:

@staticmethod
def _contains_cjk(text):
    for ch in text:
        cp = ord(ch)
        if (0x4E00 <= cp <= 0x9FFF or   # CJK Unified Ideographs
            0x3400 <= cp <= 0x4DBF or   # CJK Extension A
            0x20000 <= cp <= 0x2A6DF or # CJK Extension B
            0x3000 <= cp <= 0x303F or   # CJK Symbols
            0x3040 <= cp <= 0x309F or   # Hiragana
            0x30A0 <= cp <= 0x30FF or   # Katakana
            0xAC00 <= cp <= 0xD7AF):    # Hangul Syllables
            return True
    return False

搜索结果包含上下文(每条匹配消息前后各一条), 使用 FTS5 的 snippet() 函数生成高亮摘要:

snippet(messages_fts, 0, '>>>', '<<<', '...', 40) AS snippet

这会在匹配文本前后添加 >>><<< 标记, 上下文窗口为 40 个字符。

FTS5 搜索流程(含 CJK 回退):

        flowchart TD
    A["用户输入搜索查询"] --> B["_sanitize_fts5_query()"]
    B --> C["构建 FTS5 MATCH 查询"]
    C --> D{"执行 FTS5 查询"}
    D -- 成功 --> E{"有结果?"}
    D -- OperationalError --> F{"查询含 CJK?"}
    F -- 否 --> G["返回空列表"]
    F -- 是 --> H["matches = []"]
    E -- 是 --> K["添加上下文消息"]
    E -- 否 --> I{"查询含 CJK?"}
    H --> I
    I -- 否 --> G
    I -- 是 --> J["LIKE 回退搜索<br/>%query%"]
    J --> K
    K --> L["返回结果<br/>(含 snippet + 上下文)"]

    class A start
    class L success
    class G fail
    class J warn
    class B,C,H,K info

    classDef start fill:#dbeafe,stroke:#3b82f6,color:#1e3a8a
    classDef success fill:#dcfce7,stroke:#16a34a,color:#166534
    classDef warn fill:#fef9c3,stroke:#ca8a04,color:#854d0e
    classDef fail fill:#fee2e2,stroke:#dc2626,color:#991b1b
    classDef info fill:#f1f5f9,stroke:#64748b,color:#334155
    

FTS5 全文搜索流程(含 CJK 回退)

7.5. 写入并发控制

7.5.1. 问题:多进程写竞争

在 gateway 模式下,多个 Hermes 进程同时运行:

  • Gateway 主进程处理来自多个平台的请求

  • CLI 会话独立连接数据库

  • 工作树 Agent 在并行分支中工作

所有这些进程共享同一个 state.db 文件。 SQLite 的 WAL 模式允许并发读,但写操作仍然需要排他锁。

SQLite 内置的忙等待处理器使用确定性的 退避策略—— 固定的时间间隔重试。在高并发场景下,多个写入者会在相同的时间点重试, 形成护航效应(convoy effect) ,导致 TUI 冻结和响应延迟。

7.5.2. 解决方案:BEGIN IMMEDIATE + 随机抖动

SessionDB 使用应用层重试 + 随机抖动 来打破护航效应:

_WRITE_MAX_RETRIES = 15
_WRITE_RETRY_MIN_S = 0.020   # 20ms
_WRITE_RETRY_MAX_S = 0.150   # 150ms
_CHECKPOINT_EVERY_N_WRITES = 50

def _execute_write(self, fn):
    last_err = None
    for attempt in range(self._WRITE_MAX_RETRIES):
        try:
            with self._lock:
                self._conn.execute("BEGIN IMMEDIATE")
                try:
                    result = fn(self._conn)
                    self._conn.commit()
                except BaseException:
                    self._conn.rollback()
                    raise
            # 成功后周期性检查点
            self._write_count += 1
            if self._write_count % self._CHECKPOINT_EVERY_N_WRITES == 0:
                self._try_wal_checkpoint()
            return result
        except sqlite3.OperationalError as exc:
            err_msg = str(exc).lower()
            if "locked" in err_msg or "busy" in err_msg:
                last_err = exc
                if attempt < self._WRITE_MAX_RETRIES - 1:
                    jitter = random.uniform(
                        self._WRITE_RETRY_MIN_S,
                        self._WRITE_RETRY_MAX_S,
                    )
                    time.sleep(jitter)
                    continue
            raise
    raise last_err or sqlite3.OperationalError("database is locked")

关键设计点:

BEGIN IMMEDIATE: 在事务开始时立即获取写锁, 而不是等到第一次写操作时。这使得锁竞争在事务开始时就显现, 而不是在事务中间——减少了死锁的风险。

threading.Lock + SQLite 锁: Python 的 threading.Lock 确保同一个进程内的多个线程不会同时尝试写入。 SQLite 的 WAL 写锁处理不同进程之间的竞争。

随机抖动: random.uniform(20ms, 150ms) 的随机等待时间 使得竞争的写入者不会在同一时刻重试。

15 次重试上限: 最多重试 15 次,每次最长等待 150ms, 总等待时间上限为 15 * 150ms = 2.25 秒。 如果 15 次都失败,抛出异常。

超时设置为 1 秒: SQLite 连接的 timeout=1.0 是短超时—— 如果 SQLite 内部的忙等待在 1 秒内无法获取锁,立即返回错误, 由应用层重试处理。

7.5.3. WAL 检查点

每 50 次成功的写操作后,执行一次 PASSIVE 检查点

def _try_wal_checkpoint(self):
    try:
        with self._lock:
            result = self._conn.execute(
                "PRAGMA wal_checkpoint(PASSIVE)"
            ).fetchone()
    except Exception:
        pass  # 尽力而为,永远不会致命

PASSIVE 模式不会阻塞——它只合并那些没有其他连接正在使用的 WAL 帧。 如果检查点无法合并某些帧(因为其他连接正在使用),它会静默跳过, 而不是等待。

这个机制防止了 WAL 文件在长时间运行中无限增长。 在 close() 方法中也执行了一次检查点, 确保退出的进程帮助保持 WAL 文件的合理大小。

写入并发时序图:

        sequenceDiagram
    autonumber
    participant T1 as 线程 1
    participant Lock as threading.Lock
    participant DB as SQLite (WAL)
    participant T2 as 线程 2

    T1->>Lock: acquire()
    T1->>DB: BEGIN IMMEDIATE
    T1->>DB: INSERT INTO messages ...
    T1->>DB: COMMIT
    T1->>Lock: release()

    Note over T2: T2 在 T1 持有锁期间尝试写入

    T2->>Lock: acquire() — 阻塞
    T1->>Lock: release()
    T2->>Lock: acquire() — 成功
    T2->>DB: BEGIN IMMEDIATE
    T2->>DB: INSERT INTO messages ...
    T2->>DB: COMMIT
    T2->>Lock: release()

    Note over T1,T2: 不同进程的竞争由 SQLite WAL 处理

    rect rgb(255, 230, 230)
        Note over DB: Process A: BEGIN IMMEDIATE holds WAL write lock
        Note over DB: Process B: BEGIN IMMEDIATE - database is locked
        Note over DB: Process B: sleep random 20-150ms
        Note over DB: Process B: retry BEGIN IMMEDIATE - success
    end
    

写入并发控制时序图

7.6. 会话生命周期

SessionDB 管理的会话有完整的状态机,从创建到删除。

7.6.1. 创建会话(create_session)

def create_session(self, session_id, source, model=None, model_config=None,
                   system_prompt=None, user_id=None, parent_session_id=None):
    def _do(conn):
        conn.execute(
            """INSERT OR IGNORE INTO sessions
               (id, source, user_id, model, model_config,
                system_prompt, parent_session_id, started_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?)""",
            (session_id, source, user_id, model,
             json.dumps(model_config) if model_config else None,
             system_prompt, parent_session_id, time.time()),
        )
    self._execute_write(_do)
    return session_id

使用 INSERT OR IGNORE 而非 INSERT — 如果会话 ID 已存在则静默忽略。 这使得 create_session 是幂等的,调用者不需要先检查会话是否存在。

7.6.2. 追加消息(append_message)

def append_message(self, session_id, role, content=None,
                   tool_name=None, tool_calls=None, tool_call_id=None,
                   token_count=None, finish_reason=None,
                   reasoning=None, reasoning_details=None,
                   codex_reasoning_items=None):
    # 预序列化 JSON 字段
    tool_calls_json = json.dumps(tool_calls) if tool_calls else None
    reasoning_details_json = json.dumps(reasoning_details) if reasoning_details else None
    codex_items_json = json.dumps(codex_reasoning_items) if codex_reasoning_items else None

    def _do(conn):
        cursor = conn.execute(
            """INSERT INTO messages (...) VALUES (...)""",
            (session_id, role, content, ...),
        )
        msg_id = cursor.lastrowid
        # 更新计数器
        conn.execute(
            "UPDATE sessions SET message_count = message_count + 1, ...")
        return msg_id

    return self._execute_write(_do)

关键细节:JSON 序列化在进入写事务之前 完成。 这减少了在持有写锁时的工作量,缩短了锁持有时间,降低了竞争。

7.6.3. 结束会话(end_session)

def end_session(self, session_id, end_reason):
    def _do(conn):
        conn.execute(
            "UPDATE sessions SET ended_at = ?, end_reason = ? WHERE id = ?",
            (time.time(), end_reason, session_id),
        )
    self._execute_write(_do)

end_reason 的典型值包括:"user_exit""timeout""reset""error" 等。

7.6.4. 重新打开会话(reopen_session)

def reopen_session(self, session_id):
    def _do(conn):
        conn.execute(
            "UPDATE sessions SET ended_at = NULL, end_reason = NULL WHERE id = ?",
            (session_id,),
        )
    self._execute_write(_do)

清除结束时间和原因,使已结束的会话可以恢复。

7.6.5. 确保会话存在(ensure_session)

def ensure_session(self, session_id, source="unknown", model=None):
    def _do(conn):
        conn.execute(
            """INSERT OR IGNORE INTO sessions
               (id, source, model, started_at) VALUES (?, ?, ?, ?)""",
            (session_id, source, model, time.time()),
        )
    self._execute_write(_do)

这是一个恢复机制 。如果在 Agent 启动时 create_session() 因为 瞬时的 SQLite 锁而失败,后续的 append_message() 可以调用 ensure_session() 来确保会话行存在。

7.6.6. 删除会话(delete_session)

def delete_session(self, session_id):
    def _do(conn):
        # 先孤立子会话
        conn.execute(
            "UPDATE sessions SET parent_session_id = NULL "
            "WHERE parent_session_id = ?", (session_id,))
        # 删除消息
        conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
        # 删除会话
        conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))
        return True
    return self._execute_write(_do)

子会话不会被级联删除 ,而是被孤立(parent_session_id = NULL)。 这确保了压缩产生的分支会话不会因为原始会话被删除而消失。

7.6.7. 修剪旧会话(prune_sessions)

def prune_sessions(self, older_than_days=90, source=None):
    cutoff = time.time() - (older_than_days * 86400)
    # 只删除已结束的会话
    # 先孤立子会话
    # 然后逐个删除

默认清理 90 天以前的已结束会话。活跃会话不会被删除 , 无论多老。

会话生命周期状态图:

        stateDiagram-v2
    [*] --> Created : create_session()
    Created --> Active : 第一条消息追加
    Active --> Active : append_message()
    Active --> Active : update_token_counts()
    Active --> Active : set_session_title()
    Active --> Ended : end_session()
    Ended --> Reopened : reopen_session()
    Reopened --> Active : append_message()
    Active --> Branched : 上下文压缩触发分割
    Branched --> Active : 新会话继续
    Ended --> Deleted : delete_session()
    Active --> Deleted : delete_session()
    Deleted --> [*]
    

会话生命周期状态图

7.7. 会话分支机制

7.7.1. 分支的设计动机

当上下文压缩发生时,Hermes 有两种选择:

  1. 在同一个会话中替换消息历史(当前方案)

  2. 创建一个新会话,通过 parent_session_id 关联(可选方案)

当前实现主要使用方案 1(在同一个会话中压缩), 但 parent_session_id 基础设施已经就绪,支持以下场景:

  • 子 Agent 任务(通过 delegate_task 工具创建)

  • 压缩触发的会话分割

  • 用户手动创建的分支

7.7.2. 标题的血脉继承

会话分支在标题层面有特殊的继承规则。

``get_next_title_in_lineage()`` 生成血脉中的下一个标题:

def get_next_title_in_lineage(self, base_title):
    # 去除已有的 #N 后缀
    match = re.match(r'^(.*?) #(\d+)$', base_title)
    if match:
        base = match.group(1)
    else:
        base = base_title

    # 查找所有现有的编号变体
    existing = [查询 base  base #N 的所有标题]

    # 找到最大编号,+1
    max_num = 1  # 未编号的原始标题算 #1
    for t in existing:
        m = re.match(r'^.* #(\d+)$', t)
        if m:
            max_num = max(max_num, int(m.group(1)))
    return f"{base} #{max_num + 1}"

例如:"数据库重构" → "数据库重构 #2" → "数据库重构 #3"。

``resolve_session_by_title()`` 在查找标题时优先返回最新的编号变体:

def resolve_session_by_title(self, title):
    exact = self.get_session_by_title(title)
    numbered = [查询 "title #N" 的所有变体]

    if numbered:
        return numbered[0]["id"]  # 返回最新的编号变体
    elif exact:
        return exact["id"]
    return None

7.7.3. 标题验证与清理

sanitize_title() 方法对标题进行严格清理:

  • 去除 ASCII 控制字符(0x00-0x1F, 0x7F)

  • 去除零宽字符(U+200B-U+200F, U+FEFF)

  • 去除方向覆盖(U+202A-U+202E, U+2066-U+2069)

  • 合并内部空白为单个空格

  • 长度上限 100 字符

  • 空白字符串规范化为 None

标题唯一性由部分唯一索引保证:

CREATE UNIQUE INDEX idx_sessions_title_unique
    ON sessions(title) WHERE title IS NOT NULL

7.8. Token 计数双模式

7.8.1. 增量模式 vs. 绝对模式

update_token_counts() 支持两种更新模式:

增量模式(absolute=False,默认): 每次 API 调用后累加 delta 值。

UPDATE sessions SET
    input_tokens = input_tokens + ?,
    output_tokens = output_tokens + ?,
    ...
WHERE id = ?

绝对模式(absolute=True): 直接设置累计值。

UPDATE sessions SET
    input_tokens = ?,
    output_tokens = ?,
    ...
WHERE id = ?

为什么需要两种模式?因为 Hermes 有两种运行模式:

CLI 模式: 每次只看到当前 API 调用的 delta,使用增量模式。

Gateway 模式: 缓存的 Agent 在内存中维护累计计数, 通过 update_token_counts(absolute=True) 一次性设置总值。

7.8.2. 计费信息

update_token_counts() 还更新计费相关字段:

  • billing_provider :计费提供商(如 "openrouter")

  • billing_base_url :API 基础 URL

  • billing_mode :计费模式

  • estimated_cost_usd :估算费用(美元)

  • actual_cost_usd :实际费用(来自 API 响应)

  • cost_status :费用状态("estimated"、"final")

  • cost_source :费用来源

  • pricing_version :定价版本

所有计费字段使用 COALESCE 模式——只有在提供新值时才更新:

billing_provider = COALESCE(billing_provider, ?)

这意味着首次设置后,除非显式提供新值,否则不会被覆盖。

7.9. 消息加载

7.9.1. 两种加载格式

SessionDB 提供两种消息加载方法:

``get_messages()`` : 返回数据库行格式的字典列表, 所有字段都保留(包括 id、session_id、timestamp 等)。 适合内部处理和数据导出。

``get_messages_as_conversation()`` : 返回 OpenAI 对话格式的消息列表, 只包含 rolecontent 和工具相关字段。 适合网关恢复对话历史。

7.9.2. 还原时的反序列化

tool_calls 字段在数据库中以 JSON 字符串存储。 加载时需要反序列化:

if msg.get("tool_calls"):
    try:
        msg["tool_calls"] = json.loads(msg["tool_calls"])
    except (json.JSONDecodeError, TypeError):
        logger.warning("Failed to deserialize tool_calls, falling back to []")
        msg["tool_calls"] = []

v6 新增的 reasoning 字段同样需要反序列化:

if row["role"] == "assistant":
    if row["reasoning"]:
        msg["reasoning"] = row["reasoning"]
    if row["reasoning_details"]:
        try:
            msg["reasoning_details"] = json.loads(row["reasoning_details"])
        except (json.JSONDecodeError, TypeError):
            msg["reasoning_details"] = None

反序列化失败时的策略是降级而非崩溃——返回空列表或 None, 让对话可以继续,而不是因为一条损坏的消息而中断整个会话。

7.10. 会话列表与富预览

list_sessions_rich() 提供了带预览的会话列表。

7.10.1. 单查询优化

早期版本使用 N+2 查询(1 次查询会话列表 + N 次查询预览 + N 次查询最后活跃时间), 性能很差。当前版本使用单个查询 + 关联子查询

SELECT s.*,
    COALESCE(
        (SELECT SUBSTR(REPLACE(REPLACE(m.content, X'0A', ' '), X'0D', ' '), 1, 63)
         FROM messages m
         WHERE m.session_id = s.id AND m.role = 'user' AND m.content IS NOT NULL
         ORDER BY m.timestamp, m.id LIMIT 1),
        ''
    ) AS _preview_raw,
    COALESCE(
        (SELECT MAX(m2.timestamp) FROM messages m2 WHERE m2.session_id = s.id),
        s.started_at
    ) AS last_active
FROM sessions s
...

预览取第一条用户消息的前 63 个字符(换行替换为空格), last_active 取最后一条消息的时间戳。

7.10.2. 子会话过滤

默认情况下,子会话(子 Agent 任务、压缩分支)被排除:

if not include_children:
    where_clauses.append("s.parent_session_id IS NULL")

这避免了会话列表被大量的子 Agent 任务淹没。

7.11. ID 解析

SessionDB 支持多种 ID 解析方式:

精确 ID: 直接通过 get_session() 查找。

前缀匹配: resolve_session_id() 支持使用 UUID 的前缀来查找:

def resolve_session_id(self, session_id_or_prefix):
    exact = self.get_session(session_id_or_prefix)
    if exact:
        return exact["id"]
    # 模糊匹配
    cursor = self._conn.execute(
        "SELECT id FROM sessions WHERE id LIKE ? ESCAPE '\\' "
        "ORDER BY started_at DESC LIMIT 2",
        (f"{escaped}%",),
    )
    matches = [row["id"] for row in cursor.fetchall()]
    if len(matches) == 1:
        return matches[0]  # 唯一匹配
    return None  # 无匹配或歧义匹配

注意 LIKE 的通配符(%_)在用户输入中被转义, 防止恶意的模式匹配。

标题解析: resolve_session_by_title() 通过标题或标题变体查找。

7.12. Schema 迁移链 v1→v6

7.12.1. 迁移策略

SessionDB 使用顺序迁移 策略——从当前版本开始, 依次应用每个版本的迁移脚本:

def _init_schema(self):
    cursor.execute("SELECT version FROM schema_version LIMIT 1")
    row = cursor.fetchone()
    if row is None:
        cursor.execute("INSERT INTO schema_version (version) VALUES (?)",
                       (SCHEMA_VERSION,))
    else:
        current_version = row["version"]
        if current_version < 2:
            # v2 迁移
        if current_version < 3:
            # v3 迁移
        if current_version < 4:
            # v4 迁移
        ...

每个迁移都在 try/except 中执行——如果列或索引已存在,静默跳过。 这使得迁移是幂等的

7.12.2. 迁移历史

v1 → v2:messages 表添加 finish_reason TEXT 列。 记录模型停止的原因("stop""tool_calls" 等)。

v2 → v3:sessions 表添加 title TEXT 列。 支持用户为会话设置可读的标题。

v3 → v4: 创建 title 的唯一索引(部分索引,WHERE title IS NOT NULL)。 确保标题不重复。

v4 → v5: 批量添加计费和 token 相关列。

new_columns = [
    ("cache_read_tokens", "INTEGER DEFAULT 0"),
    ("cache_write_tokens", "INTEGER DEFAULT 0"),
    ("reasoning_tokens", "INTEGER DEFAULT 0"),
    ("billing_provider", "TEXT"),
    ("billing_base_url", "TEXT"),
    ("billing_mode", "TEXT"),
    ("estimated_cost_usd", "REAL"),
    ("actual_cost_usd", "REAL"),
    ("cost_status", "TEXT"),
    ("cost_source", "TEXT"),
    ("pricing_version", "TEXT"),
]

列名通过 replace('"', '""') 进行了双引号转义—— 虽然列名来自硬编码的元组而非用户输入,但这是防御性编程的一环。

v5 → v6:messages 表添加推理链相关列。

for col_name, col_type in [
    ("reasoning", "TEXT"),
    ("reasoning_details", "TEXT"),
    ("codex_reasoning_items", "TEXT"),
]:
    cursor.execute(f'ALTER TABLE messages ADD COLUMN "{safe}" {col_type}')

没有这些列,推理链在会话重新加载时会丢失, 导致 OpenRouter、OpenAI、Nous 等提供者的多轮推理连续性被破坏。

Schema 迁移流程图:

        flowchart TD
    A["_init_schema()"] --> B["创建基础表<br/>(sessions, messages, indexes)"]
    B --> C{"schema_version<br/>表有记录?"}
    C -- 否 --> D["插入当前版本号"]
    C -- 是 --> E["读取 current_version"]
    E --> F{"< v2?"}
    F -- 是 --> G["ALTER messages<br/>ADD finish_reason"]
    G --> H
    F -- 否 --> H{"< v3?"}
    H -- 是 --> I["ALTER sessions<br/>ADD title"]
    I --> J
    H -- 否 --> J{"< v4?"}
    J -- 是 --> K["CREATE UNIQUE INDEX<br/>idx_sessions_title_unique"]
    K --> L
    J -- 否 --> L{"< v5?"}
    L -- 是 --> M["批量添加<br/>billing/token 列<br/>(10 列)"]
    M --> N
    L -- 否 --> N{"< v6?"}
    N -- 是 --> O["添加 reasoning 列<br/>(3 列)"]
    O --> P
    N -- 否 --> P["确保 title 唯一索引"]
    P --> Q["初始化 FTS5<br/>(虚拟表 + 触发器)"]
    Q --> R["COMMIT"]
    D --> Q

    class A start
    class R success
    class G,I,K,M,O info
    class B,Q info

    classDef start fill:#dbeafe,stroke:#3b82f6,color:#1e3a8a
    classDef success fill:#dcfce7,stroke:#16a34a,color:#166534
    classDef warn fill:#fef9c3,stroke:#ca8a04,color:#854d0e
    classDef fail fill:#fee2e2,stroke:#dc2626,color:#991b1b
    classDef info fill:#f1f5f9,stroke:#64748b,color:#334155
    

Schema 迁移流程图 (v1-v6)

7.13. 轨迹记录

7.13.1. 轨迹格式

agent/trajectory.py 提供了独立的轨迹保存功能, 与 SessionDB 的会话存储互不干扰。

def save_trajectory(trajectory, model, completed, filename=None):
    if filename is None:
        filename = "trajectory_samples.jsonl" if completed \
                   else "failed_trajectories.jsonl"

    entry = {
        "conversations": trajectory,
        "timestamp": datetime.now().isoformat(),
        "model": model,
        "completed": completed,
    }

    with open(filename, "a", encoding="utf-8") as f:
        f.write(json.dumps(entry, ensure_ascii=False) + "\n")

轨迹以 JSONL 格式追加到文件中,区分成功和失败的对话。 ensure_ascii=False 保留了 CJK 字符和 emoji。

7.13.2. Scratchpad 转换

trajectory.py 还包含两个辅助函数:

``convert_scratchpad_to_think()`` :<REASONING_SCRATCHPAD> 标签 转换为 <think 标签,用于 ShareGPT 格式的兼容性。

``has_incomplete_scratchpad()`` : 检查内容中是否有未闭合的 <REASONING_SCRATCHPAD> 标签——这在流式输出被截断时可能发生。

7.14. 会话 ID 解析

SessionDB 支持灵活的会话 ID 解析策略:

精确 UUID 匹配: get_session(session_id) 直接按主键查找。

UUID 前缀匹配: resolve_session_id(prefix) 使用 LIKE 'prefix%' 查找以给定前缀开头的唯一会话。如果前缀匹配多个会话,返回 None

标题精确匹配: get_session_by_title(title) 按标题精确查找。

标题血脉解析: resolve_session_by_title(title) 优先返回最新的 编号变体(例如 "项目 #3" 优先于 "项目")。

标题安全清理: sanitize_title(title) 去除控制字符、零宽字符等。

7.15. 导出与备份

7.15.1. 单会话导出

def export_session(self, session_id):
    session = self.get_session(session_id)
    if not session:
        return None
    messages = self.get_messages(session_id)
    return {**session, "messages": messages}

7.15.2. 全量导出

def export_all(self, source=None):
    sessions = self.search_sessions(source=source, limit=100000)
    results = []
    for session in sessions:
        messages = self.get_messages(session["id"])
        results.append({**session, "messages": messages})
    return results

适合写入 JSONL 文件进行备份和分析。

7.15.3. 消息清除

def clear_messages(self, session_id):
    def _do(conn):
        conn.execute("DELETE FROM messages WHERE session_id = ?", (session_id,))
        conn.execute(
            "UPDATE sessions SET message_count = 0, tool_call_count = 0 "
            "WHERE id = ?", (session_id,))
    self._execute_write(_do)

清除消息但保留会话记录本身——相当于"重置对话历史"。

7.16. 配置参数速查表

参数名

默认值

说明

SCHEMA_VERSION

6

当前数据库 schema 版本

_WRITE_MAX_RETRIES

15

写入重试最大次数

_WRITE_RETRY_MIN_S

0.020

重试最小等待时间(秒)

_WRITE_RETRY_MAX_S

0.150

重试最大等待时间(秒)

_CHECKPOINT_EVERY_N_WRITES

50

每 N 次写入执行 WAL 检查点

MAX_TITLE_LENGTH

100

标题最大长度(字符)

SQLite timeout

1.0

SQLite 内部忙等待超时(秒)

7.17. 总结

Hermes 的会话状态系统是一个经过深思熟虑的持久化架构:

  • SQLite + WAL 提供了零部署、高并发的存储基础

  • Schema 迁移链 确保了数据库可以平滑升级

  • FTS5 + CJK 回退 提供了跨语言的全文搜索能力

  • BEGIN IMMEDIATE + 随机抖动 打破了多进程写竞争的护航效应

  • 会话分支和标题血脉 支持了复杂的会话管理场景

  • 双模式 Token 计数 适配了 CLI 和 Gateway 两种运行模式

  • 防御性反序列化 确保了即使数据损坏,会话也能继续

这个系统是 Hermes 能够在多平台、多进程环境下稳定运行的基石。