feat(trace): add request-level trace metrics and API support#640

Merged
MaojiaSheng merged 1 commit into main from feat/request-trace-metrics on Mar 15, 2026

Conversation

@zhoujh01 (Collaborator) commented Mar 15, 2026

Operation-Level Telemetry Design

1. Background and Goals

OpenViking needs a unified telemetry mechanism to describe the key observations produced while an operation executes. What has landed so far is operation-level telemetry, which mainly covers:

  • request latency statistics
  • token consumption statistics
  • summary metrics for stages such as retrieval, queuing, and memory extraction

We deliberately use the term telemetry rather than trace, because this abstraction will eventually serve more than the "single operation call chain"; it must also carry non-operation-level data, for example:

  • service-wide token consumption
  • latency and error rates of the various backend capabilities
  • component-level metrics for storage, the vector store, queues, and so on
  • exporter / backend integration based on OpenTelemetry

The current implementation exposes a formal interface only for operation-level telemetry, but the naming and structure of the abstraction already leave room for these extensions.

2. Design Principles

2.1 Detailed information is returned explicitly, on demand

Detailed telemetry is requested explicitly by the caller through the telemetry parameter. The current external protocol returns only a structured summary, never an event stream.

2.2 Field names face the user directly

Internal metric names and external summary field names are kept identical, avoiding an extra "internal name -> external name" translation layer.

2.3 Missing groups are omitted

If a class of operation naturally never produces a given summary group, that group is omitted entirely; we do not return an empty object or all-null fields.

For example:

  • resources.add_resource does not necessarily have memory
  • session.commit usually has no semantic_nodes
  • operations without vector retrieval do not return vector

3. Current Scope

3.1 HTTP endpoints

Endpoints currently wired up to operation telemetry:

  • POST /api/v1/search/find
  • POST /api/v1/search/search
  • POST /api/v1/resources
  • POST /api/v1/skills
  • POST /api/v1/sessions/{session_id}/commit

Notes:

  • session.commit returns telemetry only in the synchronous wait=true mode
  • the asynchronous task mode (wait=false) does not support telemetry yet; such requests return INVALID_ARGUMENT

3.2 SDK methods

SDK methods currently wired up to operation telemetry:

  • add_resource
  • add_skill
  • find
  • search
  • commit_session

The local embedded client and the HTTP client follow the same telemetry request semantics.

4. Response Model

The server keeps its unified response envelope:

{
  "status": "ok",
  "result": { "...": "..." },
  "time": 0.031,
  "telemetry": {
    "id": "tm_9f6f4d6b0d0c4f4d93ce5adf82e71c18",
    "summary": {
      "operation": "search.find",
      "status": "ok",
      "duration_ms": 31.224,
      "tokens": {
        "total": 24,
        "llm": {
          "input": 12,
          "output": 6,
          "total": 18
        },
        "embedding": {
          "total": 6
        }
      },
      "vector": {
        "searches": 3,
        "scored": 26,
        "passed": 8,
        "returned": 5,
        "scanned": 26,
        "scan_reason": ""
      }
    }
  }
}

Notes:

  • telemetry is returned only when the caller explicitly requests it
  • telemetry.id is an opaque identifier used purely for correlation; callers are not expected to parse meaning out of it
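
As an illustration of consuming this envelope, here is a minimal caller-side sketch; resp stands for the parsed JSON body shown above, and the helper name is hypothetical:

from typing import Optional

def read_telemetry_summary(resp: dict) -> Optional[dict]:
    telemetry = resp.get("telemetry")
    if telemetry is None:
        # telemetry was not requested, or the server omitted it
        return None
    # telemetry["id"] is opaque: keep it for correlation, never parse it
    summary = telemetry.get("summary", {})
    print(f'telemetry id={telemetry["id"]} duration_ms={summary.get("duration_ms")}')
    return summary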

5. telemetry Request Semantics

The telemetry field accepts two forms:

5.1 Boolean form

{
  "telemetry": true
}

Semantics:

  • returns telemetry.id + telemetry.summary

5.2 Object form

{
  "telemetry": {
    "summary": true
  }
}

Semantics:

  • summary defaults to true
  • suited to callers who only want the structured summary

The currently supported combinations:

Request value        Semantics
false                no telemetry returned
true                 returns id + summary
{"summary": true}    returns id + summary
{"summary": false}   no telemetry returned

The following request is invalid:

{
  "telemetry": {
    "events": true
  }
}

The reason is that the external telemetry protocol has been narrowed to summary-only; event-stream selection parameters are no longer accepted.
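
The bool | object normalization and rejection described above can be sketched as follows. TelemetrySelection is a real type in the codebase (see Section 10), but its fields and this helper's name and signature are assumptions made for illustration:

from dataclasses import dataclass
from typing import Optional, Union

@dataclass(frozen=True)
class TelemetrySelection:
    # Assumed shape: the protocol is summary-only, so one flag suffices.
    summary: bool = True

def parse_telemetry_param(value: Union[bool, dict, None]) -> Optional[TelemetrySelection]:
    # None / false -> no telemetry in the response
    if value is None or value is False:
        return None
    # true -> id + summary
    if value is True:
        return TelemetrySelection(summary=True)
    if isinstance(value, dict):
        unknown = set(value) - {"summary"}
        if unknown:
            # e.g. {"events": true}: summary-only protocol, mapped to INVALID_ARGUMENT
            raise ValueError(f"unsupported telemetry fields: {sorted(unknown)}")
        return TelemetrySelection(summary=True) if value.get("summary", True) else None
    raise ValueError("telemetry must be a bool or an object")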

6. Responsibilities Within telemetry

6.1 telemetry.summary

summary is the structured operation digest, used for:

  • debugging
  • troubleshooting
  • offline analysis
  • reporting to external observability systems

The core summary fields currently are:

  • operation
  • status
  • duration_ms
  • tokens
  • queue
  • vector
  • semantic_nodes
  • memory
  • errors

Of these:

  • tokens is always present
  • the remaining groups are returned only when the operation actually produced them

6.2 telemetry.id

telemetry.id is a request-level correlation identifier; it ties an operation's summary to the statistics of its internal asynchronous pipeline.

7. summary Field Conventions

7.1 Common top-level fields

Every summary contains at least:

  • operation
  • status
  • duration_ms
  • tokens

7.2 tokens

Example:

{
  "tokens": {
    "total": 19,
    "llm": {
      "input": 11,
      "output": 7,
      "total": 18
    },
    "embedding": {
      "total": 1
    }
  }
}

Notes:

  • llm tracks input, output, and total counts
  • embedding currently tracks only the total
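
Both examples in this document satisfy total == llm.total + embedding.total; assuming that is the intended invariant, the group can be assembled as in this hedged sketch (the helper name is hypothetical):

def build_tokens_group(llm_input: int, llm_output: int, embedding_total: int) -> dict:
    # Top-level total is the sum of the llm and embedding subtotals.
    llm_total = llm_input + llm_output
    return {
        "total": llm_total + embedding_total,
        "llm": {"input": llm_input, "output": llm_output, "total": llm_total},
        "embedding": {"total": embedding_total},
    }

# build_tokens_group(11, 7, 1) reproduces the Section 7.2 example (total == 19).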

7.3 queue

Queue summary example:

{
  "queue": {
    "semantic": {
      "processed": 1,
      "error_count": 0
    },
    "embedding": {
      "processed": 1,
      "error_count": 0
    }
  }
}

7.4 vector

Vector retrieval summary example:

{
  "vector": {
    "searches": 2,
    "scored": 5,
    "passed": 3,
    "returned": 2,
    "scanned": 5,
    "scan_reason": ""
  }
}

7.5 semantic_nodes

Semantic retrieval DAG / node-level summary example:

{
  "semantic_nodes": {
    "total": 4,
    "done": 3,
    "pending": 1,
    "running": 0
  }
}

7.6 memory

Example for memory-extraction operations such as session commit:

{
  "memory": {
    "extracted": 4
  }
}

7.7 errors

When an error occurs, the summary may include:

{
  "errors": {
    "stage": "resource_processor.parse",
    "error_code": "PROCESSING_ERROR",
    "message": "..."
  }
}

When there is no error, this group can be omitted.

8. Pruning Strategy for Missing Fields

summary is pruned per group, rather than always returning the full fixed schema.

This has several direct benefits:

  • avoids returning a pile of empty fields irrelevant to the current operation
  • lowers the caller's cost of understanding the payload
  • leaves room for adding new telemetry groups later

Examples:

8.1 resources.add_resource

May return:

{
  "operation": "resources.add_resource",
  "status": "ok",
  "duration_ms": 152.3,
  "tokens": { "...": "..." },
  "semantic_nodes": { "...": "..." },
  "queue": { "...": "..." }
}

memory should not be forced into this response.

8.2 session.commit

May return:

{
  "operation": "session.commit",
  "status": "ok",
  "duration_ms": 48.1,
  "tokens": { "...": "..." },
  "memory": {
    "extracted": 4
  }
}

semantic_nodes should not be forced into this response.
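
A minimal sketch of this per-group pruning at summary assembly time; the group names follow Section 7, while the helper and its inputs are assumptions:

def build_summary(operation: str, status: str, duration_ms: float, groups: dict) -> dict:
    summary = {
        "operation": operation,
        "status": status,
        "duration_ms": duration_ms,
        "tokens": groups.get("tokens") or {},  # tokens is always present
    }
    for name in ("queue", "vector", "semantic_nodes", "memory", "errors"):
        value = groups.get(name)
        if value:  # omit missing/empty groups instead of emitting nulls
            summary[name] = value
    return summary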

9. Cost Model

The current collector gathers only the data the summary needs:

  • collects counters / gauges
  • records error state
  • builds the final summary
  • keeps no event list
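
A summary-only collector consistent with this cost model might look like the following sketch; OperationTelemetry is the real class name (Section 10), but every method and field here is an assumption:

import time
from typing import Dict, Optional

class OperationTelemetry:
    def __init__(self, operation: str) -> None:
        self.operation = operation
        self._started = time.perf_counter()
        self._counters: Dict[str, int] = {}
        self._gauges: Dict[str, float] = {}
        self._error: Optional[dict] = None  # no event list is retained

    def incr(self, name: str, value: int = 1) -> None:
        self._counters[name] = self._counters.get(name, 0) + value

    def set_gauge(self, name: str, value: float) -> None:
        self._gauges[name] = value

    def record_error(self, stage: str, code: str, message: str) -> None:
        self._error = {"stage": stage, "error_code": code, "message": message}

    def finish(self) -> dict:
        duration_ms = (time.perf_counter() - self._started) * 1000.0
        summary = {
            "operation": self.operation,
            "status": "error" if self._error else "ok",
            "duration_ms": round(duration_ms, 3),
        }
        if self._error:
            summary["errors"] = self._error  # omitted when there is no error
        return summary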

10. Implementation Structure

10.1 Core types

The core implementation lives in:

  • openviking/telemetry/operation.py
  • openviking/telemetry/request.py
  • openviking/telemetry/context.py
  • openviking/telemetry/registry.py

The main objects are:

  • OperationTelemetry
  • TelemetrySnapshot
  • TelemetrySelection

10.2 Request parsing

openviking/telemetry/request.py is the single place that parses the telemetry request parameter:

  • accepts bool | object
  • normalizes it into a TelemetrySelection
  • rejects illegal fields such as events

This way the server, the local client, and the HTTP client all share one set of semantics.

10.3 Server integration

openviking/server/telemetry.py is responsible for:

  • creating the collector for a request
  • deciding, based on the selection, whether to attach the summary

The router layer's job is to:

  1. create the collector
  2. bind the operation context
  3. run the actual business logic
  4. return telemetry as requested (see the sketch after this list)
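
Putting the four steps together, a hedged router-level sketch that reuses the parse_telemetry_param and OperationTelemetry sketches above; bind_operation, run_find, and new_telemetry_id are hypothetical helpers:

async def find_handler(body: dict) -> dict:
    selection = parse_telemetry_param(body.get("telemetry"))
    collector = OperationTelemetry("search.find")      # 1. create the collector
    with bind_operation(collector):                    # 2. bind the operation context
        result = await run_find(body)                  # 3. actual business logic
    response = {"status": "ok", "result": result}
    if selection is not None:                          # 4. attach telemetry on request
        response["telemetry"] = {"id": new_telemetry_id(), "summary": collector.finish()}
    return response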

10.4 Local and HTTP clients

The local client and the HTTP client expose identical telemetry parameter semantics:

await client.find("memory dedup", telemetry=True)
await client.find("memory dedup", telemetry={"summary": True})

Here:

  • the local client produces telemetry locally and splices it back into the result
  • the HTTP client validates the parameter and passes it through to the server

11. Asynchronous Pipelines and Cross-Component Aggregation

Operation telemetry covers more than the synchronous request stack; it also supports flowing data back from parts of the asynchronous processing pipeline.

Typical scenarios include:

  • the request thread triggering semantic queue processing
  • the request thread triggering embedding processing
  • background worker threads continuing to record metrics into the same operation collector

The mechanism is:

  • the collector generates a telemetry.id
  • follow-up messages carry that id
  • background components look the original collector back up through the registry
  • the collector is re-bound in the new execution context

As a result, the final summary of one operation can cover:

  • the request entry logic
  • the retrieval process
  • embedding processing
  • semantic queue processing
  • the memory extraction result
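
The id-based re-binding can be sketched as below; the registry API is an assumption modeled on openviking/telemetry/registry.py, and a production version would also need locking and bounded eviction (see the review notes further down):

_collectors: dict = {}  # telemetry_id -> OperationTelemetry

def register(telemetry_id: str, collector: OperationTelemetry) -> None:
    _collectors[telemetry_id] = collector

def on_queue_message(message: dict) -> None:
    # Background worker side: find the originating collector by telemetry id.
    collector = _collectors.get(message.get("telemetry_id", ""))
    if collector is None:
        return  # the originating request did not ask for telemetry
    with bind_operation(collector):  # re-bind in the new execution context
        collector.incr("queue.semantic.processed")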

12. Relationship to OpenTelemetry

This design does not expose OpenTelemetry directly as the product interface; it first defines OpenViking's own telemetry abstraction.

The benefits:

  • callers see a stable, simple product interface
  • the business interface is not hard-bound to a specific observability framework
  • an OpenTelemetry backend can be added later without touching the existing SDK / HTTP semantics

Think of OpenTelemetry as one possible future implementation or export path, not as the current external protocol itself.

13. Future Directions

This document describes operation telemetry, but the abstraction must eventually accommodate broader telemetry sources.

Recommended directions:

  • service-level token consumption aggregation
  • per-call latency for storage, the vector store, and model services
  • queue throughput, failure rate, and backlog length
  • bridging to OpenTelemetry exporters
  • longer-term metric aggregation, sampling, and export

These extensions are not required to reuse the exact summary schema, but they should reuse the shared telemetry abstraction and runtime.

14. Usage Examples

14.1 Return the telemetry summary

curl -X POST http://localhost:8080/api/v1/search/find \
  -H 'Content-Type: application/json' \
  -d '{
    "query": "memory dedup",
    "limit": 5,
    "telemetry": true
  }'

14.2 Return only the summary

curl -X POST http://localhost:8080/api/v1/search/find \
  -H 'Content-Type: application/json' \
  -d '{
    "query": "memory dedup",
    "limit": 5,
    "telemetry": {
      "summary": true
    }
  }'

14.3 Python SDK

result = await client.find("memory dedup", telemetry={"summary": True})

print(result.telemetry["summary"]["tokens"]["total"])

15. Integration Rules for New Endpoints

New endpoints that need operation telemetry should follow these rules:

  1. Create an OperationTelemetry collector for the operation.
  2. Bind it to a context that covers the whole operation lifecycle.
  3. Record counters, gauges, and error state at the key internal stages.
  4. Return telemetry only when the caller requests it.
  5. Return only the groups the operation actually produced in summary.

This keeps the default cost low while giving callers a stable, analyzable structured digest.

@github-actions

PR Reviewer Guide 🔍

Here are some key observations to aid the review process:

⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
🧪 PR contains tests
🔒 No security concerns identified
⚡ Recommended focus areas for review

Premature Stats Eviction

The request stats order list contains duplicate telemetry IDs, which can lead to evicting stats for a telemetry ID that still has pending operations.

def _merge_request_stats(
    cls, telemetry_id: str, processed: int = 0, error_count: int = 0
) -> None:
    if not telemetry_id:
        return
    with cls._request_stats_lock:
        stats = cls._request_stats_by_telemetry_id.setdefault(telemetry_id, RequestQueueStats())
        stats.processed += processed
        stats.error_count += error_count
        cls._request_stats_order.append(telemetry_id)
        if len(cls._request_stats_order) > cls._max_cached_stats:
            old_telemetry_id = cls._request_stats_order.pop(0)
            if (
                old_telemetry_id != telemetry_id
                and old_telemetry_id in cls._request_stats_by_telemetry_id
            ):
                cls._request_stats_by_telemetry_id.pop(old_telemetry_id, None)
Premature Stats Eviction

The DAG stats order list contains duplicate entries, which can lead to evicting stats for a telemetry ID/URI that still has pending operations.

@classmethod
def _cache_dag_stats(cls, telemetry_id: str, uri: str, stats: DagStats) -> None:
    with cls._stats_lock:
        if telemetry_id:
            cls._dag_stats_by_telemetry_id[telemetry_id] = stats
        cls._dag_stats_by_uri[uri] = stats
        cls._dag_stats_order.append((telemetry_id, uri))
        if len(cls._dag_stats_order) > cls._max_cached_stats:
            old_telemetry_id, old_uri = cls._dag_stats_order.pop(0)
            if old_telemetry_id:
                cls._dag_stats_by_telemetry_id.pop(old_telemetry_id, None)
            cls._dag_stats_by_uri.pop(old_uri, None)
API Response Type Change

When telemetry is present and the original result is None, the method returns a dict instead of None, which may break existing callers.

def _attach_telemetry(result: Any, response_data: Dict[str, Any]) -> Any:
    telemetry = response_data.get("telemetry")
    if telemetry is None:
        return result

    if result is None:
        payload: Dict[str, Any] = {}
        payload["telemetry"] = telemetry
        return payload

    if isinstance(result, dict):
        result["telemetry"] = telemetry
        return result

    return result
Unrelated Example Addition

The memory demo example seems unrelated to the telemetry feature and may not belong in this PR.

"""Memory skill test demo covering T1-T10 with real OpenViking pipeline.

This script runs real memory extraction/dedup/merge against OpenViking, prints
human-readable traces, and outputs pass/fail for each test case.

Usage:
  export OPENVIKING_CONFIG_FILE=ov.conf
  python examples/memory_test_demo.py --verbose
"""

from __future__ import annotations

import argparse
import hashlib
import json
import os
import shutil
from dataclasses import asdict, dataclass, field
from pathlib import Path
from typing import Any, Callable, Dict, Iterable, List, Optional, Set, Tuple

from openviking.message.part import TextPart
from openviking.session.memory_deduplicator import MemoryDeduplicator
from openviking.sync_client import SyncOpenViking


@dataclass
class Turn:
    session_key: str
    user_text: str
    assistant_text: str = "收到。"


@dataclass
class QueryCheck:
    query: str
    require_groups: List[List[str]] = field(default_factory=list)
    forbidden_groups: List[List[str]] = field(default_factory=list)
    min_hits: int = 1


@dataclass
class CaseSpec:
    case_id: str
    title: str
    turns: List[Turn]
    checks: List[QueryCheck]
    expected_categories: Set[str] = field(default_factory=set)
    expect_merge_action: bool = False
    expect_delete_action: bool = False
    expect_skip_decision: bool = False
    expect_none_decision: bool = False
    expect_no_new_memory: bool = False
    expect_merge_from_session_key: str = ""
    max_created_files: Optional[int] = None


@dataclass
class Hit:
    query: str
    uri: str
    abstract: str
    content: str
    score: Optional[float]
    target_uri: str


@dataclass
class DedupRecord:
    round_name: str
    source_session: str
    category: str
    decision: str
    candidate_abstract: str
    actions: List[Dict[str, str]]


@dataclass
class CaseResult:
    case_id: str
    title: str
    passed: bool
    reasons: List[str]
    created: List[str]
    deleted: List[str]
    changed: List[str]


class DedupRecorder:
    """Collect runtime dedup decisions without modifying production modules."""

    def __init__(self) -> None:
        self.records: List[DedupRecord] = []
        self.current_round: str = ""
        self._original: Optional[Callable[..., Any]] = None

    def install(self) -> None:
        if self._original is not None:
            return

        self._original = MemoryDeduplicator.deduplicate
        recorder = self

        async def _wrapped(self_dedup, candidate):
            result = await recorder._original(self_dedup, candidate)
            recorder.records.append(
                DedupRecord(
                    round_name=recorder.current_round,
                    source_session=candidate.source_session,
                    category=candidate.category.value,
                    decision=result.decision.value,
                    candidate_abstract=candidate.abstract,
                    actions=[
                        {
                            "decision": action.decision.value,
                            "uri": action.memory.uri,
                            "reason": action.reason,
                        }
                        for action in (result.actions or [])
                    ],
                )
            )
            return result

        MemoryDeduplicator.deduplicate = _wrapped

    def uninstall(self) -> None:
        if self._original is not None:
            MemoryDeduplicator.deduplicate = self._original
            self._original = None


def _print_section(title: str, body: str = "") -> None:
    print("\n" + "=" * 90)
    print(title)
    if body:
        print("-" * 90)
        print(body)


def _safe_list(items: Iterable[Any]) -> List[Any]:
    try:
        return list(items)
    except Exception:
        return []


def _safe_float(value: Any) -> Optional[float]:
    try:
        return float(value)
    except Exception:
        return None


def _hash_text(text: str) -> str:
    return hashlib.sha1(text.encode("utf-8", errors="ignore")).hexdigest()


def _collect_memory_snapshot(client: SyncOpenViking) -> Dict[str, str]:
    """Snapshot all memory files as uri -> content hash."""
    snapshot: Dict[str, str] = {}
    for root in ["viking://user/memories", "viking://agent/memories"]:
        try:
            entries = client.ls(root, recursive=True, simple=False)
        except Exception:
            continue

        for item in entries:
            if item.get("isDir"):
                continue
            uri = str(item.get("uri", ""))
            if not uri.endswith(".md"):
                continue
            if "/." in uri:
                continue
            try:
                content = client.read(uri)
                snapshot[uri] = _hash_text(content)
            except Exception:
                snapshot[uri] = "<read-failed>"
    return snapshot


def _snapshot_diff(
    before: Dict[str, str],
    after: Dict[str, str],
) -> Tuple[List[str], List[str], List[str]]:
    created = sorted(set(after) - set(before))
    deleted = sorted(set(before) - set(after))
    changed = sorted(uri for uri in (set(before) & set(after)) if before[uri] != after[uri])
    return created, deleted, changed


def _search_hits(client: SyncOpenViking, query: str, limit: int) -> List[Hit]:
    """Search both user/agent memory roots and merge hits by uri."""
    merged: Dict[str, Hit] = {}
    for target_uri in ["viking://user/memories", "viking://agent/memories"]:
        try:
            result = client.find(query, target_uri=target_uri, limit=limit)
        except Exception:
            continue

        for mem in _safe_list(getattr(result, "memories", [])):
            uri = getattr(mem, "uri", "") or ""
            if not uri:
                continue
            hit = Hit(
                query=query,
                uri=uri,
                abstract=getattr(mem, "abstract", "") or "",
                content="",
                score=_safe_float(getattr(mem, "score", None)),
                target_uri=target_uri,
            )
            try:
                hit.content = client.read(uri)
            except Exception:
                hit.content = ""
            old = merged.get(uri)
            if old is None or (hit.score or -1.0) > (old.score or -1.0):
                merged[uri] = hit

    return sorted(merged.values(), key=lambda item: item.score or -1.0, reverse=True)


def _format_hits(hits: List[Hit], max_items: int = 8) -> str:
    if not hits:
        return "(no hit)"
    lines: List[str] = []
    for idx, hit in enumerate(hits[:max_items], 1):
        score_text = "n/a" if hit.score is None else f"{hit.score:.4f}"
        content_preview = hit.content.replace("\n", " ").strip()
        if len(content_preview) > 120:
            content_preview = content_preview[:117] + "..."
        lines.append(
            f"{idx}. score={score_text} | {hit.abstract} | {hit.uri}\n   content={content_preview}"
        )
    return "\n".join(lines)


def _joined_hit_texts(hits: List[Hit]) -> List[str]:
    return [f"{hit.abstract} {hit.content} {hit.uri}".lower() for hit in hits]


def _group_satisfied_anywhere(group: List[str], texts: List[str]) -> bool:
    if not group:
        return True
    options = [opt.lower() for opt in group]
    return any(any(opt in text for text in texts) for opt in options)


def _group_fully_matched_in_single_hit(group: List[str], texts: List[str]) -> bool:
    if not group:
        return False
    options = [opt.lower() for opt in group]
    for text in texts:
        if all(opt in text for opt in options):
            return True
    return False


def _evaluate_query_check(check: QueryCheck, hits: List[Hit]) -> List[str]:
    reasons: List[str] = []
    texts = _joined_hit_texts(hits)

    if len(hits) < check.min_hits:
        reasons.append(f"query '{check.query}' hit count {len(hits)} < expected {check.min_hits}")

    for group in check.require_groups:
        if not _group_satisfied_anywhere(group, texts):
            reasons.append(f"query '{check.query}' missing required group: {' | '.join(group)}")

    for group in check.forbidden_groups:
        if _group_fully_matched_in_single_hit(group, texts):
            reasons.append(f"query '{check.query}' matched forbidden group: {' + '.join(group)}")
    return reasons


def _format_records(records: List[DedupRecord]) -> str:
    if not records:
        return "(no dedup record in this case)"
    lines: List[str] = []
    for idx, rec in enumerate(records, 1):
        lines.append(
            f"{idx}. session={rec.source_session} category={rec.category} "
            f"decision={rec.decision} abstract={rec.candidate_abstract}"
        )
        for action in rec.actions:
            lines.append(f"   - action={action['decision']} uri={action['uri']}")
    return "\n".join(lines)


def _build_cases() -> List[CaseSpec]:
    return [
        CaseSpec(
            case_id="T1",
            title="Profile - Basic Identity",
            turns=[
                Turn(
                    session_key="profile",
                    user_text="我叫张明,在字节跳动做后端开发,base北京。",
                )
            ],
            checks=[
                QueryCheck(
                    query="张明是谁",
                    require_groups=[["张明"], ["字节跳动"], ["后端"], ["北京"]],
                )
            ],
        ),
        CaseSpec(
            case_id="T2",
            title="Profile - Incremental Update (Merge)",
            turns=[
                Turn(session_key="profile", user_text="我叫张明,做后端开发。"),
                Turn(session_key="profile", user_text="最近转岗了,现在做 infra。"),
            ],
            checks=[
                QueryCheck(
                    query="张明做什么工作",
                    require_groups=[["张明"], ["infra", "基础设施", "基础架构"]],
                )
            ],
            max_created_files=0,
        ),
        CaseSpec(
            case_id="T3",
            title="Preferences",
            turns=[
                Turn(
                    session_key="prefs",
                    user_text=(
                        "写代码的时候我习惯用 vim + tmux,不喜欢 IDE。"
                        "回复我的时候用中文就好,技术术语保持英文。"
                    ),
                )
            ],
            checks=[
                QueryCheck(
                    query="用户开发工具偏好",
                    require_groups=[["vim"], ["tmux"], ["ide"]],
                ),
                QueryCheck(
                    query="回复语言偏好",
                    require_groups=[["中文"], ["english", "英文"]],
                ),
            ],
        ),
        CaseSpec(
            case_id="T4",
            title="Entities - People and Projects",
            turns=[
                Turn(
                    session_key="entities",
                    user_text=(
                        "我们组的 tech lead 是 Kevin,他主推用 Go 重写网关。"
                        "目前在做 Project Atlas,是一个内部 API 网关平台。"
                    ),
                )
            ],
            checks=[
                QueryCheck(
                    query="Kevin",
                    require_groups=[["kevin"], ["tech lead", "技术负责人"], ["go"]],
                ),
                QueryCheck(
                    query="Project Atlas",
                    require_groups=[["atlas"], ["api"], ["网关", "gateway"]],
                ),
            ],
        ),
        CaseSpec(
            case_id="T5",
            title="Events - Decision Point",
            turns=[
                Turn(
                    session_key="events",
                    user_text=(
                        "今天和老板聊了,决定放弃 Python 方案,全面转 Go。"
                        "主要原因是性能瓶颈和团队技术栈统一。"
                    ),
                )
            ],
            checks=[
                QueryCheck(
                    query="为什么选 Go",
                    require_groups=[
                        ["go"],
                        ["python"],
                        ["性能", "performance"],
                        ["技术栈", "stack"],
                    ],
                )
            ],
        ),
        CaseSpec(
            case_id="T6",
            title="Cases - Problem to Solution",
            turns=[
                Turn(
                    session_key="cases",
                    user_text=(
                        "我们的 gRPC 服务偶尔出现 deadline exceeded,大概每天几十次。"
                        "查了 trace 发现是下游 Redis 偶尔 latency spike。"
                        "试了连接池调大没用,最后发现是 Redis cluster 有个慢节点。"
                        "把那个节点摘掉换了新实例就好了。"
                    ),
                )
            ],
            checks=[
                QueryCheck(
                    query="gRPC deadline exceeded 怎么解决",
                    require_groups=[
                        ["grpc"],
                        ["deadline exceeded"],
                        ["redis"],
                        ["慢节点", "slow node"],
                        ["替换", "换了", "replace", "摘掉", "摘除", "更换", "新实例"],
                    ],
                )
            ],
        ),
        CaseSpec(
            case_id="T7",
            title="Patterns - Reusable Practice",
            turns=[
                Turn(
                    session_key="patterns",
                    user_text=(
                        "我发现做 code review 有个好办法。"
                        "先看测试理解意图,再看 diff,最后跑一遍确认。"
                        "这样比直接看 diff 效率高很多,漏的也少。"
                    ),
                )
            ],
            checks=[
                QueryCheck(
                    query="code review 方法",
                    require_groups=[
                        ["code review", "代码评审"],
                        ["测试", "test"],
                        ["diff"],
                        ["运行", "确认", "run"],
                    ],
                )
            ],
        ),
        CaseSpec(
            case_id="T8",
            title="Patterns - Merge Existing Across Sessions",
            turns=[
                Turn(session_key="t8_a", user_text="部署前一定要先跑 smoke test。"),
                Turn(
                    session_key="t8_b",
                    user_text="部署前除了 smoke test,还要检查 config diff。",
                ),
            ],
            checks=[
                QueryCheck(
                    query="部署前检查",
                    require_groups=[["smoke"], ["config diff", "配置", "config"]],
                )
            ],
            expect_merge_action=True,
            expect_none_decision=True,
            expect_merge_from_session_key="t8_b",
        ),
        CaseSpec(
            case_id="T9",
            title="Complex Multi-Round - Mixed Categories",
            turns=[
                Turn(
                    session_key="mixed",
                    user_text=(
                        "我在做一个 RAG 系统的 chunk 策略优化。"
                        "现在用的是固定 512 token 切分,效果不好。"
                        "我试了 semantic chunking,用 embedding similarity 找分割点。"
                        "同事 Lisa 建议试 late chunking,她在另一个项目上效果不错。"
                        "最后我们决定用 semantic chunking + overlap 50 token 的方案。"
                        "关键 insight 是:chunk boundary 要对齐语义边界,不能硬切。"
                        "以后做 RAG 都应该先评估 chunk 质量再调 retrieval。"
                    ),
                )
            ],
            checks=[
                QueryCheck(
                    query="RAG chunking 怎么做",
                    require_groups=[
                        [
                            "semantic",
                            "semantic chunking",
                            "语义",
                            "语义切分",
                            "embedding",
                            "相似度",
                        ],
                        ["overlap", "50"],
                        ["512"],
                    ],
                ),
                QueryCheck(
                    query="Lisa",
                    require_groups=[["lisa"], ["late chunking", "chunking"]],
                ),
                QueryCheck(
                    query="chunk 优化经验",
                    require_groups=[
                        ["chunk"],
                        ["质量", "quality", "边界", "semantic"],
                        ["retrieval", "检索", "调优", "优化流程"],
                    ],
                ),
            ],
            expected_categories=set(),
        ),
        CaseSpec(
            case_id="T10",
            title="Noise Resistance - Should Not Store",
            turns=[
                Turn(
                    session_key="noise",
                    user_text="今天天气不错。帮我写个 hello world。谢谢,挺好的。",
                )
            ],
            checks=[
                QueryCheck(
                    query="天气",
                    forbidden_groups=[["天气"]],
                    min_hits=0,
                ),
                QueryCheck(
                    query="hello world",
                    forbidden_groups=[["hello", "world"]],
                    min_hits=0,
                ),
            ],
        ),
    ]


def _get_or_create_session(
    client: SyncOpenViking,
    cache: Dict[str, Any],
    key: str,
) -> Any:
    sess = cache.get(key)
    if sess is not None:
        return sess
    session_id = client.create_session()["session_id"]
    sess = client.session(session_id)
    cache[key] = sess
    cache[f"__id__{key}"] = session_id
    return sess


def _session_id(cache: Dict[str, Any], key: str) -> str:
    return str(cache.get(f"__id__{key}", ""))


def _evaluate_case(
    case: CaseSpec,
    hits_by_query: Dict[str, List[Hit]],
    records: List[DedupRecord],
    created: List[str],
    deleted: List[str],
    changed: List[str],
    session_cache: Dict[str, Any],
) -> List[str]:
    reasons: List[str] = []

    for check in case.checks:
        reasons.extend(_evaluate_query_check(check, hits_by_query.get(check.query, [])))

    if case.expected_categories:
        observed_categories = {record.category for record in records}
        missing_categories = sorted(case.expected_categories - observed_categories)
        if missing_categories:
            reasons.append(
                "missing expected categories in dedup records: " + ", ".join(missing_categories)
            )

    if case.expect_merge_action and not any(
        action.get("decision") == "merge" for record in records for action in record.actions
    ):
        reasons.append("expected merge action, but none observed")

    if case.expect_delete_action and not any(
        action.get("decision") == "delete" for record in records for action in record.actions
    ):
        reasons.append("expected delete action, but none observed")

    if case.expect_skip_decision and not any(record.decision == "skip" for record in records):
        reasons.append("expected decision=skip, but not observed")

    if case.expect_none_decision and not any(record.decision == "none" for record in records):
        reasons.append("expected decision=none, but not observed")

    if case.expect_merge_from_session_key:
        expected_sid = _session_id(session_cache, case.expect_merge_from_session_key)
        if expected_sid:
            merge_from_expected = any(
                record.source_session == expected_sid
                and any(action.get("decision") == "merge" for action in record.actions)
                for record in records
            )
            if not merge_from_expected:
                reasons.append(
                    "expected merge from session "
                    + case.expect_merge_from_session_key
                    + ", but not observed"
                )

    if case.expect_no_new_memory and (created or deleted or changed):
        reasons.append(
            "expected no memory mutation, but snapshot changed "
            + f"(created={len(created)} deleted={len(deleted)} changed={len(changed)})"
        )

    if case.max_created_files is not None and len(created) > case.max_created_files:
        reasons.append(f"created file count {len(created)} exceeds max {case.max_created_files}")

    return reasons


def _decision_coverage(records: List[DedupRecord]) -> Dict[str, bool]:
    return {
        "merge_action": any(
            action.get("decision") == "merge" for record in records for action in record.actions
        ),
        "delete_action": any(
            action.get("decision") == "delete" for record in records for action in record.actions
        ),
        "decision_none": any(record.decision == "none" for record in records),
        "decision_skip": any(record.decision == "skip" for record in records),
    }


def main() -> int:
    parser = argparse.ArgumentParser(description="OpenViking memory skill T1-T10 test demo")
    parser.add_argument(
        "--path",
        default="./ov_data_memory_test_demo",
        help="Demo storage path. This script clears it at startup.",
    )
    parser.add_argument(
        "--wait-timeout",
        type=float,
        default=60.0,
        help="Queue wait timeout in seconds.",
    )
    parser.add_argument("--limit", type=int, default=8, help="Top-k retrieval limit per query.")
    parser.add_argument(
        "--json-report",
        default="",
        help="Optional output path for a JSON report.",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        default=True,
        help="Print per-case trace logs.",
    )
    args = parser.parse_args()

    if not os.environ.get("OPENVIKING_CONFIG_FILE"):
        repo_root = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
        cfg = os.path.join(repo_root, "ov.conf")
        if os.path.exists(cfg):
            os.environ["OPENVIKING_CONFIG_FILE"] = cfg

    data_path = Path(args.path)
    if data_path.exists():
        shutil.rmtree(data_path)
    data_path.mkdir(parents=True, exist_ok=True)

    recorder = DedupRecorder()
    recorder.install()

    client = SyncOpenViking(path=str(data_path))
    client.initialize()

    cases = _build_cases()
    session_cache: Dict[str, Any] = {}
    case_results: List[CaseResult] = []

    try:
        client.is_healthy()

        for case in cases:
            before_snapshot = _collect_memory_snapshot(client)
            record_start = len(recorder.records)
            commit_results: List[Dict[str, Any]] = []

            for turn in case.turns:
                session = _get_or_create_session(client, session_cache, turn.session_key)
                recorder.current_round = f"{case.case_id}:{case.title}"
                session.add_message("user", parts=[TextPart(text=turn.user_text)])
                session.add_message("assistant", parts=[TextPart(text=turn.assistant_text)])
                commit_results.append(session.commit())
                try:
                    client.wait_processed(timeout=args.wait_timeout)
                except Exception:
                    pass

            after_snapshot = _collect_memory_snapshot(client)
            created, deleted, changed = _snapshot_diff(before_snapshot, after_snapshot)
            case_records = recorder.records[record_start:]

            hits_by_query: Dict[str, List[Hit]] = {}
            for check in case.checks:
                hits_by_query[check.query] = _search_hits(client, check.query, args.limit)

            reasons = _evaluate_case(
                case,
                hits_by_query,
                case_records,
                created,
                deleted,
                changed,
                session_cache,
            )
            passed = len(reasons) == 0
            case_results.append(
                CaseResult(
                    case_id=case.case_id,
                    title=case.title,
                    passed=passed,
                    reasons=reasons,
                    created=created,
                    deleted=deleted,
                    changed=changed,
                )
            )

            if args.verbose:
                session_lines = [
                    f"session_key={turn.session_key} session_id={_session_id(session_cache, turn.session_key)}"
                    for turn in case.turns
                ]
                _print_section(
                    f"{case.case_id} {case.title} - commits",
                    body="\n".join(session_lines + [f"commit={item}" for item in commit_results]),
                )
                _print_section(f"{case.case_id} dedup trace", body=_format_records(case_records))
                _print_section(
                    f"{case.case_id} memory diff",
                    body="\n".join(
                        [f"created={len(created)} deleted={len(deleted)} changed={len(changed)}"]
                        + [f"+ {uri}" for uri in created]
                        + [f"- {uri}" for uri in deleted]
                        + [f"~ {uri}" for uri in changed]
                    ),
                )
                for query, hits in hits_by_query.items():
                    _print_section(f"{case.case_id} find: {query}", body=_format_hits(hits))
                _print_section(
                    f"{case.case_id} result: {'PASS' if passed else 'FAIL'}",
                    body=(
                        "All checks passed"
                        if passed
                        else "\n".join(f"- {item}" for item in reasons)
                    ),
                )

        passed_count = sum(1 for item in case_results if item.passed)
        failed_count = len(case_results) - passed_count
        coverage = _decision_coverage(recorder.records)

        summary_lines = [
            f"Total: {len(case_results)}",
            f"Passed: {passed_count}",
            f"Failed: {failed_count}",
            "",
            "Decision coverage:",
            f"- merge_action: {'YES' if coverage['merge_action'] else 'NO'}",
            f"- delete_action: {'YES' if coverage['delete_action'] else 'NO'}",
            f"- decision_none: {'YES' if coverage['decision_none'] else 'NO'}",
            f"- decision_skip: {'YES' if coverage['decision_skip'] else 'NO'}",
        ]

        failed_cases = [item for item in case_results if not item.passed]
        if failed_cases:
            summary_lines.append("")
            summary_lines.append("Failed cases:")
            for item in failed_cases:
                summary_lines.append(f"- {item.case_id} {item.title}")
                for reason in item.reasons:
                    summary_lines.append(f"  * {reason}")

        _print_section("Final Report", body="\n".join(summary_lines))

        if args.json_report:
            report = {
                "summary": {
                    "total": len(case_results),
                    "passed": passed_count,
                    "failed": failed_count,
                    "coverage": coverage,
                },
                "cases": [asdict(item) for item in case_results],
            }
            report_path = Path(args.json_report)
            report_path.parent.mkdir(parents=True, exist_ok=True)
            report_path.write_text(
                json.dumps(report, ensure_ascii=False, indent=2),
                encoding="utf-8",
            )
            _print_section("JSON report", body=str(report_path))

        return 0 if failed_count == 0 else 1
    finally:
        recorder.uninstall()
        try:
            client.close()
        except Exception:
            pass


if __name__ == "__main__":
    raise SystemExit(main())

@github-actions

PR Code Suggestions ✨

Explore these optional code suggestions:

Category: General
Suggestion: Use OrderedDict for LRU request stats tracking

Use an OrderedDict to track request stats order for true LRU eviction and eliminate
duplicates. This prevents the order list from being flooded with duplicate entries,
which could cause premature eviction of other telemetry IDs.

openviking/storage/collection_schemas.py [143-184]

+from collections import OrderedDict
+
 _request_stats_lock = threading.Lock()
 _request_stats_by_telemetry_id: Dict[str, RequestQueueStats] = {}
-_request_stats_order: List[str] = []
+_request_stats_order: OrderedDict[str, None] = OrderedDict()
 _max_cached_stats = 1024
 
 @classmethod
 def _merge_request_stats(
     cls, telemetry_id: str, processed: int = 0, error_count: int = 0
 ) -> None:
     if not telemetry_id:
         return
     with cls._request_stats_lock:
         stats = cls._request_stats_by_telemetry_id.setdefault(telemetry_id, RequestQueueStats())
         stats.processed += processed
         stats.error_count += error_count
-        cls._request_stats_order.append(telemetry_id)
+        # Update LRU order
+        if telemetry_id in cls._request_stats_order:
+            cls._request_stats_order.move_to_end(telemetry_id)
+        else:
+            cls._request_stats_order[telemetry_id] = None
+        # Evict oldest entry if needed
         if len(cls._request_stats_order) > cls._max_cached_stats:
-            old_telemetry_id = cls._request_stats_order.pop(0)
-            if (
-                old_telemetry_id != telemetry_id
-                and old_telemetry_id in cls._request_stats_by_telemetry_id
-            ):
-                cls._request_stats_by_telemetry_id.pop(old_telemetry_id, None)
+            old_telemetry_id, _ = cls._request_stats_order.popitem(last=False)
+            cls._request_stats_by_telemetry_id.pop(old_telemetry_id, None)
Suggestion importance[1-10]: 4


Why: The suggestion improves the request stats cache's LRU eviction logic by using OrderedDict, which eliminates duplicate entries in the order list and prevents premature eviction of other telemetry IDs. This is a valid maintainability/efficiency improvement but not critical for functionality.

Impact: Low

refactor: replace operation trace with telemetry

fix telemetry demo skill ingestion

simplify telemetry summary metric keys

rename remaining trace telemetry artifacts

feat: support configurable telemetry payloads

docs: rewrite operation telemetry design in chinese

fix: reject telemetry for async session commit

refactor: isolate telemetry orchestration

refactor: remove telemetry from find payloads

refactor: remove telemetry event payloads

fix(trace): keep only telemetry-related changes

fix(trace): remove top-level usage from telemetry responses

feat(console): default telemetry on proxied operations
@zhoujh01 force-pushed the feat/request-trace-metrics branch from 58f3d26 to cc22716 on March 15, 2026 at 14:28
@MaojiaSheng MaojiaSheng merged commit b280b56 into main Mar 15, 2026
6 checks passed
@MaojiaSheng MaojiaSheng deleted the feat/request-trace-metrics branch March 15, 2026 14:44
@github-project-automation github-project-automation bot moved this from Backlog to Done in OpenViking project Mar 15, 2026