ZeroClaw-03-记忆系统深度解析

· 4128字 · 9分钟

ZeroClaw-03-记忆系统深度解析 🔗

深入解析 ZeroClaw 的混合搜索记忆系统,从设计原理到代码实现,理解 AI 如何实现"长期记忆"。

适合阅读人群:AI 应用设计师、架构师、算法工程师、核心开发者


引言:为什么 AI 需要"记忆"? 🔗

没有记忆的 AI 的局限 🔗

场景1:连续对话
你:帮我写个用户登录功能
AI:好的,这是代码...
你:加上密码强度检查
AI:什么密码强度检查?你在说什么?

场景2:跨会话工作
昨天:你叫张三,在开发电商平台
今天:继续昨天的项目
AI:什么项目?你是谁?

ZeroClaw 的记忆能力 🔗

问题类型 解决方案 技术实现
连续对话 短期记忆保持 会话上下文维护
跨会话记忆 长期记忆存储 SQLite/PostgreSQL 持久化
上下文检索 语义搜索 向量相似度计算
知识积累 向量存储 Embedding + 混合搜索

一、记忆系统架构 🔗

1.1 系统架构概览 🔗

记忆系统采用分层架构,分为输入处理、存储管理、检索输出三大模块。


数据流:存储流程 🔗

新产生的对话内容如何被存储到记忆系统中。

flowchart LR
    subgraph 输入["📥 输入源"]
        I1["用户输入"]
        I2["AI 响应"]
        I3["工具结果"]
    end

    subgraph 处理["⚙️ 处理"]
        CHUNK["文本分块"]
        EMBED["向量嵌入"]
        KEY["关键词提取"]
    end

    subgraph 存储["💾 存储"]
        V[("向量存储")]
        F[("全文索引")]
        M[("元数据")]
    end

    I1 & I2 & I3 --> CHUNK
    CHUNK --> EMBED --> V
    CHUNK --> KEY --> F
    CHUNK --> M

处理步骤

  1. 文本分块:长文本切分为小块(默认 512 tokens)
  2. 向量嵌入:生成语义向量(OpenAI/Local)
  3. 关键词提取:用于全文搜索索引
  4. 元数据记录:时间、来源、用户ID等

数据流:检索流程 🔗

如何根据用户查询检索相关记忆。

flowchart LR
    subgraph 查询["🔍 查询"]
        Q["用户问题"]
    end

    subgraph 搜索["⚙️ 混合搜索"]
        QE["向量搜索"]
        QK["关键词搜索"]
        RRF["RRF 合并"]
    end

    subgraph 结果["📤 结果"]
        R["Top-K 记忆"]
    end

    Q --> QE & QK
    QE --> RRF
    QK --> RRF
    RRF --> R

混合搜索策略

  • 向量搜索:语义相似度(权重 0.7)
  • 关键词搜索:字面匹配 BM25(权重 0.3)
  • RRF 合并:Reciprocal Rank Fusion 算法

缓存机制 🔗

嵌入服务调用有成本,使用 LRU 缓存提高性能。

flowchart LR
    subgraph 请求["📨 嵌入请求"]
        R["文本"]
    end

    subgraph 缓存["💾 缓存层"]
        C{"缓存命中?"}
        L[("LRU Cache")]
    end

    subgraph 服务["☁️ 嵌入服务"]
        E["Embedding API"]
    end

    R --> C
    C -->|"命中"| L -->|"返回"| RES["向量"]
    C -->|"未命中"| E -->|"写入缓存"| L
    E --> RES

缓存策略

  • 容量:默认 1000 条
  • 淘汰:LRU(Least Recently Used)
  • 命中率:典型场景 30-50%

完整架构概览 🔗

flowchart TB
    subgraph 输入层["📥 输入层"]
        IN["多种输入源"]
    end

    subgraph 核心层["⚙️ 核心层"]
        direction LR
        P1["存储处理"] 
        P2["检索处理"]
    end

    subgraph 存储层["💾 存储层"]
        S1[("向量 DB")]
        S2[("全文索引")]
    end

    subgraph 输出层["📤 输出层"]
        OUT["上下文组装"]
    end

    IN --> P1
    P1 --> S1 & S2
    S1 & S2 --> P2
    P2 --> OUT
    
    style P1 fill:#e8f5e9,stroke:#333
    style P2 fill:#fff3e0,stroke:#333

1.2 核心组件代码结构 🔗

// src/memory/mod.rs
pub trait Memory: Send + Sync {
    /// 存储记忆
    async fn store(&self, entry: MemoryEntry) -> Result<()>;
    
    /// 检索记忆
    async fn recall(&self, query: &str, limit: usize) -> Result<Vec<MemoryEntry>>;
    
    /// 删除记忆
    async fn forget(&self, id: &str) -> Result<()>;
}

/// 记忆条目
pub struct MemoryEntry {
    pub id: String,
    pub content: String,
    pub embedding: Option<Vec<f32>>,  // 向量表示
    pub metadata: Metadata,
    pub created_at: DateTime<Utc>,
}

pub struct Metadata {
    pub source: String,       // telegram, discord, cli...
    pub user_id: Option<String>,
    pub session_id: Option<String>,
    pub category: Category,   // conversation, fact, tool_result...
}

pub enum Category {
    Conversation,  // 对话历史
    Fact,          // 事实知识
    ToolResult,    // 工具执行结果
    Preference,    // 用户偏好
}

二、记忆存储流程 🔗

2.1 文本分块策略 🔗

// src/memory/chunking.rs
pub struct TextChunker {
    max_chunk_size: usize,
    overlap: usize,
}

impl TextChunker {
    pub fn chunk(&self, text: &str) -> Vec<Chunk> {
        // 策略1:按段落分块(优先)
        let paragraphs: Vec<&str> = text.split("\n\n").collect();
        
        let mut chunks = Vec::new();
        let mut current_chunk = String::new();
        
        for para in paragraphs {
            if current_chunk.len() + para.len() > self.max_chunk_size 
                && !current_chunk.is_empty() {
                // 保存当前块
                chunks.push(Chunk {
                    content: current_chunk.clone(),
                    start_pos: chunks.len() * self.max_chunk_size,
                });
                
                // 重叠保留上下文
                let overlap_text = self.get_overlap(&current_chunk);
                current_chunk = overlap_text;
            }
            
            current_chunk.push_str(para);
            current_chunk.push('\n');
        }
        
        // 最后一块
        if !current_chunk.is_empty() {
            chunks.push(Chunk {
                content: current_chunk,
                start_pos: chunks.len() * self.max_chunk_size,
            });
        }
        
        chunks
    }
    
    fn get_overlap(&self, text: &str) -> String {
        // 取最后 overlap 个字符作为重叠
        text.chars()
            .rev()
            .take(self.overlap)
            .collect::<String>()
            .chars()
            .rev()
            .collect()
    }
}

pub struct Chunk {
    pub content: String,
    pub start_pos: usize,
}

为什么需要分块?

因素 长文本 分块后
嵌入质量 稀释,关键信息被平均 聚焦,每块主题明确
检索精度 粗粒度 细粒度
存储效率 重复存储 增量更新

默认配置

  • max_chunk_size = 512 tokens
  • overlap = 50 tokens(保持上下文连贯)

2.2 向量生成与缓存 🔗

// src/memory/embedding.rs
pub struct EmbeddingService {
    provider: Arc<dyn EmbeddingProvider>,
    cache: Arc<RwLock<LruCache<String, Vec<f32>>>>,
}

#[async_trait]
pub trait EmbeddingProvider: Send + Sync {
    async fn embed(&self, text: &str) -> Result<Vec<f32>>;
}

impl EmbeddingService {
    pub async fn get_embedding(&self, text: &str) -> Result<Vec<f32>> {
        // 1. 生成缓存键(文本的哈希)
        let cache_key = blake3::hash(text.as_bytes()).to_string();
        
        // 2. 检查缓存
        {
            let cache = self.cache.read().await;
            if let Some(embedding) = cache.get(&cache_key) {
                trace!("嵌入缓存命中");
                return Ok(embedding.clone());
            }
        }
        
        // 3. 调用嵌入服务
        trace!("调用嵌入服务...");
        let embedding = self.provider.embed(text).await?;
        
        // 4. 写入缓存
        {
            let mut cache = self.cache.write().await;
            cache.put(cache_key, embedding.clone());
        }
        
        Ok(embedding)
    }
}

为什么用 LRU 缓存?

  • 同一对话中重复引用相同概念
  • 嵌入 API 调用有成本和延迟
  • 典型命中率:30-50%

2.3 数据库存储实现 🔗

// src/memory/sqlite.rs
pub struct SqliteMemory {
    pool: SqlitePool,
}

impl SqliteMemory {
    pub async fn new(path: &Path) -> Result<Self> {
        let pool = SqlitePool::connect(&format!(
            "sqlite:{}", path.display()
        )).await?;
        
        // 初始化表结构
        sqlx::query(
            r#"
            CREATE TABLE IF NOT EXISTS memories (
                id TEXT PRIMARY KEY,
                content TEXT NOT NULL,
                embedding BLOB,  -- 二进制存储向量
                category TEXT,
                source TEXT,
                user_id TEXT,
                session_id TEXT,
                created_at DATETIME DEFAULT CURRENT_TIMESTAMP
            );
            
            -- FTS5 全文搜索虚拟表
            CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
                content,
                content='memories',
                content_rowid='rowid'
            );
            
            -- 触发器:自动同步 FTS 索引
            CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
                INSERT INTO memories_fts(rowid, content) 
                VALUES (new.rowid, new.content);
            END;
            
            CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
                INSERT INTO memories_fts(memories_fts, rowid, content) 
                VALUES ('delete', old.rowid, old.content);
            END;
            "#
        ).execute(&pool).await?;
        
        Ok(Self { pool })
    }
    
    pub async fn store(&self, entry: MemoryEntry) -> Result<()> {
        let id = uuid::Uuid::new_v4().to_string();
        
        // 序列化向量为二进制
        let embedding_blob = entry.embedding.as_ref()
            .map(|e| serialize_embedding(e));
        
        sqlx::query(
            r#"
            INSERT INTO memories 
            (id, content, embedding, category, source, user_id, session_id)
            VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
            "#
        )
        .bind(&id)
        .bind(&entry.content)
        .bind(embedding_blob.as_deref())
        .bind(format!("{:?}", entry.metadata.category))
        .bind(&entry.metadata.source)
        .bind(&entry.metadata.user_id)
        .bind(&entry.metadata.session_id)
        .execute(&self.pool)
        .await?;
        
        Ok(())
    }
}

fn serialize_embedding(embedding: &[f32]) -> Vec<u8> {
    // f32 转字节数组
    embedding.iter()
        .flat_map(|f| f.to_le_bytes())
        .collect()
}

fn deserialize_embedding(bytes: &[u8]) -> Vec<f32> {
    bytes.chunks_exact(4)
        .map(|chunk| {
            let mut arr = [0u8; 4];
            arr.copy_from_slice(chunk);
            f32::from_le_bytes(arr)
        })
        .collect()
}

为什么选择 SQLite?

特性 SQLite PostgreSQL 说明
部署 零配置 需安装服务 SQLite 开箱即用
存储 单文件 多文件 SQLite 易于备份
FTS5 内置 需插件 SQLite 全文搜索原生支持
性能 极好(单机) 极好(多机) ZeroClaw 单机为主
扩展性 有限 优秀 可通过配置切换

三、记忆检索流程 🔗

3.1 混合搜索原理 🔗

flowchart TB
    START["用户查询"] --> PARALLEL["并行检索"]
    
    PARALLEL --> VECTOR_SEARCH["向量搜索"]
    PARALLEL --> KEYWORD_SEARCH["关键词搜索"]
    
    subgraph 向量搜索分支["🔵 语义搜索"]
        V1["生成查询向量"]
        V2["余弦相似度计算"]
        V3["返回 Top-K 结果
带相似度分数"] end subgraph 关键词搜索分支["🟢 字面搜索"] K1["解析查询关键词"] K2["FTS5 BM25 评分"] K3["返回 Top-K 结果
带 BM25 分数"] end VECTOR_SEARCH --> V1 --> V2 --> V3 KEYWORD_SEARCH --> K1 --> K2 --> K3 V3 --> MERGE["结果合并"] K3 --> MERGE subgraph 合并算法["🟣 加权合并 (RRF)"] M1["归一化向量分数"] M2["归一化关键词分数"] M3["加权求和
默认: 0.7 向量 + 0.3 关键词"] M4["重排序
Reciprocal Rank Fusion"] end MERGE --> M1 --> M2 --> M3 --> M4 M4 --> RESULT["最终结果列表"] style MERGE fill:#f9f,stroke:#333,stroke-width:3px style M4 fill:#f9f,stroke:#333,stroke-width:3px

3.2 向量搜索实现 🔗

// src/memory/vector_search.rs
pub struct VectorSearch {
    pool: SqlitePool,
}

impl VectorSearch {
    /// 使用余弦相似度搜索
    pub async fn search(
        &self, 
        query_embedding: &[f32], 
        limit: usize
    ) -> Result<Vec<SearchResult>> {
        // SQLite 没有原生向量索引,需要手动计算
        // 生产环境建议使用 pgvector 或专门的向量数据库
        
        let rows = sqlx::query_as::<_, MemoryRow>(
            "SELECT id, content, embedding, created_at FROM memories WHERE embedding IS NOT NULL"
        )
        .fetch_all(&self.pool)
        .await?;
        
        let mut results: Vec<SearchResult> = rows
            .into_iter()
            .filter_map(|row| {
                let embedding = row.embedding?;
                let similarity = cosine_similarity(query_embedding, &embedding);
                
                // 过滤低相似度结果
                if similarity < 0.5 {
                    return None;
                }
                
                Some(SearchResult {
                    id: row.id,
                    content: row.content,
                    similarity,
                    created_at: row.created_at,
                })
            })
            .collect();
        
        // 按相似度排序
        results.sort_by(|a, b| {
            b.similarity.partial_cmp(&a.similarity).unwrap()
        });
        
        results.truncate(limit);
        Ok(results)
    }
}

/// 余弦相似度计算
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
    let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
    let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
    let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
    
    dot_product / (norm_a * norm_b + 1e-8)  // 避免除零
}

为什么用余弦相似度?

  • 对向量长度不敏感(只关心方向)
  • 计算简单高效
  • 范围 [-1, 1],易于解释

3.3 关键词搜索实现 🔗

// src/memory/keyword_search.rs
impl SqliteMemory {
    pub async fn keyword_search(
        &self,
        query: &str,
        limit: usize,
    ) -> Result<Vec<SearchResult>> {
        // 使用 FTS5 的 BM25 评分
        let rows = sqlx::query_as::<_, MemoryRow>(
            r#"
            SELECT m.id, m.content, m.created_at,
                   rank as score
            FROM memories_fts fts
            JOIN memories m ON m.rowid = fts.rowid
            WHERE memories_fts MATCH ?1
            ORDER BY rank
            LIMIT ?2
            "#
        )
        .bind(query)
        .bind(limit as i64)
        .fetch_all(&self.pool)
        .await?;
        
        Ok(rows.into_iter().map(|r| SearchResult {
            id: r.id,
            content: r.content,
            score: r.score,
            created_at: r.created_at,
        }).collect())
    }
}

FTS5 的优势

  • 原生 SQLite 支持,无需额外依赖
  • 自动维护索引(触发器同步)
  • BM25 评分算法(行业标准)

3.4 RRF 合并算法 🔗

// src/memory/hybrid_search.rs
pub struct HybridSearcher {
    vector_weight: f32,
    keyword_weight: f32,
    k: f32,  // RRF 常数
}

impl HybridSearcher {
    pub fn new(vector_weight: f32, keyword_weight: f32) -> Self {
        Self {
            vector_weight,
            keyword_weight,
            k: 60.0,  // 平滑常数
        }
    }
    
    pub fn merge(
        &self,
        vector_results: Vec<SearchResult>,
        keyword_results: Vec<SearchResult>,
        limit: usize,
    ) -> Vec<SearchResult> {
        let mut scores: HashMap<String, f32> = HashMap::new();
        
        // 向量分数(RRF)
        for (rank, result) in vector_results.iter().enumerate() {
            let rrf_score = self.vector_weight * (1.0 / (self.k + rank as f32));
            *scores.entry(result.id.clone()).or_insert(0.0) += rrf_score;
        }
        
        // 关键词分数(RRF)
        for (rank, result) in keyword_results.iter().enumerate() {
            let rrf_score = self.keyword_weight * (1.0 / (self.k + rank as f32));
            *scores.entry(result.id.clone()).or_insert(0.0) += rrf_score;
        }
        
        // 合并结果
        let mut merged: Vec<_> = scores.into_iter()
            .map(|(id, score)| {
                // 找到原始内容
                let content = vector_results.iter()
                    .chain(keyword_results.iter())
                    .find(|r| r.id == id)
                    .map(|r| r.content.clone())
                    .unwrap_or_default();
                    
                SearchResult { id, content, score }
            })
            .collect();
        
        // 按分数排序
        merged.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
        merged.truncate(limit);
        
        merged
    }
}

RRF 公式

score = Σ (weight_i / (k + rank_i))

其中:
- weight_i: 第 i 个搜索方法的权重
- rank_i: 结果在第 i 个方法中的排名(从0开始)
- k = 60: 平滑常数,防止高排名差异过大

为什么用 RRF?

  • 不需要归一化不同分数类型
  • 对排名位置敏感,对绝对分数不敏感
  • 已被证明在混合搜索中效果优秀

四、记忆生命周期管理 🔗

4.1 记忆状态流转 🔗

stateDiagram-v2
    [*] --> 临时缓存: 对话产生
    
    临时缓存 --> 短期记忆: auto_save = true
    临时缓存 --> 丢弃: 会话结束
且 auto_save = false 短期记忆 --> 长期记忆: 重要度评估 短期记忆 --> 归档: 过期策略 长期记忆 --> 归档: 长期未访问 长期记忆 --> 短期记忆: 重新激活 归档 --> 删除: 清理策略 归档 --> 长期记忆: 手动恢复 删除 --> [*] 丢弃 --> [*] note right of 临时缓存 内存中暂存 会话级别 end note note right of 短期记忆 SQLite 存储 最近 N 天 end note note right of 长期记忆 持久化存储 重要记忆 end note

4.2 重要度评估算法 🔗

// src/memory/importance.rs
pub struct ImportanceScorer;

impl ImportanceScorer {
    /// 评估记忆重要度
    pub fn score(entry: &MemoryEntry) -> f32 {
        let mut score = 0.0;
        
        // 1. 内容长度(适中长度得分高)
        let len = entry.content.len();
        if len > 50 && len < 1000 {
            score += 0.2;
        }
        
        // 2. 包含关键词
        let important_keywords = ["bug", "error", "fix", "solution", "config"];
        for kw in &important_keywords {
            if entry.content.to_lowercase().contains(kw) {
                score += 0.1;
            }
        }
        
        // 3. 用户确认(如果用户说"记住这个")
        if entry.content.contains("记住") || entry.content.contains("save this") {
            score += 0.3;
        }
        
        // 4. 工具执行结果
        if matches!(entry.metadata.category, Category::ToolResult) {
            score += 0.2;
        }
        
        // 5. 时间衰减(越新越重要)
        let days_old = entry.created_at.signed_duration_since(Utc::now()).num_days();
        score += (1.0 / (1.0 + days_old as f32)) * 0.2;
        
        score.min(1.0)
    }
}

五、配置与优化 🔗

5.1 记忆系统配置 🔗

[memory]
# 存储后端
backend = "sqlite"  # sqlite | postgres | lucid | markdown | none

# 自动保存
auto_save = true

# 嵌入服务
embedding_provider = "openai"  # openai | local | none

# 混合搜索权重
vector_weight = 0.7
keyword_weight = 0.3

# 检索限制
recall_limit = 5          # 默认召回数量
similarity_threshold = 0.5  # 最小相似度

# 清理策略
max_memory_age_days = 90   # 自动清理超过90天的记忆
max_memories_total = 10000 # 最多保留1万条

5.2 不同场景的优化建议 🔗

场景1:本地开发(快速迭代)

[memory]
backend = "sqlite"
auto_save = true
embedding_provider = "none"  # 关闭嵌入,节省成本

场景2:生产环境(高性能)

[memory]
backend = "sqlite"
auto_save = true
embedding_provider = "openai"
vector_weight = 0.7
keyword_weight = 0.3

场景3:隐私优先(纯本地)

[memory]
backend = "markdown"  # 纯文本存储
auto_save = true
embedding_provider = "none"

附录:相似度计算可视化 🔗

flowchart LR
    subgraph 查询["用户查询"]
        Q["如何部署 ZeroClaw?"]
        QV["查询向量
[0.1, 0.3, -0.2, ...]"] end subgraph 候选记忆["候选记忆"] M1["ZeroClaw 安装指南"] M1V["向量 A
[0.12, 0.28, -0.18, ...]"] S1["相似度: 0.95
🟢 高相关"] M2["Docker 使用教程"] M2V["向量 B
[0.08, 0.35, -0.15, ...]"] S2["相似度: 0.82
🟡 中相关"] M3["Rust 编程入门"] M3V["向量 C
[-0.2, 0.1, 0.5, ...]"] S3["相似度: 0.23
🔴 低相关"] end Q --> QV M1 --> M1V --> S1 M2 --> M2V --> S2 M3 --> M3V --> S3 QV -.->|"余弦相似度"| M1V QV -.->|"余弦相似度"| M2V QV -.->|"余弦相似度"| M3V style S1 fill:#afa style S2 fill:#ffa style S3 fill:#faa