ZeroClaw-03-记忆系统深度解析 🔗
深入解析 ZeroClaw 的混合搜索记忆系统,从设计原理到代码实现,理解 AI 如何实现"长期记忆"。
适合阅读人群:AI 应用设计师、架构师、算法工程师、核心开发者
引言:为什么 AI 需要"记忆"? 🔗
没有记忆的 AI 的局限 🔗
场景1:连续对话
你:帮我写个用户登录功能
AI:好的,这是代码...
你:加上密码强度检查
AI:什么密码强度检查?你在说什么?
场景2:跨会话工作
昨天:你叫张三,在开发电商平台
今天:继续昨天的项目
AI:什么项目?你是谁?
ZeroClaw 的记忆能力 🔗
| 问题类型 | 解决方案 | 技术实现 |
|---|---|---|
| 连续对话 | 短期记忆保持 | 会话上下文维护 |
| 跨会话记忆 | 长期记忆存储 | SQLite/PostgreSQL 持久化 |
| 上下文检索 | 语义搜索 | 向量相似度计算 |
| 知识积累 | 向量存储 | Embedding + 混合搜索 |
一、记忆系统架构 🔗
1.1 系统架构概览 🔗
记忆系统采用分层架构,分为输入处理、存储管理、检索输出三大模块。
数据流:存储流程 🔗
新产生的对话内容如何被存储到记忆系统中。
flowchart LR
subgraph 输入["📥 输入源"]
I1["用户输入"]
I2["AI 响应"]
I3["工具结果"]
end
subgraph 处理["⚙️ 处理"]
CHUNK["文本分块"]
EMBED["向量嵌入"]
KEY["关键词提取"]
end
subgraph 存储["💾 存储"]
V[("向量存储")]
F[("全文索引")]
M[("元数据")]
end
I1 & I2 & I3 --> CHUNK
CHUNK --> EMBED --> V
CHUNK --> KEY --> F
CHUNK --> M
处理步骤:
- 文本分块:长文本切分为小块(默认 512 tokens)
- 向量嵌入:生成语义向量(OpenAI/Local)
- 关键词提取:用于全文搜索索引
- 元数据记录:时间、来源、用户ID等
数据流:检索流程 🔗
如何根据用户查询检索相关记忆。
flowchart LR
subgraph 查询["🔍 查询"]
Q["用户问题"]
end
subgraph 搜索["⚙️ 混合搜索"]
QE["向量搜索"]
QK["关键词搜索"]
RRF["RRF 合并"]
end
subgraph 结果["📤 结果"]
R["Top-K 记忆"]
end
Q --> QE & QK
QE --> RRF
QK --> RRF
RRF --> R
混合搜索策略:
- 向量搜索:语义相似度(权重 0.7)
- 关键词搜索:字面匹配 BM25(权重 0.3)
- RRF 合并:Reciprocal Rank Fusion 算法
缓存机制 🔗
嵌入服务调用有成本,使用 LRU 缓存提高性能。
flowchart LR
subgraph 请求["📨 嵌入请求"]
R["文本"]
end
subgraph 缓存["💾 缓存层"]
C{"缓存命中?"}
L[("LRU Cache")]
end
subgraph 服务["☁️ 嵌入服务"]
E["Embedding API"]
end
R --> C
C -->|"命中"| L -->|"返回"| RES["向量"]
C -->|"未命中"| E -->|"写入缓存"| L
E --> RES
缓存策略:
- 容量:默认 1000 条
- 淘汰:LRU(Least Recently Used)
- 命中率:典型场景 30-50%
完整架构概览 🔗
flowchart TB
subgraph 输入层["📥 输入层"]
IN["多种输入源"]
end
subgraph 核心层["⚙️ 核心层"]
direction LR
P1["存储处理"]
P2["检索处理"]
end
subgraph 存储层["💾 存储层"]
S1[("向量 DB")]
S2[("全文索引")]
end
subgraph 输出层["📤 输出层"]
OUT["上下文组装"]
end
IN --> P1
P1 --> S1 & S2
S1 & S2 --> P2
P2 --> OUT
style P1 fill:#e8f5e9,stroke:#333
style P2 fill:#fff3e0,stroke:#333
1.2 核心组件代码结构 🔗
// src/memory/mod.rs
pub trait Memory: Send + Sync {
/// 存储记忆
async fn store(&self, entry: MemoryEntry) -> Result<()>;
/// 检索记忆
async fn recall(&self, query: &str, limit: usize) -> Result<Vec<MemoryEntry>>;
/// 删除记忆
async fn forget(&self, id: &str) -> Result<()>;
}
/// 记忆条目
pub struct MemoryEntry {
pub id: String,
pub content: String,
pub embedding: Option<Vec<f32>>, // 向量表示
pub metadata: Metadata,
pub created_at: DateTime<Utc>,
}
pub struct Metadata {
pub source: String, // telegram, discord, cli...
pub user_id: Option<String>,
pub session_id: Option<String>,
pub category: Category, // conversation, fact, tool_result...
}
pub enum Category {
Conversation, // 对话历史
Fact, // 事实知识
ToolResult, // 工具执行结果
Preference, // 用户偏好
}
二、记忆存储流程 🔗
2.1 文本分块策略 🔗
// src/memory/chunking.rs
pub struct TextChunker {
max_chunk_size: usize,
overlap: usize,
}
impl TextChunker {
pub fn chunk(&self, text: &str) -> Vec<Chunk> {
// 策略1:按段落分块(优先)
let paragraphs: Vec<&str> = text.split("\n\n").collect();
let mut chunks = Vec::new();
let mut current_chunk = String::new();
for para in paragraphs {
if current_chunk.len() + para.len() > self.max_chunk_size
&& !current_chunk.is_empty() {
// 保存当前块
chunks.push(Chunk {
content: current_chunk.clone(),
start_pos: chunks.len() * self.max_chunk_size,
});
// 重叠保留上下文
let overlap_text = self.get_overlap(¤t_chunk);
current_chunk = overlap_text;
}
current_chunk.push_str(para);
current_chunk.push('\n');
}
// 最后一块
if !current_chunk.is_empty() {
chunks.push(Chunk {
content: current_chunk,
start_pos: chunks.len() * self.max_chunk_size,
});
}
chunks
}
fn get_overlap(&self, text: &str) -> String {
// 取最后 overlap 个字符作为重叠
text.chars()
.rev()
.take(self.overlap)
.collect::<String>()
.chars()
.rev()
.collect()
}
}
pub struct Chunk {
pub content: String,
pub start_pos: usize,
}
为什么需要分块?
| 因素 | 长文本 | 分块后 |
|---|---|---|
| 嵌入质量 | 稀释,关键信息被平均 | 聚焦,每块主题明确 |
| 检索精度 | 粗粒度 | 细粒度 |
| 存储效率 | 重复存储 | 增量更新 |
默认配置:
max_chunk_size = 512tokensoverlap = 50tokens(保持上下文连贯)
2.2 向量生成与缓存 🔗
// src/memory/embedding.rs
pub struct EmbeddingService {
provider: Arc<dyn EmbeddingProvider>,
cache: Arc<RwLock<LruCache<String, Vec<f32>>>>,
}
#[async_trait]
pub trait EmbeddingProvider: Send + Sync {
async fn embed(&self, text: &str) -> Result<Vec<f32>>;
}
impl EmbeddingService {
pub async fn get_embedding(&self, text: &str) -> Result<Vec<f32>> {
// 1. 生成缓存键(文本的哈希)
let cache_key = blake3::hash(text.as_bytes()).to_string();
// 2. 检查缓存
{
let cache = self.cache.read().await;
if let Some(embedding) = cache.get(&cache_key) {
trace!("嵌入缓存命中");
return Ok(embedding.clone());
}
}
// 3. 调用嵌入服务
trace!("调用嵌入服务...");
let embedding = self.provider.embed(text).await?;
// 4. 写入缓存
{
let mut cache = self.cache.write().await;
cache.put(cache_key, embedding.clone());
}
Ok(embedding)
}
}
为什么用 LRU 缓存?
- 同一对话中重复引用相同概念
- 嵌入 API 调用有成本和延迟
- 典型命中率:30-50%
2.3 数据库存储实现 🔗
// src/memory/sqlite.rs
pub struct SqliteMemory {
pool: SqlitePool,
}
impl SqliteMemory {
pub async fn new(path: &Path) -> Result<Self> {
let pool = SqlitePool::connect(&format!(
"sqlite:{}", path.display()
)).await?;
// 初始化表结构
sqlx::query(
r#"
CREATE TABLE IF NOT EXISTS memories (
id TEXT PRIMARY KEY,
content TEXT NOT NULL,
embedding BLOB, -- 二进制存储向量
category TEXT,
source TEXT,
user_id TEXT,
session_id TEXT,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
);
-- FTS5 全文搜索虚拟表
CREATE VIRTUAL TABLE IF NOT EXISTS memories_fts USING fts5(
content,
content='memories',
content_rowid='rowid'
);
-- 触发器:自动同步 FTS 索引
CREATE TRIGGER IF NOT EXISTS memories_ai AFTER INSERT ON memories BEGIN
INSERT INTO memories_fts(rowid, content)
VALUES (new.rowid, new.content);
END;
CREATE TRIGGER IF NOT EXISTS memories_ad AFTER DELETE ON memories BEGIN
INSERT INTO memories_fts(memories_fts, rowid, content)
VALUES ('delete', old.rowid, old.content);
END;
"#
).execute(&pool).await?;
Ok(Self { pool })
}
pub async fn store(&self, entry: MemoryEntry) -> Result<()> {
let id = uuid::Uuid::new_v4().to_string();
// 序列化向量为二进制
let embedding_blob = entry.embedding.as_ref()
.map(|e| serialize_embedding(e));
sqlx::query(
r#"
INSERT INTO memories
(id, content, embedding, category, source, user_id, session_id)
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)
"#
)
.bind(&id)
.bind(&entry.content)
.bind(embedding_blob.as_deref())
.bind(format!("{:?}", entry.metadata.category))
.bind(&entry.metadata.source)
.bind(&entry.metadata.user_id)
.bind(&entry.metadata.session_id)
.execute(&self.pool)
.await?;
Ok(())
}
}
fn serialize_embedding(embedding: &[f32]) -> Vec<u8> {
// f32 转字节数组
embedding.iter()
.flat_map(|f| f.to_le_bytes())
.collect()
}
fn deserialize_embedding(bytes: &[u8]) -> Vec<f32> {
bytes.chunks_exact(4)
.map(|chunk| {
let mut arr = [0u8; 4];
arr.copy_from_slice(chunk);
f32::from_le_bytes(arr)
})
.collect()
}
为什么选择 SQLite?
| 特性 | SQLite | PostgreSQL | 说明 |
|---|---|---|---|
| 部署 | 零配置 | 需安装服务 | SQLite 开箱即用 |
| 存储 | 单文件 | 多文件 | SQLite 易于备份 |
| FTS5 | 内置 | 需插件 | SQLite 全文搜索原生支持 |
| 性能 | 极好(单机) | 极好(多机) | ZeroClaw 单机为主 |
| 扩展性 | 有限 | 优秀 | 可通过配置切换 |
三、记忆检索流程 🔗
3.1 混合搜索原理 🔗
flowchart TB
START["用户查询"] --> PARALLEL["并行检索"]
PARALLEL --> VECTOR_SEARCH["向量搜索"]
PARALLEL --> KEYWORD_SEARCH["关键词搜索"]
subgraph 向量搜索分支["🔵 语义搜索"]
V1["生成查询向量"]
V2["余弦相似度计算"]
V3["返回 Top-K 结果
带相似度分数"]
end
subgraph 关键词搜索分支["🟢 字面搜索"]
K1["解析查询关键词"]
K2["FTS5 BM25 评分"]
K3["返回 Top-K 结果
带 BM25 分数"]
end
VECTOR_SEARCH --> V1 --> V2 --> V3
KEYWORD_SEARCH --> K1 --> K2 --> K3
V3 --> MERGE["结果合并"]
K3 --> MERGE
subgraph 合并算法["🟣 加权合并 (RRF)"]
M1["归一化向量分数"]
M2["归一化关键词分数"]
M3["加权求和
默认: 0.7 向量 + 0.3 关键词"]
M4["重排序
Reciprocal Rank Fusion"]
end
MERGE --> M1 --> M2 --> M3 --> M4
M4 --> RESULT["最终结果列表"]
style MERGE fill:#f9f,stroke:#333,stroke-width:3px
style M4 fill:#f9f,stroke:#333,stroke-width:3px
3.2 向量搜索实现 🔗
// src/memory/vector_search.rs
pub struct VectorSearch {
pool: SqlitePool,
}
impl VectorSearch {
/// 使用余弦相似度搜索
pub async fn search(
&self,
query_embedding: &[f32],
limit: usize
) -> Result<Vec<SearchResult>> {
// SQLite 没有原生向量索引,需要手动计算
// 生产环境建议使用 pgvector 或专门的向量数据库
let rows = sqlx::query_as::<_, MemoryRow>(
"SELECT id, content, embedding, created_at FROM memories WHERE embedding IS NOT NULL"
)
.fetch_all(&self.pool)
.await?;
let mut results: Vec<SearchResult> = rows
.into_iter()
.filter_map(|row| {
let embedding = row.embedding?;
let similarity = cosine_similarity(query_embedding, &embedding);
// 过滤低相似度结果
if similarity < 0.5 {
return None;
}
Some(SearchResult {
id: row.id,
content: row.content,
similarity,
created_at: row.created_at,
})
})
.collect();
// 按相似度排序
results.sort_by(|a, b| {
b.similarity.partial_cmp(&a.similarity).unwrap()
});
results.truncate(limit);
Ok(results)
}
}
/// 余弦相似度计算
fn cosine_similarity(a: &[f32], b: &[f32]) -> f32 {
let dot_product: f32 = a.iter().zip(b.iter()).map(|(x, y)| x * y).sum();
let norm_a: f32 = a.iter().map(|x| x * x).sum::<f32>().sqrt();
let norm_b: f32 = b.iter().map(|x| x * x).sum::<f32>().sqrt();
dot_product / (norm_a * norm_b + 1e-8) // 避免除零
}
为什么用余弦相似度?
- 对向量长度不敏感(只关心方向)
- 计算简单高效
- 范围 [-1, 1],易于解释
3.3 关键词搜索实现 🔗
// src/memory/keyword_search.rs
impl SqliteMemory {
pub async fn keyword_search(
&self,
query: &str,
limit: usize,
) -> Result<Vec<SearchResult>> {
// 使用 FTS5 的 BM25 评分
let rows = sqlx::query_as::<_, MemoryRow>(
r#"
SELECT m.id, m.content, m.created_at,
rank as score
FROM memories_fts fts
JOIN memories m ON m.rowid = fts.rowid
WHERE memories_fts MATCH ?1
ORDER BY rank
LIMIT ?2
"#
)
.bind(query)
.bind(limit as i64)
.fetch_all(&self.pool)
.await?;
Ok(rows.into_iter().map(|r| SearchResult {
id: r.id,
content: r.content,
score: r.score,
created_at: r.created_at,
}).collect())
}
}
FTS5 的优势:
- 原生 SQLite 支持,无需额外依赖
- 自动维护索引(触发器同步)
- BM25 评分算法(行业标准)
3.4 RRF 合并算法 🔗
// src/memory/hybrid_search.rs
pub struct HybridSearcher {
vector_weight: f32,
keyword_weight: f32,
k: f32, // RRF 常数
}
impl HybridSearcher {
pub fn new(vector_weight: f32, keyword_weight: f32) -> Self {
Self {
vector_weight,
keyword_weight,
k: 60.0, // 平滑常数
}
}
pub fn merge(
&self,
vector_results: Vec<SearchResult>,
keyword_results: Vec<SearchResult>,
limit: usize,
) -> Vec<SearchResult> {
let mut scores: HashMap<String, f32> = HashMap::new();
// 向量分数(RRF)
for (rank, result) in vector_results.iter().enumerate() {
let rrf_score = self.vector_weight * (1.0 / (self.k + rank as f32));
*scores.entry(result.id.clone()).or_insert(0.0) += rrf_score;
}
// 关键词分数(RRF)
for (rank, result) in keyword_results.iter().enumerate() {
let rrf_score = self.keyword_weight * (1.0 / (self.k + rank as f32));
*scores.entry(result.id.clone()).or_insert(0.0) += rrf_score;
}
// 合并结果
let mut merged: Vec<_> = scores.into_iter()
.map(|(id, score)| {
// 找到原始内容
let content = vector_results.iter()
.chain(keyword_results.iter())
.find(|r| r.id == id)
.map(|r| r.content.clone())
.unwrap_or_default();
SearchResult { id, content, score }
})
.collect();
// 按分数排序
merged.sort_by(|a, b| b.score.partial_cmp(&a.score).unwrap());
merged.truncate(limit);
merged
}
}
RRF 公式:
score = Σ (weight_i / (k + rank_i))
其中:
- weight_i: 第 i 个搜索方法的权重
- rank_i: 结果在第 i 个方法中的排名(从0开始)
- k = 60: 平滑常数,防止高排名差异过大
为什么用 RRF?
- 不需要归一化不同分数类型
- 对排名位置敏感,对绝对分数不敏感
- 已被证明在混合搜索中效果优秀
四、记忆生命周期管理 🔗
4.1 记忆状态流转 🔗
stateDiagram-v2
[*] --> 临时缓存: 对话产生
临时缓存 --> 短期记忆: auto_save = true
临时缓存 --> 丢弃: 会话结束
且 auto_save = false
短期记忆 --> 长期记忆: 重要度评估
短期记忆 --> 归档: 过期策略
长期记忆 --> 归档: 长期未访问
长期记忆 --> 短期记忆: 重新激活
归档 --> 删除: 清理策略
归档 --> 长期记忆: 手动恢复
删除 --> [*]
丢弃 --> [*]
note right of 临时缓存
内存中暂存
会话级别
end note
note right of 短期记忆
SQLite 存储
最近 N 天
end note
note right of 长期记忆
持久化存储
重要记忆
end note
4.2 重要度评估算法 🔗
// src/memory/importance.rs
pub struct ImportanceScorer;
impl ImportanceScorer {
/// 评估记忆重要度
pub fn score(entry: &MemoryEntry) -> f32 {
let mut score = 0.0;
// 1. 内容长度(适中长度得分高)
let len = entry.content.len();
if len > 50 && len < 1000 {
score += 0.2;
}
// 2. 包含关键词
let important_keywords = ["bug", "error", "fix", "solution", "config"];
for kw in &important_keywords {
if entry.content.to_lowercase().contains(kw) {
score += 0.1;
}
}
// 3. 用户确认(如果用户说"记住这个")
if entry.content.contains("记住") || entry.content.contains("save this") {
score += 0.3;
}
// 4. 工具执行结果
if matches!(entry.metadata.category, Category::ToolResult) {
score += 0.2;
}
// 5. 时间衰减(越新越重要)
let days_old = entry.created_at.signed_duration_since(Utc::now()).num_days();
score += (1.0 / (1.0 + days_old as f32)) * 0.2;
score.min(1.0)
}
}
五、配置与优化 🔗
5.1 记忆系统配置 🔗
[memory]
# 存储后端
backend = "sqlite" # sqlite | postgres | lucid | markdown | none
# 自动保存
auto_save = true
# 嵌入服务
embedding_provider = "openai" # openai | local | none
# 混合搜索权重
vector_weight = 0.7
keyword_weight = 0.3
# 检索限制
recall_limit = 5 # 默认召回数量
similarity_threshold = 0.5 # 最小相似度
# 清理策略
max_memory_age_days = 90 # 自动清理超过90天的记忆
max_memories_total = 10000 # 最多保留1万条
5.2 不同场景的优化建议 🔗
场景1:本地开发(快速迭代)
[memory]
backend = "sqlite"
auto_save = true
embedding_provider = "none" # 关闭嵌入,节省成本
场景2:生产环境(高性能)
[memory]
backend = "sqlite"
auto_save = true
embedding_provider = "openai"
vector_weight = 0.7
keyword_weight = 0.3
场景3:隐私优先(纯本地)
[memory]
backend = "markdown" # 纯文本存储
auto_save = true
embedding_provider = "none"
附录:相似度计算可视化 🔗
flowchart LR
subgraph 查询["用户查询"]
Q["如何部署 ZeroClaw?"]
QV["查询向量
[0.1, 0.3, -0.2, ...]"]
end
subgraph 候选记忆["候选记忆"]
M1["ZeroClaw 安装指南"]
M1V["向量 A
[0.12, 0.28, -0.18, ...]"]
S1["相似度: 0.95
🟢 高相关"]
M2["Docker 使用教程"]
M2V["向量 B
[0.08, 0.35, -0.15, ...]"]
S2["相似度: 0.82
🟡 中相关"]
M3["Rust 编程入门"]
M3V["向量 C
[-0.2, 0.1, 0.5, ...]"]
S3["相似度: 0.23
🔴 低相关"]
end
Q --> QV
M1 --> M1V --> S1
M2 --> M2V --> S2
M3 --> M3V --> S3
QV -.->|"余弦相似度"| M1V
QV -.->|"余弦相似度"| M2V
QV -.->|"余弦相似度"| M3V
style S1 fill:#afa
style S2 fill:#ffa
style S3 fill:#faa