
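🧠 [Roadmap v2.6.0 - v3.0.0] Cognitive-science-driven refactor of the memory system: forgetting curve + episode segmentation + semantic memory layer + contradiction detection #97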

@SonicBotMan


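📋 Background and motivation

Proposed by Perplexity AI (Claude Sonnet 4.6) based on cognitive science papers and a code review

The current v2.5.0 design is engineering-driven: hierarchical DAG compression + static TF-IDF scoring + fixed-token chunking. This solves the "no message is lost" problem, but it does not model how human memory actually works.

Leading papers from 2024-2026 (EM-LLM @ ICLR 2025, HiMem @ arXiv 2601.06377, FOREVER @ arXiv 2601.03938, Graph-based Memory @ arXiv 2602.05665) indicate that bringing cognitive science principles into LLM memory management can substantially improve context quality without increasing token cost.

This issue proposes 4 incremental optimization directions, each with complete executable code, planned across two milestones: v2.6.0 (P0) and v3.0.0 (P1/P2).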


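🗺️ Overall roadmap

v2.5.1  ← Bug fixes (see Issue #95)
  │
v2.6.0  ← P0: forgetting-curve dynamic scoring + episode boundary segmentation
  │         Estimated development time: 2 weeks
  │
v3.0.0  ← P1: notes layer (semantic memory) + contradiction detection
  │         Estimated development time: 4 weeks
  │
v3.x.0  ← P2: graph relationship edges + information-density context assembly (long term)

🔴 P0: v2.6.0 (do first; high payoff, small change)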


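Feature 1: Forgetting-curve-driven dynamic importance decay

Theoretical basis

Ebbinghaus forgetting curve: memory retention is R(t) = e^(-t/S), where t is the elapsed time in days and S is the "stability" parameter (determined by the memory type). The FOREVER paper (arXiv 2601.03938) brings this model into LLM memory management and is reported to improve key-information recall by 18% on long-conversation benchmarks.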
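
To make the formula concrete, here is a minimal sketch that evaluates R(t) = e^(-t/S) for a few stability values. The helper names `retention` and `half_life` and the sample values are illustrative only; they are not part of the codebase. Note that S is the time constant at which retention falls to 1/e (about 37%); the half-life is S * ln 2.

```python
import math


def retention(t_days: float, stability: float) -> float:
    """Ebbinghaus-style retention R(t) = exp(-t / S); t and S are in days."""
    return math.exp(-max(t_days, 0.0) / stability)


def half_life(stability: float) -> float:
    """Days until retention drops to 50%: S * ln(2)."""
    return stability * math.log(2)


# Compare a fast-decaying, a default, and a slow-decaying memory after 30 days.
for s in (3.0, 14.0, 90.0):
    print(f"S={s:>5}: R(30)={retention(30, s):.4f}, "
          f"half-life={half_life(s):.1f} days")
```

With S = 3 the retention after 30 days is e^(-10), which is effectively zero, while with S = 90 roughly 72% remains.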

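Current problem: tfidf_score is a static value computed once at ingestion, so 30 days later a chitchat message and a decision message are still weighted exactly as they were on day one. This is inconsistent with how memory works.

Core changes

  1. database.py: add a last_accessed_at column
  2. tfidf_scorer.py: add a compute_dynamic_score() method
  3. incremental_compressor.py: base compression decisions on the dynamic score
  4. lobster_grep: update last_accessed_at for every message it hits (triggers "memory consolidation")

Implementation

Step 1: database.py, new columns and migration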

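# src/database.py
# Imports listed for completeness; database.py may already have them.
import math
import sqlite3
from datetime import datetime
from typing import Dict, List, Optional

# Default stability S (days) by message type. S is the time constant of
# R(t) = e^(-t/S): retention falls to about 37% after S days
# (the half-life is S * ln 2, not S itself).
STABILITY_MAP = {
    'decision': 90.0,   # architecture decisions should be remembered long-term
    'config':   120.0,  # configuration rarely changes
    'code':     60.0,   # code snippets are kept for the medium term
    'error':    30.0,   # error logs are valuable only for a short time
    'question': 7.0,
    'chitchat': 3.0,    # small talk decays to near zero quickly
    'unknown':  14.0,   # default: two weeks
}

def migrate_v26(self):
    """v2.6.0 schema migration: support forgetting-curve dynamic scoring."""
    migrations = [
        "ALTER TABLE messages ADD COLUMN last_accessed_at TEXT",
        "ALTER TABLE messages ADD COLUMN access_count INTEGER DEFAULT 0",
        # No DEFAULT: NULL means "fall back to STABILITY_MAP by msg_type".
        # A numeric default would always shadow the per-type values.
        "ALTER TABLE messages ADD COLUMN stability REAL",
    ]
    for sql in migrations:
        try:
            self.cursor.execute(sql)
        except sqlite3.OperationalError as e:
            # Ignore only "column already exists" so the migration can be re-run.
            if 'duplicate column name' not in str(e).lower():
                raise
    self.conn.commit()

def touch_message(self, message_id: str):
    """
    Update a message's last-access time and access count.
    Called on every lobster_grep hit to simulate "memory consolidation":
    a referenced memory restarts its decay clock and gains stability.
    """
    now = datetime.utcnow().isoformat()
    # Assumes messages has a msg_type column, as _compute_retention does.
    row = self.cursor.execute(
        "SELECT msg_type, stability FROM messages WHERE message_id = ?",
        (message_id,)
    ).fetchone()
    if row is None:
        return
    stability = row[1] if row[1] is not None else STABILITY_MAP.get(row[0], 14.0)
    self.cursor.execute("""
        UPDATE messages
        SET last_accessed_at = ?,
            access_count = COALESCE(access_count, 0) + 1,
            stability = ?  -- +30% per access (spaced-repetition effect)
        WHERE message_id = ?
    """, (now, stability * 1.3, message_id))
    self.conn.commit()

def get_messages_with_dynamic_score(
    self, conversation_id: str, current_time: Optional[datetime] = None
) -> List[Dict]:
    """Return the message list with a dynamic importance score computed on the fly."""
    if current_time is None:
        current_time = datetime.utcnow()

    messages = self.get_messages(conversation_id)
    for msg in messages:
        msg['dynamic_score'] = self._compute_retention(msg, current_time)
    return messages

def _compute_retention(self, msg: Dict, current_time: datetime) -> float:
    """
    dynamic_score = base_score * e^(-t / stability)

    t is the number of days since the last access (or creation). stability
    comes from the row if set, otherwise from STABILITY_MAP by msg_type.
    """
    base_score = msg.get('tfidf_score', 1.0) + msg.get('structural_bonus', 0.0)

    # compression_exempt messages do not decay
    if msg.get('compression_exempt'):
        return base_score

    msg_type = msg.get('msg_type', 'unknown')
    stability = msg.get('stability') or STABILITY_MAP.get(msg_type, 14.0)

    # Reference time: prefer last_accessed_at, fall back to created_at
    ref_time_str = msg.get('last_accessed_at') or msg.get('created_at')
    try:
        ref_time = datetime.fromisoformat(ref_time_str)
        delta_days = (current_time - ref_time).total_seconds() / 86400.0
    except (TypeError, ValueError):
        # Missing or malformed timestamp, or naive/aware mismatch: treat as fresh
        delta_days = 0.0

    return base_score * math.exp(-max(delta_days, 0.0) / stability)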
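
As an alternative to catching `sqlite3.OperationalError`, a migration can inspect the schema first and add only the columns that are missing. The sketch below is a hedged illustration against an in-memory database; the helper name `add_column_if_missing` and the two-column `messages` table are assumptions made for the example, not the project's actual schema.

```python
import sqlite3


def add_column_if_missing(conn: sqlite3.Connection, table: str,
                          column: str, decl: str) -> bool:
    """Add `column` to `table` unless it already exists. Returns True if added."""
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    existing = {row[1] for row in conn.execute(f"PRAGMA table_info({table})")}
    if column in existing:
        return False
    conn.execute(f"ALTER TABLE {table} ADD COLUMN {column} {decl}")
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE messages (message_id TEXT PRIMARY KEY, content TEXT)")

first = add_column_if_missing(conn, "messages", "last_accessed_at", "TEXT")
second = add_column_if_missing(conn, "messages", "last_accessed_at", "TEXT")
print(first, second)  # the second call is a no-op
```

Checking the schema explicitly avoids masking unrelated errors (a locked database, a misspelled table name) that a blanket `except` would hide.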

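Step 2: incremental_compressor.py, compression decisions use the dynamic score

# src/incremental_compressor.py

def _select_compression_candidates(
    self, conversation_id: str, exclude_fresh_tail: bool = True
) -> List[Dict]:
    """
    v2.6.0: sort by dynamic score so that low-retention messages are compressed first.
    (Replaces the previous static ordering by tfidf_score.)
    """
    messages = self.db.get_messages_with_dynamic_score(conversation_id)

    if exclude_fresh_tail:
        # Drop the newest fresh_tail_count messages. An explicit cutoff avoids
        # messages[:-0], which would return an empty list when the count is 0.
        cutoff = max(len(messages) - self.fresh_tail_count, 0)
        messages = messages[:cutoff]

    # Ascending dynamic score: the lowest score (most "forgettable") is compressed first
    return sorted(messages, key=lambda m: m.get('dynamic_score', 0.0))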
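
The selection rule can be exercised in isolation. This is a minimal standalone sketch, assuming messages are plain dicts that already carry a `dynamic_score`; the function name `select_candidates` and the sample data are hypothetical.

```python
from typing import Dict, List


def select_candidates(messages: List[Dict], fresh_tail_count: int) -> List[Dict]:
    """Exclude the newest `fresh_tail_count` messages, then sort the rest
    by ascending dynamic score (lowest score is compressed first)."""
    cutoff = max(len(messages) - fresh_tail_count, 0)
    older = messages[:cutoff]
    return sorted(older, key=lambda m: m.get("dynamic_score", 0.0))


# Messages in chronological order; m4 is the newest.
msgs = [
    {"id": "m1", "dynamic_score": 4.2},
    {"id": "m2", "dynamic_score": 0.3},
    {"id": "m3", "dynamic_score": 1.5},
    {"id": "m4", "dynamic_score": 9.0},
]

order = [m["id"] for m in select_candidates(msgs, fresh_tail_count=1)]
print(order)  # ['m2', 'm3', 'm1']
```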

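Step 3: agent_tools.py, lobster_grep hits trigger memory consolidation

# src/agent_tools.py: add at the end of lobster_grep()

    # v2.6.0: hits trigger "memory consolidation" and restart the decay clock.
    # Truncate first so that only the messages actually returned are touched.
    results = results[:limit]
    for r in results:
        if r.get('type') == 'message':
            db.touch_message(r['id'])

    return results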
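
Because each access multiplies stability by 1.3, the value grows geometrically: after n accesses it is S0 * 1.3^n. The sketch below works through that arithmetic. The `cap` parameter is an assumption added for illustration (the proposal itself does not bound stability), and `stability_after` is a hypothetical helper.

```python
def stability_after(base: float, accesses: int,
                    boost: float = 1.3, cap: float = 365.0) -> float:
    """Stability in days after `accesses` hits, optionally capped.

    The cap is NOT part of the proposal; it only illustrates one way to
    keep frequently accessed messages from becoming effectively permanent.
    """
    return min(base * boost ** accesses, cap)


for n in (0, 1, 5, 10):
    print(f"chitchat after {n:>2} hits: {stability_after(3.0, n):.2f} days")
```

Ten hits raise a chitchat message from 3 days to roughly 41 days, so whether an upper bound is needed is worth deciding before shipping.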

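Feature 2: Episode boundary segmentation (Event Segmentation)

Theoretical basis

EM-LLM (ICLR 2025) brings Event Segmentation Theory from cognitive science into LLMs and is reported to reach SOTA on InfiniteBench. The core idea: when the "surprisal" of predicting the next token rises significantly, the brain automatically marks the start of a new episode. Mapped onto conversations, the real chunk boundaries are topic switches, time gaps of more than 1 hour, and role changes.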
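
One common way to operationalize "significantly higher" is a moving threshold: flag position t when its surprisal exceeds the mean plus gamma standard deviations of the preceding window. The sketch below assumes per-token surprisal values (negative log-probabilities) are already available from a model; the function name, window size, and gamma are illustrative choices, not values taken from the paper or from this project.

```python
import statistics
from typing import List


def surprisal_boundaries(surprisal: List[float],
                         window: int = 4, gamma: float = 1.0) -> List[int]:
    """Return indices whose surprisal exceeds mean + gamma * std of the
    previous `window` values."""
    boundaries = []
    for t in range(window, len(surprisal)):
        recent = surprisal[t - window:t]
        threshold = statistics.mean(recent) + gamma * statistics.pstdev(recent)
        if surprisal[t] > threshold:
            boundaries.append(t)
    return boundaries


# A flat sequence with one spike at index 5.
values = [1.0, 1.2, 0.9, 1.1, 1.0, 6.0, 1.1, 1.0, 0.9, 1.2]
print(surprisal_boundaries(values))  # [5]
```

The proposal below does not compute token-level surprisal; it approximates the same signal with lexical similarity, time gaps, and explicit role markers, which need no model call.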

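Current problem: dag_compressor.py splits at a fixed leaf_chunk_tokens=20000 and ignores semantic boundaries entirely, so a single topic can be cut into two summaries while unrelated topics are forced into the same one.

New file: src/pipeline/event_segmenter.py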

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Event Segmenter - episode boundary detection

Based on the event segmentation idea in EM-LLM (ICLR 2025):
- Topic shift (sharp drop in term-frequency cosine similarity)
- Time gap (interval between messages > threshold)
- Explicit boundary signals (system messages, role resets, etc.)
"""

import re
import math
from typing import List, Dict, Tuple
from datetime import datetime
from collections import Counter


class EventSegmenter:
    """
    Split a message sequence into semantically coherent "episodes".
    Each episode is compressed into its own leaf summary, which keeps
    every summary on a single topic.
    """

    def __init__(
        self,
        similarity_threshold: float = 0.25,  # below this counts as a topic shift
        time_gap_seconds: int = 3600,         # more than 1 hour counts as a time gap
        min_episode_tokens: int = 500,        # minimum episode size (prevents over-segmentation)
        max_episode_tokens: int = 20000,      # maximum episode size (hard cap)
    ):
        self.similarity_threshold = similarity_threshold
        self.time_gap_seconds = time_gap_seconds
        self.min_episode_tokens = min_episode_tokens
        self.max_episode_tokens = max_episode_tokens

    def segment(self, messages: List[Dict]) -> List[List[Dict]]:
        """
        Split a message list into a list of episodes.

        Args:
            messages: messages sorted by seq

        Returns:
            A list of episodes; each episode is a list of messages
        """
        if not messages:
            return []
        if len(messages) == 1:
            return [messages]

        boundaries = self._detect_boundaries(messages)
        return self._split_by_boundaries(messages, boundaries)
    
    def _detect_boundaries(self, messages: List[Dict]) -> List[int]:
        """
        Return the list of boundary indices (a boundary is the start index
        of a new episode). Index 0 is always a boundary.
        """
        boundaries = [0]
        # Running token count of the episode currently being built
        episode_tokens = self._estimate_tokens(messages[0].get('content') or '')

        for i in range(1, len(messages)):
            prev = messages[i - 1]
            curr = messages[i]
            curr_tokens = self._estimate_tokens(curr.get('content') or '')

            if self._is_boundary(prev, curr, episode_tokens + curr_tokens):
                boundaries.append(i)
                episode_tokens = curr_tokens
            else:
                episode_tokens += curr_tokens

        return boundaries

    def _is_boundary(self, prev: Dict, curr: Dict,
                     tokens_if_appended: int) -> bool:
        """Decide whether an episode boundary lies between prev and curr."""

        # 1. Explicit boundary: system message (role switch, conversation reset)
        if curr.get('role') == 'system':
            return True

        # 2. Time gap
        time_gap = self._get_time_gap(prev, curr)
        if time_gap is not None and time_gap > self.time_gap_seconds:
            return True

        # 3. Topic shift (term-frequency cosine similarity below threshold).
        #    Very short messages are skipped because their similarity is noisy.
        prev_content = prev.get('content') or ''
        curr_content = curr.get('content') or ''
        if len(prev_content) > 20 and len(curr_content) > 20:
            similarity = self._cosine_similarity(
                self._tokenize(prev_content),
                self._tokenize(curr_content)
            )
            if similarity < self.similarity_threshold:
                return True

        # 4. Hard cap: appending curr would push the current episode past
        #    max_episode_tokens. The count runs from the last boundary, so a
        #    new episode starts once and then accumulates again; a sliding
        #    window would instead keep firing on every following message.
        return tokens_if_appended > self.max_episode_tokens
    
    def _split_by_boundaries(self, messages: List[Dict],
                              boundaries: List[int]) -> List[List[Dict]]:
        """Split the message list at the boundary indices, then merge episodes that are too small."""
        episodes = []
        boundaries_set = set(boundaries)

        current_episode = []
        for i, msg in enumerate(messages):
            if i in boundaries_set and current_episode:
                episodes.append(current_episode)
                current_episode = []
            current_episode.append(msg)

        if current_episode:
            episodes.append(current_episode)

        # Merge episodes that are too small (avoids fragmentation)
        return self._merge_small_episodes(episodes)

    def _merge_small_episodes(self,
                               episodes: List[List[Dict]]) -> List[List[Dict]]:
        """Merge every episode with fewer than min_episode_tokens into the previous episode."""
        if not episodes:
            return episodes

        merged = [episodes[0]]
        for episode in episodes[1:]:
            episode_tokens = sum(
                self._estimate_tokens(m.get('content', ''))
                for m in episode
            )
            if episode_tokens < self.min_episode_tokens:
                merged[-1] = merged[-1] + episode  # merge into the previous one
            else:
                merged.append(episode)

        return merged

    def _get_time_gap(self, prev: Dict, curr: Dict) -> float | None:
        """Return the time difference between two messages in seconds, or None if parsing fails."""
        try:
            t1 = datetime.fromisoformat(
                prev.get('created_at') or prev.get('timestamp', '')
            )
            t2 = datetime.fromisoformat(
                curr.get('created_at') or curr.get('timestamp', '')
            )
            return abs((t2 - t1).total_seconds())
        except Exception:
            return None

    def _tokenize(self, text: str) -> Counter:
        """Simple tokenizer for Chinese and English; returns a term-frequency Counter."""
        # Chinese is split per character, English per word (2+ letters)
        words = re.findall(r'[\u4e00-\u9fff]|[a-zA-Z]{2,}', text.lower())
        return Counter(words)

    def _cosine_similarity(self, a: Counter, b: Counter) -> float:
        """Cosine similarity between two term-frequency vectors."""
        if not a or not b:
            return 0.0

        common = set(a.keys()) & set(b.keys())
        dot = sum(a[w] * b[w] for w in common)
        norm_a = math.sqrt(sum(v ** 2 for v in a.values()))
        norm_b = math.sqrt(sum(v ** 2 for v in b.values()))

        if norm_a == 0 or norm_b == 0:
            return 0.0
        return dot / (norm_a * norm_b)

    def _estimate_tokens(self, text: str) -> int:
        """Rough token estimate (kept consistent with database.py)."""
        chinese = sum(1 for c in text if '\u4e00' <= c <= '\u9fff')
        return int((len(text) - chinese) / 4 + chinese / 1.5)

# ==================== Smoke test ====================

if __name__ == '__main__':
    # The demo messages are only a few tokens each, so the default
    # min_episode_tokens=500 would merge them back into a single episode.
    segmenter = EventSegmenter(min_episode_tokens=0)

    # Test data: two clearly different topics, about two hours apart
    msgs_topic_a = [  # topic A: database schema design
        {'role': 'user', 'content': '帮我设计数据库表结构,需要存用户信息', 'created_at': '2026-03-17T10:00:00'},
        {'role': 'assistant', 'content': '好的,用户表需要 id, name, email 字段', 'created_at': '2026-03-17T10:01:00'},
        {'role': 'user', 'content': '还需要存用户的登录时间和 IP', 'created_at': '2026-03-17T10:02:00'},
    ]
    msgs_topic_b = [  # topic B: what to have for lunch
        {'role': 'user', 'content': '我们聊聊今天的午饭吧,想吃火锅', 'created_at': '2026-03-17T12:00:00'},
        {'role': 'assistant', 'content': '火锅不错!推荐川式鸳鸯锅', 'created_at': '2026-03-17T12:01:00'},
    ]

    episodes = segmenter.segment(msgs_topic_a + msgs_topic_b)
    print(f'✅ Episodes: {len(episodes)}')
    for i, ep in enumerate(episodes):
        print(f'  Episode {i+1}: {len(ep)} messages')
        print(f'    First message: {ep[0]["content"][:40]}...')

Step 2: modify dag_compressor.py to use episode segmentation instead of fixed chunking

# src/dag_compressor.py

from typing import Dict, List

from pipeline.event_segmenter import EventSegmenter

class DAGCompressor:
    def __init__(self, db, llm_client, **kwargs):  # remaining existing parameters omitted
        # ... existing initialization code (sets self.leaf_chunk_tokens) ...
        self.event_segmenter = EventSegmenter(
            similarity_threshold=0.25,
            time_gap_seconds=3600,
            min_episode_tokens=500,
            max_episode_tokens=self.leaf_chunk_tokens,
        )

    def compact_to_leaves(self, conversation_id: str,
                          messages: List[Dict]) -> List[str]:
        """
        v2.6.0: use episode segmentation instead of fixed-token chunking.
        Each episode is compressed into its own leaf summary.
        """
        # v2.5.0 logic (removed):
        # chunks = self._split_by_tokens(messages, self.leaf_chunk_tokens)

        # v2.6.0 logic: semantics-aware chunking
        episodes = self.event_segmenter.segment(messages)

        summary_ids = []
        for episode in episodes:
            if not episode:
                continue
            summary_id = self._compress_episode_to_leaf(conversation_id, episode)
            if summary_id:
                summary_ids.append(summary_id)

        return summary_ids

    def _compress_episode_to_leaf(self, conversation_id: str,
                                   episode: List[Dict]) -> str | None:
        """Compress one episode into a leaf summary (the former _compress_chunk_to_leaf logic)."""
        # Same logic as the original _compress_chunk_to_leaf; only the parameter
        # is renamed from chunk to episode.
        # ... unchanged ...
        pass

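🟡 P1: v3.0.0


Feature 3: Notes layer (Semantic Memory Layer)

Theoretical basis

HiMem (arXiv 2601.06377) splits memory into two tracks:

  • Episodic memory: "what happened and when", which corresponds to the existing DAG
  • Semantic memory: distilled, stable knowledge such as "this user prefers PostgreSQL", which corresponds to the new Notes layer

Semantic memory is always injected at the top of the context. The cost is very low (< 500 tokens), yet it lets the agent "know" the key background before each turn begins.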
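
The injection order described above can be sketched as a small assembly function: notes go first under their own sub-budget, then the newest messages fill whatever remains. Everything here is an assumption made for illustration (the function names, the English header text, and the 4-characters-per-token estimate); the project's real context assembler is not shown in this issue.

```python
from typing import List


def estimate_tokens(text: str) -> int:
    """Crude estimate: about 4 characters per token, plus 1."""
    return len(text) // 4 + 1


def assemble_context(notes: List[str], recent: List[str],
                     budget: int, notes_budget: int = 500) -> str:
    """Place notes at the top, then as many of the newest messages as fit."""
    header, used = [], 0
    for note in notes:
        cost = estimate_tokens(note)
        if used + cost > notes_budget:
            break
        header.append(f"- {note}")
        used += cost

    # Walk messages newest-first so the most recent ones survive the budget,
    # then restore chronological order.
    body = []
    for msg in reversed(recent):
        cost = estimate_tokens(msg)
        if used + cost > budget:
            break
        body.append(msg)
        used += cost
    body.reverse()

    parts = (["[Background knowledge]"] + header if header else []) + body
    return "\n".join(parts)


ctx = assemble_context(
    notes=["User prefers PostgreSQL"],
    recent=["user: hi", "assistant: hello"],
    budget=100,
)
print(ctx)
```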

New file: src/semantic_memory.py

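#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Semantic Memory Layer

A stable knowledge store, independent of the DAG, that holds durable facts
distilled from conversations. When the context for a turn is assembled,
the Notes layer is always injected first.
"""

import json
import sqlite3
from typing import List, Dict, Optional
from datetime import datetime


NOTE_EXTRACTION_PROMPT = """
Extract "stable semantic knowledge" from the conversation excerpt below. Extract only explicitly stated facts; do not infer.

Required format (JSON array):
[
  {"category": "preference", "content": "The user prefers PostgreSQL"},
  {"category": "decision",   "content": "The project uses React 18 + TypeScript"},
  {"category": "constraint", "content": "The deployment environment is AWS; GCP cannot be used"}
]

Categories:
- preference: user/project preferences
- decision: technology choices, architecture decisions
- constraint: hard constraints and limits
- fact: objective facts (version numbers, API endpoints, etc.)

If there is no stable knowledge, return an empty array [].

Conversation excerpt:
{context}
"""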


class SemanticMemory:
    """Semantic memory layer: manages stable facts distilled from conversations."""

    def __init__(self, db):
        self.db = db
        self._ensure_schema()

    def _ensure_schema(self):
        """Create the notes table if it does not exist."""
        self.db.cursor.execute("""
            CREATE TABLE IF NOT EXISTS notes (
                note_id         TEXT UNIQUE NOT NULL,
                conversation_id TEXT NOT NULL,
                category        TEXT NOT NULL,
                content         TEXT NOT NULL,
                confidence      REAL DEFAULT 1.0,
                source_msg_ids  TEXT,        -- JSON array
                created_at      TEXT NOT NULL,
                updated_at      TEXT NOT NULL,
                superseded_by   TEXT,        -- new note_id when a contradiction replaces this note
                FOREIGN KEY (conversation_id)
                    REFERENCES conversations(conversation_id)
            );
        """)
        self.db.cursor.execute("""
            CREATE INDEX IF NOT EXISTS idx_notes_conversation
            ON notes(conversation_id, category);
        """)
        self.db.conn.commit()

    def extract_and_store(
        self,
        conversation_id: str,
        messages: List[Dict],
        llm_client,          # the same LLM client that dag_compressor.py uses
        source_msg_ids: Optional[List[str]] = None
    ) -> List[str]:
        """
        Call the LLM to extract semantic knowledge from messages and store it
        in the notes table. Usually called after each DAG leaf compression
        (one extra LLM call alongside the summary).

        Returns:
            The list of newly created note_ids
        """
        context = self._format_messages(messages)
        # str.replace instead of str.format: the prompt's JSON example contains
        # literal braces, which str.format would try to parse as fields.
        prompt = NOTE_EXTRACTION_PROMPT.replace('{context}', context)

        try:
            response = llm_client.complete(prompt, max_tokens=500)
            notes_data = json.loads(response.strip())
        except Exception as e:
            print(f'⚠️ Note extraction failed: {e}')
            return []
        if not isinstance(notes_data, list):
            return []

        created_ids = []
        for note in notes_data:
            if not isinstance(note, dict):
                continue
            if not note.get('content') or not note.get('category'):
                continue
            note_id = self._save_note(
                conversation_id=conversation_id,
                category=note['category'],
                content=note['content'],
                source_msg_ids=source_msg_ids or []
            )
            if note_id:
                created_ids.append(note_id)

        return created_ids
    
    def _save_note(
        self,
        conversation_id: str,
        category: str,
        content: str,
        confidence: float = 1.0,
        source_msg_ids: Optional[List[str]] = None
    ) -> Optional[str]:
        """Save a single note, skipping duplicates of an active note."""
        # Simple de-duplication: skip if an active note has identical content
        self.db.cursor.execute("""
            SELECT note_id FROM notes
            WHERE conversation_id = ? AND content = ? AND superseded_by IS NULL
        """, (conversation_id, content))
        if self.db.cursor.fetchone():
            return None  # already exists

        import hashlib
        now = datetime.utcnow().isoformat()
        # Include the timestamp in the hash: a fact that was superseded earlier
        # and is asserted again would otherwise reuse its old note_id and
        # violate the UNIQUE constraint.
        note_id = 'note_' + hashlib.sha256(
            (conversation_id + content + now).encode()
        ).hexdigest()[:16]

        self.db.cursor.execute("""
            INSERT INTO notes
            (note_id, conversation_id, category, content, confidence,
             source_msg_ids, created_at, updated_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        """, (
            note_id, conversation_id, category, content, confidence,
            json.dumps(source_msg_ids or []), now, now
        ))
        self.db.conn.commit()
        return note_id
    
    def get_active_notes(
        self,
        conversation_id: str,
        categories: Optional[List[str]] = None,
        max_tokens: int = 500
    ) -> List[Dict]:
        """
        Return the notes currently in effect (those not superseded).
        Called during context assembly; the result is injected at the top.
        """
        if categories:
            placeholders = ','.join('?' * len(categories))
            self.db.cursor.execute(f"""
                SELECT * FROM notes
                WHERE conversation_id = ?
                  AND category IN ({placeholders})
                  AND superseded_by IS NULL
                ORDER BY category, created_at
            """, [conversation_id] + categories)
        else:
            self.db.cursor.execute("""
                SELECT * FROM notes
                WHERE conversation_id = ? AND superseded_by IS NULL
                ORDER BY category, created_at
            """, (conversation_id,))

        notes = self._rows_to_notes()

        # Enforce the token budget
        result = []
        used = 0
        for note in notes:
            token_cost = len(note['content']) // 4 + 10
            if used + token_cost > max_tokens:
                break
            result.append(note)
            used += token_cost

        return result

    def format_for_context(self, notes: List[Dict]) -> str:
        """
        Format notes as a text block for injection into the context.

        Example output (the header and labels are Chinese string literals;
        the v3.0.0 acceptance test matches on the header):
        [背景知识]                                   <- "Background knowledge"
        • [技术决策] 项目采用 React 18 + TypeScript    <- decision
        • [约束条件] 部署环境为 AWS,不能使用 GCP       <- constraint
        • [用户偏好] 用户偏好使用 PostgreSQL           <- preference
        """
        if not notes:
            return ''

        CATEGORY_LABELS = {
            'decision':   '技术决策',  # technical decision
            'constraint': '约束条件',  # constraint
            'preference': '用户偏好',  # user preference
            'fact':       '已知事实',  # known fact
        }

        lines = ['[背景知识]']  # "Background knowledge"
        for note in notes:
            label = CATEGORY_LABELS.get(note['category'], note['category'])
            lines.append(f'• [{label}] {note["content"]}')

        return '\n'.join(lines)

    def _format_messages(self, messages: List[Dict]) -> str:
        """Format a message list as prompt text."""
        lines = []
        for msg in messages[:20]:  # pass at most 20 messages
            role = msg.get('role', 'unknown')
            content = msg.get('content', '')[:200]  # truncate long messages
            lines.append(f'{role}: {content}')
        return '\n'.join(lines)

    def _rows_to_notes(self) -> List[Dict]:
        """Convert the cursor result into a list of dicts."""
        cols = [d[0] for d in self.db.cursor.description]
        return [dict(zip(cols, row)) for row in self.db.cursor.fetchall()]
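
LLM responses often wrap the JSON array in extra prose, and they may return categories outside the requested set. The following is a hedged sketch of a more defensive parser; `parse_notes` and `ALLOWED_CATEGORIES` are hypothetical names, and the bracket-slicing heuristic is just one simple option rather than the project's method.

```python
import json
from typing import Dict, List

ALLOWED_CATEGORIES = {"preference", "decision", "constraint", "fact"}


def parse_notes(response: str) -> List[Dict]:
    """Extract the outermost JSON array from `response` and keep only
    well-formed notes with a known category and non-empty content."""
    start, end = response.find("["), response.rfind("]")
    if start == -1 or end <= start:
        return []
    try:
        data = json.loads(response[start:end + 1])
    except json.JSONDecodeError:
        return []
    if not isinstance(data, list):
        return []
    return [
        n for n in data
        if isinstance(n, dict)
        and n.get("category") in ALLOWED_CATEGORIES
        and isinstance(n.get("content"), str)
        and n["content"].strip()
    ]


raw = ('Here you go:\n'
       '[{"category": "decision", "content": "Use React 18"},'
       ' {"category": "mood", "content": "x"}]\nDone.')
print(parse_notes(raw))  # only the "decision" note survives
```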

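Feature 4: Memory contradiction detection and self-correction

Theoretical basis

When the user says "switch to MySQL" while the notes table already holds "the project uses PostgreSQL", the two notes contradict each other. SSGM / Truth Maintenance System (arXiv 2603.11768) proposes detecting contradictions with an NLI (natural language inference) model. When a contradiction occurs, the old note is marked superseded_by instead of being deleted, which preserves the full provenance chain.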
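
The supersede-instead-of-delete idea can be illustrated with a tiny in-memory table. This sketch uses a simplified three-column `notes` table and hypothetical helpers (`active_notes`, `history`); the full schema proposed in this issue has more columns.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE notes (note_id TEXT PRIMARY KEY, "
    "content TEXT NOT NULL, superseded_by TEXT)"
)
conn.executemany(
    "INSERT INTO notes VALUES (?, ?, ?)",
    [("note_a", "Project uses PostgreSQL", None),
     ("note_b", "Project switched to MySQL", None)],
)
# The newer note replaces the older one; the old row stays for provenance.
conn.execute("UPDATE notes SET superseded_by = ? WHERE note_id = ?",
             ("note_b", "note_a"))


def active_notes(conn: sqlite3.Connection) -> list:
    """Notes still in effect, i.e. not superseded by anything."""
    rows = conn.execute(
        "SELECT content FROM notes WHERE superseded_by IS NULL ORDER BY note_id"
    )
    return [r[0] for r in rows]


def history(conn: sqlite3.Connection, note_id: str) -> list:
    """Follow superseded_by links from an old note to its latest replacement."""
    chain, seen = [], set()
    while note_id and note_id not in seen:  # `seen` guards against cycles
        seen.add(note_id)
        row = conn.execute(
            "SELECT content, superseded_by FROM notes WHERE note_id = ?",
            (note_id,),
        ).fetchone()
        if row is None:
            break
        chain.append(row[0])
        note_id = row[1]
    return chain


print(active_notes(conn))
print(history(conn, "note_a"))
```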

New file: src/pipeline/conflict_detector.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Conflict Detector - memory contradiction detection

Uses a small local NLI model (no API cost) to detect whether a new message
contradicts existing notes, and triggers "memory reconsolidation" to update
the notes table.

Recommended model: cross-encoder/nli-deberta-v3-small (~90MB, runs locally)
Fallback: rules + keywords (no dependencies, lower precision)
"""

from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass


@dataclass
class ConflictResult:
    old_note_id: str
    old_content: str
    new_claim: str
    conflict_score: float  # 0.0 - 1.0; higher means more contradictory


class ConflictDetector:
    """
    Memory contradiction detector.

    Prefers the NLI model (higher precision) and falls back to rule-based
    detection (no dependencies) when the model is unavailable.
    """

    def __init__(self, use_nli: bool = True, nli_threshold: float = 0.85):
        self.nli_threshold = nli_threshold
        self.nli_model = None

        if use_nli:
            try:
                from sentence_transformers import CrossEncoder
                self.nli_model = CrossEncoder(
                    'cross-encoder/nli-deberta-v3-small',
                    max_length=256
                )
                print('✅ ConflictDetector: using the NLI model (high precision)')
            except ImportError:
                print('⚠️ ConflictDetector: sentence-transformers is not installed, falling back to rule-based detection')
    def detect(
        self,
        new_message: Dict,
        existing_notes: List[Dict]
    ) -> List[ConflictResult]:
        """
        Detect whether a new message contradicts existing notes.

        Args:
            new_message: the newly arrived message
            existing_notes: the notes currently in effect

        Returns:
            A list of conflicts (usually empty, occasionally 1-2 entries)
        """
        content = new_message.get('content', '')
        if not content or len(content) < 10:
            return []

        conflicts = []
        for note in existing_notes:
            conflict = self._check_pair(note['content'], content)
            if conflict and conflict.conflict_score >= self.nli_threshold:
                conflict.old_note_id = note['note_id']
                conflicts.append(conflict)

        return conflicts

    def _check_pair(
        self, premise: str, hypothesis: str
    ) -> Optional[ConflictResult]:
        """Check whether a single (premise, hypothesis) pair is contradictory."""
        if self.nli_model:
            return self._check_with_nli(premise, hypothesis)
        else:
            return self._check_with_rules(premise, hypothesis)

    def _check_with_nli(
        self, premise: str, hypothesis: str
    ) -> Optional[ConflictResult]:
        """Detect contradiction with the NLI model (contradiction / entailment / neutral)."""
        try:
            # Per the model card of cross-encoder/nli-deberta-v3-small, the
            # output order is [contradiction, entailment, neutral]. Confirm
            # this against the model's id2label config before relying on it.
            CONTRADICTION_IDX = 0
            scores = self.nli_model.predict(
                [(premise, hypothesis)],
                apply_softmax=True
            )[0]
            contradiction_score = float(scores[CONTRADICTION_IDX])

            if contradiction_score >= self.nli_threshold:
                return ConflictResult(
                    old_note_id='',
                    old_content=premise,
                    new_claim=hypothesis,
                    conflict_score=contradiction_score
                )
        except Exception as e:
            print(f'⚠️ NLI detection failed: {e}')
        return None
    
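    def _check_with_rules(
        self, premise: str, hypothesis: str
    ) -> Optional[ConflictResult]:
        """
        Rule-based fallback: look for a negation/replacement phrase followed
        by a keyword from the premise. Lower precision, but no dependencies.

        Limitation: the keyword must come from the premise, so a message that
        rejects the old choice by name ("不用 PostgreSQL", i.e. "don't use
        PostgreSQL") is caught, but one that names only the new choice
        ("改用 MySQL", i.e. "switch to MySQL") is not.
        """
        import re

        # Chinese and English negation/replacement phrases
        NEGATION_PATTERNS = [
            r'不(用|要|采用|使用|选择)',   # don't use / adopt / choose
            r'改(用|为|成)',              # switch to / change to
            r'放弃|弃用|替换|迁移到',      # abandon / deprecate / replace / migrate to
            r"don't use|switch to|replace|migrate to|no longer",
        ]

        # Extract keywords from the premise (technology names, version numbers, etc.)
        tech_words = re.findall(
            r'[A-Z][a-zA-Z]+|[a-z]{3,}(?:\s+\d+\.\d+)?|[\u4e00-\u9fff]{2,4}',
            premise
        )

        # Check whether the hypothesis contains "negation + keyword"
        for word in tech_words:
            for neg in NEGATION_PATTERNS:
                pattern = neg + r'[^\n]{0,20}' + re.escape(word)
                if re.search(pattern, hypothesis, re.IGNORECASE):
                    return ConflictResult(
                        old_note_id='',
                        old_content=premise,
                        new_claim=hypothesis,
                        conflict_score=0.9  # a rule hit gets a high score
                    )
        return None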
    
    def reconcile(
        self,
        semantic_memory,    # SemanticMemory instance
        conflicts: List[ConflictResult],
        new_message: Dict,
        conversation_id: str
    ):
        """
        Perform memory reconsolidation:
        1. Create a note that reflects the new information
        2. Mark the old note as superseded_by the new note
        3. Keep the old note (preserves the provenance chain)
        """
        from datetime import datetime  # not imported at module level in this file

        for conflict in conflicts:
            # Create the new note (reflects the latest information).
            # The '[更新]' prefix means "[Updated]".
            new_note_id = semantic_memory._save_note(
                conversation_id=conversation_id,
                category='decision',
                content=f'[更新] {conflict.new_claim[:200]}',
                confidence=0.9,
                source_msg_ids=[new_message.get('id', '')]
            )

            if new_note_id:
                # Mark the old note as superseded (not deleted; history is kept)
                semantic_memory.db.cursor.execute("""
                    UPDATE notes
                    SET superseded_by = ?, updated_at = ?
                    WHERE note_id = ?
                """, (new_note_id,
                       datetime.utcnow().isoformat(),
                       conflict.old_note_id))
                semantic_memory.db.conn.commit()

                print(f'🔄 Memory reconsolidation: [{conflict.old_content[:40]}...] '
                      f'→ [{conflict.new_claim[:40]}...]')

Integrating contradiction detection into incremental_compressor.py

# src/incremental_compressor.py
# Insert at the end of on_new_message(), before the commit

def on_new_message(self, conversation_id: str, message: Dict) -> Dict:
    # ... existing logic (save the message, trigger compression) ...

    # v3.0.0: contradiction detection
    if hasattr(self, 'conflict_detector') and hasattr(self, 'semantic_memory'):
        active_notes = self.semantic_memory.get_active_notes(conversation_id)
        if active_notes:
            conflicts = self.conflict_detector.detect(message, active_notes)
            if conflicts:
                self.conflict_detector.reconcile(
                    self.semantic_memory, conflicts, message, conversation_id
                )

    return result

✅ Acceptance criteria per milestone

v2.6.0 acceptance (Feature 1 + 2)

# tests/test_v260.py
# db, segmenter, and the message fixtures are assumed to be provided by the test setup.
from datetime import datetime

def test_dynamic_score_decays_over_time():
    """After 30 days a chitchat message scores below 10% of its original score."""
    msg = {'tfidf_score': 10.0, 'msg_type': 'chitchat',
           'created_at': '2026-02-15T00:00:00', 'compression_exempt': False}
    score = db._compute_retention(msg, datetime(2026, 3, 17))
    assert score < 1.0  # chitchat is close to zero after 30 days

def test_touch_resets_decay():
    """The dynamic score does not drop after touch_message."""
    db.save_message(test_msg)
    score_before = db.get_messages_with_dynamic_score('conv_1')[0]['dynamic_score']
    db.touch_message(test_msg['id'])
    score_after = db.get_messages_with_dynamic_score('conv_1')[0]['dynamic_score']
    assert score_after >= score_before

def test_event_segmenter_splits_topics():
    """Clearly different topics should be split into 2 episodes."""
    # Each fixture episode must reach min_episode_tokens (or the segmenter
    # must be built with a smaller minimum); otherwise the two are merged.
    msgs = db_topic_a + db_topic_b  # database design + lunch topic
    episodes = segmenter.segment(msgs)
    assert len(episodes) == 2

def test_event_segmenter_respects_time_gap():
    """A time gap of more than 1 hour starts a new episode."""
    msgs = make_msgs_with_gap(gap_seconds=7200)
    episodes = segmenter.segment(msgs)
    assert len(episodes) >= 2

def test_event_segmenter_merges_tiny_episodes():
    """A single-message episode should be merged into the previous episode."""
    msgs = [*topic_a_msgs, single_msg, *topic_a_continued]
    episodes = segmenter.segment(msgs)
    # single_msg should be merged rather than form its own episode
    assert all(len(ep) > 1 for ep in episodes)

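v3.0.0 acceptance (Feature 3 + 4)

# tests/test_v300.py

def test_notes_extracted_after_compression():
    """After DAG compression the notes table should hold the matching semantic knowledge."""
    # After compressing a conversation that contains an explicit decision
    notes = sem_mem.get_active_notes('conv_decision')
    assert any('PostgreSQL' in n['content'] for n in notes)

def test_notes_injected_in_context():
    """During context assembly the notes should appear at the top of the text."""
    context_text = sem_mem.format_for_context(notes)
    assert context_text.startswith('[背景知识]')  # "[Background knowledge]"

def test_conflict_detection_triggers_reconsolidation():
    """When a new message contradicts an existing note, the old note should be superseded."""
    # First store the note "项目使用 PostgreSQL" ("the project uses PostgreSQL"),
    # then add the message "我们决定改用 MySQL" ("we decided to switch to MySQL").
    # Requires the NLI model: the rule-based fallback does not flag this pair
    # because the message never mentions PostgreSQL.
    new_msg = {'content': '我们决定改用 MySQL', 'role': 'user'}
    conflicts = detector.detect(new_msg, active_notes)
    assert len(conflicts) == 1
    assert conflicts[0].old_content == '项目使用 PostgreSQL'

    detector.reconcile(sem_mem, conflicts, new_msg, 'conv_1')

    # The old note should be marked superseded and no longer appear among active notes
    active = sem_mem.get_active_notes('conv_1')
    assert not any('PostgreSQL' in n['content'] for n in active)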

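📦 New dependencies

# requirements.txt additions (v3.0.0; all optional, fallbacks are implemented)
# Feature 4 contradiction detection (optional; without it the detector falls back to rule-based checks)
sentence-transformers>=3.0.0
torch>=2.0.0

Features 1, 2, and 3 add no new dependencies; they use only the standard library and the project's existing dependencies.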
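
Since the NLI dependency is optional, a deployment can check for it up front without importing the heavy packages. The sketch below uses only the standard library; the function name `nli_available` and the `mode` variable are illustrative, not part of the proposal.

```python
import importlib.util


def nli_available() -> bool:
    """True if sentence-transformers is importable in this environment.
    find_spec locates the package without executing its import."""
    return importlib.util.find_spec("sentence_transformers") is not None


# Pick the detection mode once at startup and log it.
mode = "nli" if nli_available() else "rules"
print(f"Conflict detection mode: {mode}")
```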


🗂️ File change list

File | Action | Feature
src/database.py | Modify: add touch_message, get_messages_with_dynamic_score, _compute_retention, migrate_v26 | F1
src/incremental_compressor.py | Modify: sort compression candidates by dynamic score; integrate contradiction detection | F1, F4
src/pipeline/event_segmenter.py | New | F2
src/dag_compressor.py | Modify: compact_to_leaves uses EventSegmenter | F2
src/agent_tools.py | Modify: lobster_grep calls touch_message on hits | F1
src/semantic_memory.py | New | F3
src/pipeline/conflict_detector.py | New | F4
tests/test_v260.py | New | F1, F2
tests/test_v300.py | New | F3, F4
requirements.txt | Modify: add optional dependencies | F4

This issue was generated automatically by Perplexity AI (Claude Sonnet 4.6) based on the following papers

  • EM-LLM: Human-inspired Episodic Memory for Infinite Context LLMs (ICLR 2025)
  • HiMem: Hierarchical Long-Term Memory for LLM Long-Horizon Agents (arXiv 2601.06377)
  • FOREVER: Forgetting Curve-Inspired Memory Replay (arXiv 2601.03938)
  • Graph-based Agent Memory Survey (arXiv 2602.05665)
  • Governing Evolving Memory in LLM Agents (arXiv 2603.11768)
  • Cognitive Load Limits in Large Language Models (arXiv 2509.19517)

All code is ready to use as-is. Implementation details are welcome in the comments.
