Skip to content

fix(daemon): restore replay history after SSE ring eviction#4678

Closed
ytahdn wants to merge 1 commit into
QwenLM:daemon_mode_b_mainfrom
chiga0:feat/daemon-event-resync-recovery
Closed

fix(daemon): restore replay history after SSE ring eviction#4678
ytahdn wants to merge 1 commit into
QwenLM:daemon_mode_b_mainfrom
chiga0:feat/daemon-event-resync-recovery

Conversation

@ytahdn

@ytahdn ytahdn commented Jun 1, 2026

Copy link
Copy Markdown
Collaborator

概要

  • 变更内容:将完整会话历史恢复与有界 SSE ring buffer 解耦。loadSession 返回 replayEventslastEventId 用于重建页面历史;SSE ring 只负责短暂断线后的增量追赶。
  • 变更原因:长 daemon session 的事件数可能超过 SSE ring 容量。页面刷新或重连后,如果客户端从 Last-Event-ID: 0 订阅 SSE,服务端会因为早期事件已被 ring 淘汰而返回 ring_evicted,导致页面无法恢复完整 transcript。
  • 重点关注:请重点确认长会话刷新后能通过 HTTP replay log 恢复完整历史,resumeSession 不再返回完整 replay payload,以及 ring_evicted 后 WebUI 会重新走 load 恢复状态。

验证

  • 执行命令:
    cd packages/acp-bridge && npx vitest run src/eventBus.test.ts src/bridge.test.ts
    cd packages/sdk-typescript && npx vitest run test/unit/DaemonSessionClient.test.ts test/unit/daemonUi.test.ts
    cd packages/webui && npx vitest run src/daemon/session/DaemonSessionProvider.test.tsx src/daemon/session/transcriptToMessages.test.ts
    npm run build
  • 使用输入:无,当前改动属于 daemon/session 恢复链路,主要通过单元测试覆盖。
  • 预期结果:即使长会话历史已经超过 SSE ring 容量,页面仍能通过 loadSession 返回的 replay events 恢复完整历史,并从返回的 watermark 后继续接收实时增量。
  • 实际结果:相关测试全部通过,npm run build 成功完成。构建过程中仅出现已有的 VS Code companion lint warning 和 Browserslist 数据过期提示,没有错误。
  • Reviewer 最快验证方式:运行上面的测试命令,并重点查看 replayEventslastEventIdresumering_evicted 恢复路径。
  • 结果摘要:
    • packages/acp-bridge:254 个测试通过。
    • packages/sdk-typescript:220 个测试通过。
    • packages/webui:99 个测试通过。
    • 全量 npm run build 成功。

范围 / 风险

  • 主要风险或取舍:loadSession 现在会携带用于历史重建的 replay events,超长会话的 load 响应会更大;resumeSession 已避免返回这部分完整 replay payload。
  • 未覆盖内容:未录制浏览器手工验证视频;本次验证集中在 daemon bridge、SDK 和 WebUI session 相关测试。
  • 兼容性:新增字段是 optional,新 SDK 连接旧 daemon 时会降级为旧行为;新 SDK 连接新 daemon 时会使用 HTTP replay log + watermark,避免依赖有界 SSE ring 恢复完整历史。

测试矩阵

🍏 🪟 🐧
npm run ⚠️ ⚠️
npx ⚠️ ⚠️
Docker N/A N/A N/A
Podman N/A N/A N/A
Seatbelt N/A N/A N/A

测试矩阵说明:

  • 已在 macOS 本地运行相关 npx vitest 命令和 npm run build
  • Windows / Linux 未在本地运行。

关联 Issue / Bug

  • N/A

@github-actions

github-actions Bot commented Jun 1, 2026

Copy link
Copy Markdown
Contributor

📋 Review Summary

This PR implements a critical fix for daemon session recovery by separating full session replay recovery from the bounded SSE ring buffer. The changes introduce a persistent replay log (via ReplayStore) that allows complete transcript reconstruction even after the SSE ring has evicted older events. The implementation spans the acp-bridge, SDK, and WebUI layers with comprehensive test coverage.

🔍 General Feedback

  • Architecture: The separation of concerns between the bounded SSE ring (for short reconnect gaps) and the replay log (for full history) is well-designed and addresses the core risk of long sessions exceeding ring capacity.
  • Test Coverage: Excellent test additions across eventBus.test.ts, bridge.test.ts, DaemonSessionProvider.test.tsx, and DaemonSessionClient.test.ts covering replay seeding, ring eviction recovery, and edge cases.
  • Backward Compatibility: New fields (lastEventId, replayEvents) are optional, allowing older daemon/SDK combinations to fall back gracefully.
  • Code Quality: Generally solid implementation with good error handling and defensive coding patterns. The transcriptToMessages.ts refactoring improves subagent tracking robustness.

🎯 Specific Feedback

🔴 Critical

  • File: packages/acp-bridge/src/replayStore.ts:103-117 - The snapshot() method reads the entire JSONL file into memory on every call. For very long sessions, this could cause memory pressure. Consider streaming parsers or pagination for large files.

  • File: packages/webui/src/daemon/session/DaemonSessionProvider.tsx:399-445 - The replay seeding loop uses WeakSet for tracking seeded sessions, but WeakSet doesn't provide iteration capabilities and relies on GC for cleanup. If replaySeededSessionsRef.current grows unbounded during a long session with multiple reconnects, memory could accumulate. Consider using a Set with explicit cleanup on session disconnect.

  • File: packages/acp-bridge/src/bridge.ts:1700-1708 - The createSessionEventBus creates a FileReplayStore with deleteOnClose: true, but the cleanup at line 4542-4545 only runs when ownsReplayStoreRoot is true. If a custom replayStoreDir is provided, stale files could accumulate. The deleteSessionReplayCache method (line 2820) handles explicit deletion, but there's no guarantee it's called for all session termination paths.

🟡 High

  • File: packages/acp-bridge/src/replayStore.ts:158-188 - The flushWriteQueue uses synchronous stream writes with callback-based completion tracking. If the stream encounters backpressure repeatedly, the pendingEvents array could grow unbounded until drain occurs. The compaction logic (lines 221-237) helps but only triggers after 1024 events. Consider adding a secondary backpressure limit.

  • File: packages/webui/src/daemon/session/transcriptToMessages.ts:376-388 - The getFallbackActiveSubAgent function has complex logic for determining when to use positional fallback. The comment explains the rationale well, but the condition stack.length === 1 followed by checking top?.parentToolCallId could be simplified with clearer early returns to reduce cognitive load.

  • File: packages/sdk-typescript/src/daemon/DaemonSessionClient.ts:170-177 - The resume method discards replayEvents with _discarded but still receives them from the server. This wastes bandwidth for resume operations. Consider having the server omit replayEvents entirely for resume responses when the client signals it doesn't need them.

🟢 Medium

  • File: packages/acp-bridge/src/replayStore.ts:260-274 - The appendUniqueEvents function deduplicates by event ID, but this assumes all events have numeric IDs. Events without IDs (or with string IDs) would bypass deduplication. The type filter at line 267 helps, but consider adding a warning or assertion if duplicate non-numeric IDs are detected.

  • File: packages/webui/src/daemon/session/transcriptToMessages.ts:448-457 - The mergeToolCall function uses nullish coalescing (??) correctly, but the original code used || which would treat empty strings as falsy. This is an improvement, but verify that empty strings are indeed valid values that shouldn't be overwritten. For example, if source.content is "" but target.content has value, the empty string wins.

  • File: packages/acp-bridge/src/bridge.ts:627-629 - The workspacePathHash function uses SHA256 but only takes the first 16 hex characters. This is reasonable for directory naming, but consider documenting the collision risk (minimal but non-zero for very large numbers of workspaces).

🔵 Low

  • File: packages/acp-bridge/src/replayStore.ts:1 - Missing JSDoc header comment style that other files in the project have (license block is present, but no file-level documentation explaining purpose).

  • File: packages/webui/src/daemon/session/types.ts:88-100 - The DaemonModelInfo interface adds several optional fields (baseModelId, authType, modalities, isRuntime) that appear unrelated to the replay/recovery fix. Verify these belong in this PR or should be split.

  • File: packages/acp-bridge/src/bridge.ts:693 - The constant MCP_RESTART_TIMEOUT_MS has an extensive comment explaining the rationale (which is good), but the comment references specific PR numbers and reviewer names that may become stale. Consider linking to a permanent issue or design doc instead.

  • File: packages/webui/src/daemon/session/DaemonSessionProvider.tsx:437 - The replay seeding loop uses delay(0, abort.signal) every 100 events for responsiveness, but this is a magic number. Consider extracting to a constant like REPLAY_SEED_YIELD_INTERVAL = 100.

✅ Highlights

  • Excellent Test Coverage: The new tests in DaemonSessionProvider.test.tsx thoroughly cover replay seeding scenarios including edge cases like "keeps replay-seeded assistant streaming when no terminal replay event exists" and "recovers from ring_evicted followed by 410 via full reload."

  • Clean Separation of Concerns: The ReplayStore interface with InMemoryReplayStore and FileReplayStore implementations provides good abstraction. The fallback to in-memory on file write failures (lines 204-220 in replayStore.ts) ensures robustness.

  • Thoughtful Error Handling: The malformed JSONL handling in replayStore.ts:snapshot() (lines 103-117) gracefully skips bad lines without failing the entire session load, with appropriate stderr logging.

  • Improved Subagent Tracking: The refactoring in transcriptToMessages.ts with findSubAgentIndex, topLevelToolsByCallId map, and better permission handling significantly improves the robustness of subagent state management.

  • Proper Cleanup Paths: The cleanup of replay stores on session close (line 4542-4545 in bridge.ts) and batch deletion (lines 1895-1901 in server.ts) shows attention to resource management.

@ytahdn ytahdn force-pushed the feat/daemon-event-resync-recovery branch from 9c4141a to 67ffbbe Compare June 1, 2026 13:07
@ytahdn ytahdn force-pushed the feat/daemon-event-resync-recovery branch from 67ffbbe to 7e44574 Compare June 1, 2026 13:15

@doudouOUC doudouOUC left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

整体方向是对的:把完整历史恢复从有界 SSE ring 里拆出来,改成 loadSession 通过 HTTP replay log + watermark 返回,是解决长 daemon session 刷新后 ring eviction 的正确设计。但当前实现里 HTTP replay 与后续 SSE 订阅的语义还没有完全对齐,下面三个问题会导致 transcript 错误恢复,建议修完再合并。

[P1] HTTP replay seed 后,SSE 的 replay_complete 会把仍在流式中的 assistant 错误标记为完成。

DaemonSessionClient.load() 会把 cursor 初始化到服务端返回的 lastEventId,因此 WebUI 先用 activeSession.replayEvents seed 出 transcript,随后一订阅 SSE 就会收到 EventBus.subscribe() 发送的 replay_complete sentinel(即使 replayedCount: 0)。当前 DaemonSessionProvider 在收到 session.replay_complete 时,只要没有 active prompt,就会 dispatch assistant.done。这会把 replay tail 中没有 turn_complete 的仍在生成中的 assistant block 立即标成 finished。

相关位置:packages/acp-bridge/src/eventBus.ts:495packages/webui/src/daemon/session/DaemonSessionProvider.tsx:497。现有新增测试的 mock stream 没模拟真实 EventBus 的 replay_complete,所以没覆盖到。建议区分“HTTP snapshot seed 后的 SSE catch-up sentinel”和“旧语义的 replay complete”,或者至少只在确实 replay 了 terminal/catch-up completion 时结束 assistant。

[P1] HTTP replay seed 不能复用 live echo 的 suppressOwnUserEcho 逻辑,否则同 tab 刷新会丢自己的 user message。

WebUI 的 clientId 存在 sessionStorage,页面刷新后通常仍是同一个 clientId。live session replay log 里 bridge echo 的 user_message_chunk 带原来的 originatorClientId。当前 replay seed 调 normalizeDaemonEvent(... suppressOwnUserEcho: true),normalizer 在 originatorClientId === clientId 时会丢掉这条 user message。HTTP replay seed 是从空 store 重建完整历史,不应该沿用 live SSE 去重语义。

相关位置:packages/webui/src/daemon/session/DaemonSessionProvider.tsx:424packages/sdk-typescript/src/daemon/ui/normalizer.ts:387。建议 replay seed 强制 suppressOwnUserEcho: false,并补一个“同 clientId load/refresh 恢复自己的 prompt”的测试。

[P1/P2] replayEventslastEventId 不是一致快照,可能导致重复应用事件。

replayFieldsFor() 先读 entry.events.lastEventId,再 await snapshotReplayLog()。如果这段 await 期间有新事件发布,snapshot 可能包含比返回的 lastEventId 更大的事件;客户端先 seed 这些事件,再用旧 watermark 订阅 SSE,ring 会再 replay 一遍,文本/工具状态可能重复。

相关位置:packages/acp-bridge/src/bridge.ts:1771。建议让 replay snapshot 和 watermark 同源,例如返回 replay 数组后用其中最大 event id 作为 cursor,或者在 EventBus 上提供一个原子 snapshotReplayWithWatermark()。需要补并发 publish 的测试,证明不会重复/不丢。

@doudouOUC doudouOUC left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

补充 inline comments,方便逐行处理。总评里的结论不变:方向对,但这几个恢复语义问题需要先修。

await delay(0, abort.signal);
}
}
setConnection((c) => ({ ...c, catchingUp: undefined }));

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP replay seed 之后,后续 SSE 订阅仍会收到 EventBus.subscribe()replay_complete sentinel(包括 replayedCount: 0)。当前文件下面的 session.replay_complete 分支会在没有 active prompt 时 dispatch assistant.done,这会把 replay tail 中仍在 streaming、但尚未出现 turn_complete 的 assistant block 错误标成完成。建议让 replay_complete 只表示 transport catch-up drained、只清 catchingUp,turn 结束仍只由 turn_complete / turn_error / cancel / session died 这类业务事件驱动,并补真实 EventBus sentinel 的测试。

}
const normalized = normalizeDaemonEvent(replayEvent, {
clientId: activeSession.clientId,
suppressOwnUserEcho: eventOptions.suppressOwnUserEcho,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP replay seed 是从空 transcript 重建完整历史,不能复用 live SSE 的 own-echo suppression。WebUI clientId 存在 sessionStorage,同一个 tab 刷新后 clientId 通常不变;replay log 里的 bridge-echo user_message_chunk 会带原 originatorClientId,这里传 suppressOwnUserEcho: true 会把自己的历史 user message 丢掉。建议 replay seed 强制 suppressOwnUserEcho: false,live SSE 仍保留当前去重逻辑,并补同 clientId refresh 的测试。

const replayFieldsFor = async (
entry: SessionEntry,
): Promise<Pick<BridgeRestoredSession, 'lastEventId' | 'replayEvents'>> => {
const lastEventId = entry.events.lastEventId;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里的 watermark 和 replayEvents 不是同一个快照:先同步读取 lastEventId,再 await snapshotReplayLog()。如果 await 期间有新事件发布,返回的 replayEvents 可能已经包含比 lastEventId 更大的事件;客户端 seed 后再用旧 cursor 订阅 SSE,会从 ring 里再次 replay 这些事件,导致文本/工具状态重复。建议把 snapshot 和 watermark 做成同源结果,例如从返回的 replayEvents 最大 numeric id 计算 cursor,或在 EventBus 上提供一个原子 snapshotReplayWithWatermark(),并补并发 publish 测试。

@doudouOUC

Copy link
Copy Markdown
Collaborator

补充一个非阻断的长会话扩展建议,和上面几个 correctness 问题分开看:

当前 PR 先用 HTTP loadSession 返回完整 replay log 是合理的第一步,因为它能把“完整历史恢复”从有界 SSE ring 里解耦出来,先保证长会话刷新后不再依赖 ring 容量。但如果后续发现超长 session 的 loadSession 响应过大、浏览器 seed transcript 卡主线程,建议不要继续扩大 SSE ring,而是把 replay 恢复演进成“首屏快照 + 按需历史”的模型:

  • loadSession 默认返回最近 N 条/最近 N 个 transcript blocks,以及一个稳定 cursor/watermark,用于首屏快速恢复。
  • 旧历史通过分页或流式 replay API 按需加载,例如用户向上滚动时再请求更早一页。
  • 服务端可以返回已经 coalesced 的 UI transcript snapshot,而不是原始 daemon event 全量,减少连续 chunk、debug/raw event、长 tool output 对 payload 的放大。
  • SSE 仍只负责 snapshot cursor 之后的实时增量,不承担完整历史恢复。

这样短期路径保持简单正确:HTTP replay 完整恢复,SSE 追增量;等长会话规模真的压到 load 性能时,再把 HTTP replay 从“原始事件全量”升级为“压缩 transcript snapshot + 分页/流式历史”。

);
pendingSessionLoadRef.current = undefined;
const pendingLoad = pendingSessionLoadRef.current;
window.setTimeout(() => {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Critical] setTimeout(0) 延迟 reject 在生产环境中会错误地中止 新 session 的 load promise。

时序分析:用户切换 session 时,startSessionSwitch 同步将 pendingSessionLoadRef.current 设置为新 session 的 load 对象,然后 React 批量状态更新触发重渲染。旧 effect 的 cleanup 捕获 pendingLoad = pendingSessionLoadRef.current(此时已是新 session 的对象),安排 setTimeout(0)。新 effect 的 run() 开始执行并 await client.capabilities()(HTTP 调用,需要 >1ms)。

setTimeout(0) 是宏任务,在微任务队列清空后的下一个事件循环迭代触发。此时 HTTP 响应尚未返回,pendingSessionLoadRef.current === pendingLoad 为 true(新 effect 尚未 resolve),guard 条件不生效,新 session 的 load promise 被错误 reject 为 AbortError

为什么测试通过:mock 的 DaemonSessionClient 在微任务时间内解析(快于 setTimeout 宏任务),所以 pendingLoad.resolve()setTimeout 触发前就已执行,pendingSessionLoadRef.current 已被清空,guard 条件生效跳过 reject。

Suggested change
window.setTimeout(() => {
if (pendingSessionLoadRef.current) {
const pendingLoad = pendingSessionLoadRef.current;
// Check if this pending load belongs to the session being torn down
// (not a newer session switch that already superseded it).
// Use sessionId matching instead of setTimeout to avoid the
// macrotask/microtask race that causes false rejections in production.
if (pendingLoad.sessionId === restoreSessionId) {
clearTimeout(pendingLoad.timeout);
pendingSessionLoadRef.current = undefined;
pendingLoad.reject(new DOMException('Aborted', 'AbortError'));
}
}

— qwen3.7-max via Qwen Code /review

}

const restoreEvents = new EventBus(eventRingSize);
const restoreEvents = createSessionEventBus(req.sessionId);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Critical] restoreEvents 现在通过 createSessionEventBus(req.sessionId) 创建,其 FileReplayStore 的文件路径 fileReplayStorePath(replayStoreRoot, sessionId) 只按 sessionId 区分,且 deleteOnClose: true。在下方 racedEntry 分支(约 1965-1981 行)里,当 byId.get(req.sessionId) 命中一个并发已注册的存活会话时,会先 restoreEvents.close()(1967 行)——它删掉的正是与存活的 racedEntry.events 完全相同的那个 replay 文件——紧接着 1980 行又用 replayFieldsFor(racedEntry) 去读这个刚被删的文件。

后果:① 本次返回的 replayEvents 只剩内存残余;② 更严重的是 racedEntry.events 仍持有已打开的 append 写流,文件被 unlink 后它继续写入“孤儿 inode”,而 snapshot()fs.access(filePath) 之后一直 ENOENT —— 该会话此后所有 /load 都只能拿到内存尾巴,磁盘历史被静默永久破坏,正好抵消本 PR 要解决的“ring 淘汰后恢复完整历史”。

这是本 PR 引入的回归:改动前两边都是纯内存 new EventBus(eventRingSize),loser 的 close 不影响 winner。触发概率较低(需 doSpawn 的 newSession 与并发 load 撞上同一个 sessionId),但一旦命中即静默不可恢复的数据丢失,且该 race 分支当前无测试覆盖。

建议:race-loser 分支只关闭 loser 的 bus 而不删共享文件(给 EventBus.close()/FileReplayStore 增加“close 但不 delete”模式,在 1967 处使用);或让 replay 文件名带上每个 bus 自己的唯一后缀,不要仅以 sessionId 命名。

— claude-opus-4-8 via Claude Code /qreview


private failWrites(error: unknown): void {
if (this.failed || this.closed) return;
this.failed = true;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] this.failed 一旦置 true永不复位(全文件没有 failed = false)。此后每次 append() 都把事件推进 fallbackEvents(97/220 行),而 fallbackEvents 没有任何容量上限,会在整个 session 生命周期里无界增长——恰好把这个 PR 想消灭的“无界内存 replay 数组”又带了回来,且只在首次失败时打印一行 warn。进程退出时这些 fallback 事件(以及本应落盘的历史)只在内存里,全部丢失。

触发需要一次写失败(ENOSPC / EMFILE / 临时目录被清理等降级场景),但触发后是永久退化,在繁忙 daemon 上表现为“无声的内存增长 + 重启后 /load 拿到空 transcript”,凌晨排障时极难定位。

建议:① 给 fallbackEvents 设上限(超过后丢弃最旧并打印一次“replay history truncated”),避免拖垮 daemon;② 可考虑带退避地周期性重试 ensureStream()(重置 failed),让瞬时错误自愈。

— claude-opus-4-8 via Claude Code /qreview

includeRawEvent: eventOptions.includeRawEvent,
});
if (normalized.length > 0) {
store.dispatch(normalized);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] seed 循环对每个 replay 事件单独调用一次 store.dispatch(normalized),每次 dispatch 都跑一遍 reducer,做一次 Object.freeze(blocks)(O(B))和一次 takeBlocksOwnership[...state.blocks] 拷贝(O(B));而 webui 把 maxBlocks 设成 200_000(本文件 ~115 行),B 实际不会被裁剪小,于是整段 seed 是 O(N·B)。实测 N=10000 时主线程 ≈ 930ms,改成单次批量 dispatch ≈ 0.27ms。

雪上加霜:433-435 行“每 100 个事件让一次”用的是 delay(0),而 delay(0) 返回 Promise.resolve()(timing.ts:54),是 microtask,并不会把主线程让给浏览器渲染/输入(那需要 setTimeout/MessageChannel/scheduler.yield 这类 macrotask)。所以整段 seed 在一个 macrotask 内跑完,页面全程冻结。

这正好打在本 PR 的目标场景上——长会话 ring 淘汰后恢复时 transcript 最大、seed 最慢,且每次 ring_evicted 重载都会重新 seed 一遍。

建议:seed 改成单次(或每 500-1000 个)批量 store.dispatch(allUiEvents)(reducer 每次只 freeze/拷贝一次,与长度无关),遇到 turn_complete/turn_error 时先 flush 累积批次;并把周期性让出改成真正的 macrotask(await new Promise(r => setTimeout(r, 0)),受 abort 信号保护)。

— claude-opus-4-8 via Claude Code /qreview

const events: BridgeEvent[] = [];
try {
await fs.promises.access(this.filePath);
const raw = await fs.promises.readFile(this.filePath, 'utf8');

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] replay JSONL 文件在 server 端没有任何大小/条数上限或轮转:SSE ring 有界(默认 8000),但该文件按设计是“无界权威源”,整个 session 期间每事件追加一行。而 snapshot()/load 时用 fs.promises.readFile(filePath, 'utf8')整个文件一次性读进内存再逐行 JSON.parse 成数组,最后整包塞进 /load 的 HTTP 响应(bridge.ts:1778)。

后果:长 / 高频会话(大量工具输出、思考流)会把 os.tmpdir() 撑满;且每次 /load/attach 都把整文件读进 daemon RSS 并序列化成可能数百 MB 的响应,每个 attach 的客户端都重复付出。这是 server 端的磁盘 + 内存维度,与已讨论的“浏览器侧 payload 过大”是不同的面。

建议:给 replay store 设字节/条数上限并丢弃/压缩最旧内容(SSE ring 本就接受“很旧的事件掉出快速 replay”,HTTP replay 可采用同样的有界策略);snapshot/响应改为按行流式而非整文件缓冲;至少先 fs.stat,超阈值就拒绝整读。

— claude-opus-4-8 via Claude Code /qreview

);
}
const ownsReplayStoreRoot = opts.replayStoreDir === undefined;
const replayStoreRoot =

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] replayStoreRoot 形如 os.tmpdir()/qwen-code-daemon-replay/<sha256(boundWorkspace)[:16]>/<pid>-<uuid>,只有最后一段 <pid>-<uuid> 不可预测,父目录 qwen-code-daemon-replay/<hash> 是可预测的(hash 只是工作区路径的 sha256,不含秘密)。FileReplayStoremkdirSync(dir, {recursive, mode:0o700}) + chmodSynccreateWriteStream(..., {flags:'a'})。实测两点:① mkdirSync(recursive, 0o700) 不会收紧已存在的祖先目录(预置 0777 仍是 0777);② mkdirSync/createWriteStream跟随软链接。于是在多用户共享 /tmp 上,本地攻击者抢先把可预测的父目录建成软链接(或宽权限真目录),即可把 daemon 的 replay 写入重定向到可控位置,或预占路径造成启动 DoS。

本仓库其它地方(text-buffer.tsmermaidImageRenderer.ts)用的是 fs.mkdtempSync(path.join(os.tmpdir(), 'qwen-...'))——原子创建唯一的 0700 目录,正好规避此类问题;本 PR 偏离了该惯例。

建议:用 fs.mkdtempSync 创建 root,而非确定性的 <hash>/<pid>-<uuid>。若必须保留确定性布局,则对每级祖先 lstat 校验属主与非软链接,并以 O_NOFOLLOW 打开写流。(威胁模型限定在共享 /tmp 的本地多用户场景。)

— claude-opus-4-8 via Claude Code /qreview

inFlightChannelAwait,
]);
if (ownsReplayStoreRoot) {
await fs.rm(replayStoreRoot, { recursive: true, force: true });

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] replayStoreRoot 只在优雅关闭路径里清理(此处 fs.rm,且仅当 ownsReplayStoreRoot),per-session 文件也只在 EventBus.close() / 批量删除时清。一旦 daemon 被 SIGKILL、OOM、崩溃或断电,整个 <pid>-<uuid>/ 子树(含所有存活 session 的 replay 日志)就留在 tmp 里;下次启动用新的 pid+uuid 目录,没有任何启动期清扫去回收死 pid 的旧目录 —— 崩溃 / 频繁重启会让这些目录无限累积。

这些文件是明文的会话 UI 事件投影(用户 prompt、工具参数与输出、权限请求等)。改动前 replay 只在内存(InMemoryReplayStore),所以这是本 PR 新引入的“落盘敏感数据 + 磁盘泄漏”。(文件 0600 / 叶子目录 0700,正常多用户机器上别的用户读不到,故是“残留 + 磁盘占用”而非跨用户泄露。)

建议:daemon 启动时扫一遍 qwen-code-daemon-replay/<hash>/,best-effort 删掉 pid 已不存在(或超 TTL)的 <pid>-<uuid> 目录;并把 root 清理挂到 process.on('exit') / 现有 SIGINT/SIGTERM 处理上作为兜底。

— claude-opus-4-8 via Claude Code /qreview

@chiga0 chiga0 left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review Overview (AI Generated)

PR: #4678 fix(daemon): restore replay history after SSE ring eviction
Type: Bug Fix (architecture-level recovery path redesign)
Change size: +1083/-85 across 13 files

Design Assessment

The overall design is sound: decoupling full transcript recovery from the bounded SSE ring and using an HTTP-delivered replay log + watermark is the correct architectural direction for solving the long-session ring eviction problem. The FileReplayStore abstraction with write-queue/flush/drain/fallback is well-engineered, and the new test coverage (bridge, eventBus, DaemonSessionProvider) is thorough.

Findings Summary (synthesized from doudouOUC + wenshao reviews + independent analysis)

  • Critical: 2 items — restoreEvents/racedEntry file collision (wenshao), replay_complete incorrectly dispatches assistant.done (doudouOUC)
  • Major: 2 items — suppressOwnUserEcho drops user messages on refresh (doudouOUC), replayFieldsFor TOCTOU may duplicate events (doudouOUC)
  • Minor: 3 items — unbounded fallbackEvents on write failure (wenshao), O(N·B) seed dispatch (wenshao), stale tmp dirs after crash (wenshao)

Independent Verification of Key Issues

F1 (restoreEvents file collision — wenshao Critical): Confirmed. createSessionEventBus(req.sessionId) at bridge.ts:1863 creates a FileReplayStore with the same path as the live session's store. When the racedEntry branch hits, restoreEvents.close() with deleteOnClose: true deletes the shared .jsonl file. The live racedEntry.events FileReplayStore now has its file gone and pendingStartIndex advanced past persisted events — subsequent snapshotReplayLog() returns only events from pendingStartIndex onward, losing the full history. Fix: give the restore-scoped EventBus a different directory (e.g. append -restore-{uuid}), or set deleteOnClose: false and clean up manually after the racedEntry check.

F2 (replay_complete → assistant.done — doudouOUC P1): Confirmed. Verified at DaemonSessionProvider.tsx:497-511. After HTTP replay seed (lines 396-438), the SSE subscription receives replay_complete from EventBus.subscribe(). The existing handler dispatches assistant.done when no active prompt exists. If the replay seed ended with a still-streaming assistant (no terminal turn_complete/turn_error), the next replay_complete would incorrectly mark it as done. The test "keeps replay-seeded assistant streaming when no terminal replay event exists" uses a mock that never yields replay_complete, so this path isn't exercised.

F3 (suppressOwnUserEcho in seed — doudouOUC P1): Confirmed. At DaemonSessionProvider.tsx:426, the seed loop passes eventOptions.suppressOwnUserEcho to normalizeDaemonEvent. Since clientId persists in sessionStorage across tab refresh, replay seed with this flag drops the user's own historical user messages. Fix: force suppressOwnUserEcho: false during replay seed.

F4 (replayFieldsFor TOCTOU — doudouOUC P1/P2): Partially mitigated but not fully resolved. Independent trace: lastEventId is read synchronously, then snapshotReplayLog() awaits FileReplayStore.snapshot(). If a concurrent publish() completes its file write during the await, readFile returns the file including that event, but lastEventId is stale by 1. appendUniqueEvents dedup works for events still in pendingEvents, but NOT for events whose file writes already completed (they're past pendingStartIndex and in the file). Client seeds from replayEvents (includes new events), subscribes SSE from stale lastEventId → ring replays the overlapping events → duplicate text.

Other Observations

wenshao's remaining suggestions (unbounded fallbackEvents, O(N·B) dispatch, tmp dir cleanup, predictable tmp path) are all valid engineering concerns. The O(N·B) batch dispatch and tmp dir security are particularly important for production deployment.

Overall: the architecture is correct, but the 4 confirmed issues (F1-F4) need to be addressed before merge.


This review was generated by QoderWork AI

const replayFieldsFor = async (
entry: SessionEntry,
): Promise<Pick<BridgeRestoredSession, 'lastEventId' | 'replayEvents'>> => {
const lastEventId = entry.events.lastEventId;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Major] replayFieldsFor TOCTOU — appendUniqueEvents partial mitigation

Independent trace confirms appendUniqueEvents in FileReplayStore.snapshot() does NOT fully close the TOCTOU gap doudouOUC identified.

The race window: lastEventId (synchronous getter) is captured, then snapshotReplayLog() awaits FileReplayStore.snapshot(). During the await, if EventBus.publish() completes a file write (advancing pendingStartIndex) for a new event, readFile returns the file including that event, but lastEventId is stale by 1.

appendUniqueEvents dedup only covers events still in pendingEvents (past pendingStartIndex). Events whose file writes completed between lastEventId read and readFile are past the pendingStartIndex boundary — they're in the file AND the replay, but the watermark doesn't cover them.

Concrete fix: capture lastEventId AFTER snapshotReplayLog() returns, from the max event id in the returned array:

const replayEvents = action === 'load'
  ? await entry.events.snapshotReplayLog()
  : undefined;
const lastEventId = replayEvents?.length
  ? Math.max(...replayEvents.map(e => typeof e.id === 'number' ? e.id : 0))
  : entry.events.lastEventId;

Or provide an atomic snapshotReplayWithWatermark() on EventBus that captures both under a single synchronous barrier.


This review was generated by QoderWork AI

clientId,
createdAt: racedEntry.createdAt,
state: racedEntry.restoreState ?? {},
...(await replayFieldsFor(racedEntry)),

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Critical] restoreEvents.close() deletes racedEntry's live replay file

Both restoreEvents (line 1863) and racedEntry.events use createSessionEventBus(sessionId)FileReplayStore({dir: replayStoreRoot, sessionId}) → same file path replayStoreRoot/<safeSessionId>.jsonl.

When this racedEntry branch is taken:

  1. restoreEvents.close() (line ~1967) → deleteFileReplayStore() → deletes <sessionId>.jsonl
  2. But racedEntry.events (the LIVE session's EventBus) still has its FileReplayStore pointing to the same path
  3. replayFieldsFor(racedEntry) here → racedEntry.events.snapshotReplayLog()readFile ENOENT → falls back to pendingEvents[pendingStartIndex:]
  4. If file writes were flushed (pendingStartIndex advanced), those events are lost from the snapshot

This corrupts the live session's replay capability for the rest of its lifetime.

Fix options:

  • Give restore-scoped EventBus a unique dir: new FileReplayStore({dir: path.join(replayStoreRoot, 'restore-${randomUUID()}'), sessionId, deleteOnClose: true})
  • Or set deleteOnClose: false for restore-scoped stores and clean up the file manually after the racedEntry check

This review was generated by QoderWork AI

}
const normalized = normalizeDaemonEvent(replayEvent, {
clientId: activeSession.clientId,
suppressOwnUserEcho: eventOptions.suppressOwnUserEcho,

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Major] HTTP replay seed must not suppress own-user echo

Confirmed doudouOUC's analysis. clientId persists in sessionStorage across tab refresh, so the replay log's bridge-echoed user_message_chunk events carry the same originatorClientId as the refreshing client. Passing suppressOwnUserEcho: eventOptions.suppressOwnUserEcho here drops the user's own historical user messages during replay seed.

This is semantically different from live SSE dedup: replay seed rebuilds transcript from empty state — there's no "own echo" to suppress, it's the authoritative history.

Suggested fix:

const normalized = normalizeDaemonEvent(replayEvent, {
  clientId: activeSession.clientId,
  suppressOwnUserEcho: false, // replay seed: always include own messages
  includeRawEvent: eventOptions.includeRawEvent,
});

Live SSE (below the seed loop) should keep the original eventOptions.suppressOwnUserEcho.


This review was generated by QoderWork AI

replaySeededSessionsRef.current.add(activeSession);
const eventOptions = eventOptionsRef.current;
let seededEventCount = 0;
for (const replayEvent of activeSession.replayEvents) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Critical] Replay seeding loop lacks per-event error boundary

The seeding loop (lines 410–441) processes each replay event without a try/catch, unlike the SSE event loop below (line 451+) which wraps every event in try { ... } catch that dispatches a recoverable error action.

If normalizeDaemonEvent or store.dispatch throws on any replay event (e.g., deeply nested structures exceeding call stack, corrupted JSONL entry), the exception propagates to the outer catch block which treats it as a connection failure and triggers a full reconnect. On reconnect, the same replay events are returned from FileReplayStore, causing the same throw — creating an infinite reconnect loop with exponential backoff.

Suggested change
for (const replayEvent of activeSession.replayEvents) {
if (
!replaySeededSessionsRef.current.has(activeSession) &&
activeSession.replayEvents.length > 0
) {
replaySeededSessionsRef.current.add(activeSession);
const eventOptions = eventOptionsRef.current;
let seededEventCount = 0;
for (const replayEvent of activeSession.replayEvents) {
try {
if (replayEvent.type === 'turn_complete') {
const stopReason =
(replayEvent.data as DaemonTurnCompleteData | undefined)
?.stopReason ?? 'replay_complete';
store.dispatch({ type: 'assistant.done', reason: stopReason });
setPromptStatus('idle');
continue;
}
if (replayEvent.type === 'turn_error') {
store.dispatch({ type: 'assistant.done', reason: 'error' });
setPromptStatus('idle');
continue;
}
const normalized = normalizeDaemonEvent(replayEvent, {
clientId: activeSession.clientId,
suppressOwnUserEcho: eventOptions.suppressOwnUserEcho,
includeRawEvent: eventOptions.includeRawEvent,
});
if (normalized.length > 0) {
store.dispatch(normalized);
}
} catch (error) {
const message =
error instanceof Error ? error.message : String(error);
store.dispatch({
type: 'error',
text: `Skipped malformed replay event: ${message}`,
recoverable: true,
});
}
seededEventCount++;
if (seededEventCount % 100 === 0) {
if (disposed || abort.signal.aborted) return;
await delay(0, abort.signal);
}
}
setConnection((c) => ({ ...c, catchingUp: undefined }));
}

— qwen3.7-max via Qwen Code /review

}

/** Defensive copy of all events currently retained in the ring. */
snapshotRing(): BridgeEvent[] {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Suggestion] snapshotRing() is dead public API

This method has zero production callers — all 7 references are in eventBus.test.ts. Production code uses snapshotReplayLog() instead. Adding a public method to a core class with no production consumer increases API surface and invites confusion about which snapshot method to use.

Consider either removing snapshotRing() and having tests access the ring directly (via a test-only getter or @internal annotation), or marking it @internal to signal it's not part of the public contract.

— qwen3.7-max via Qwen Code /review

@doudouOUC doudouOUC left a comment

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

整体方案合理,架构解耦正确。以下几个问题按严重度标注,#1 建议本 PR 内修复,其余可 follow-up。

const replayFieldsFor = async (
entry: SessionEntry,
): Promise<Pick<BridgeRestoredSession, 'lastEventId' | 'replayEvents'>> => {
const lastEventId = entry.events.lastEventId;

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Medium] lastEventIdsnapshotReplayLog() 之前捕获。snapshot() 内部 await readFile() 让出事件循环后,新的 publish() 调用会把事件追加到 pendingEvents,后续 .slice(pendingStartIndex) 会把这些新事件包进返回值。

结果:返回的 replayEvents 包含 ID > lastEventId 的事件。客户端拿 lastEventId 做 SSE cursor,SSE 从 lastEventId + 1 开始 replay → 这些事件被 SSE 再发一遍 → transcript 出现重复 chunk。

触发路径:prompt 进行中刷新页面(loadSession 期间有活跃事件发布)。窗口很窄但可复现。

建议修复:先 snapshot,再取 lastEventId;或 snapshot 结果 filter 掉 ID > watermark 的事件。

.filter((id): id is number => typeof id === 'number'),
);
for (const event of volatileEvents) {
if (typeof event.id === 'number') {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low] 当 JSONL 中某行损坏被 skip 且该事件仍在 pendingEvents 中时,dedup 后追加到末尾会导致乱序。

例:file = [1, 2, <malformed>, 4, 5],pending 有 [3, 4, 5]。去重后 volatile 只剩 [3],追加到 out 末尾 → [1, 2, 4, 5, 3]

概率极低(需磁盘 I/O 恰好损坏特定行 + 该事件仍在 pending),但如果发生会导致 transcript 事件错乱。

如果要修可以在返回前按 event.id 排序,或在 append 时做插入排序。非阻断。

@@ -685,6 +692,15 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge {
`Must be a positive integer in [1, ${MAX_EVENT_RING_SIZE}].`,
);

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low] 进程 crash(SIGKILL / OOM)时 shutdown() 不会执行,replay 文件永久残留在 os.tmpdir() 下。每次 crash 留一个 {pid}-{uuid} 目录。macOS 的 /private/tmp 不像 Linux 重启清空。

建议 follow-up:bridge 启动时扫描同 workspacePathHash 下的旧目录,用 process.kill(pid, 0) 检查 PID 是否存活,不存活的删掉。

);
pendingSessionLoadRef.current = undefined;
const pendingLoad = pendingSessionLoadRef.current;
window.setTimeout(() => {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Low] 极端场景下 promise 可能 never settle:

  1. loadSession('B')pendingLoad₁ 存入 ref
  2. cleanup 调度 setTimeout 准备 reject
  3. timeout 执行前用户又调 loadSession('C') → ref 被替换为 pendingLoad₂
  4. timeout 执行,ref !== pendingLoad₁,退出不 reject
  5. pendingLoad₁.promise 永远 unsettled,调用者 await 永远 hang

触发概率极低(需毫秒级连续切 session),非阻断。如果要修,可以在条件不满足分支也 reject(用 AbortError)。

@doudouOUC

Copy link
Copy Markdown
Collaborator

长会话性能 follow-up 建议

当前方案对日常会话长度(< 5K events)没有问题。但超长 session(3h+ 重度使用,10K-50K+ events)下,loadSession 的全量 replay 会成为瓶颈:

  • 服务端snapshot() 全量 readFile + JSON.parse 每行 → 峰值内存 ~3-4x 文件大小
  • 网络:HTTP 响应体随会话长度线性增长(50K events ≈ 50-100MB)
  • 客户端:浏览器 parse + replay seeding 可能 > 5s,期间 UI 空白

最大的放大因素是 streaming chunk 未合并 — 一次 2000 token 的 assistant response 存为 ~200 个独立 agent_message_chunk 事件,对 UI 重建来说只需要一个合并后的文本。

建议优先级

优先级 优化 收益
P1 streaming chunk coalescing — 存储时合并连续同类 chunk 事件数降 5-20x,直接把问题推迟一个数量级
P1 流式读取 — snapshot() 用 readline 逐行 parse + chunked HTTP response 峰值内存从 O(file) 降到 O(line)
P2 分页/cursor — loadSession 只返最近 N 条 + watermark,旧历史按需拉 首屏恢复恒定 < 200ms
P3 定期 compact — 后台将 JSONL 压缩为 coalesced transcript snapshot 磁盘 + 读取双收益

不阻断本 PR,先保证正确性;等长会话规模真的压到 load 性能时再按上面路径演进。

@doudouOUC

Copy link
Copy Markdown
Collaborator

Superseded by #4694.

新 PR 用 turn-boundary compaction 替代原始事件全量存储,解决了本 PR review 中识别的几个问题:

  • watermark vs snapshot 异步竞态 → 同步 snapshot() 消除
  • JSONL 文件无界增长 → 纯内存 O(turns) 压缩
  • resume 路径带宽浪费 → server 端按 action 过滤
  • 双队列/dedup/compaction-index 复杂度 → slot-based 有序合并

负载对比:典型 3h 重度会话从 ~50MB 降到 ~2MB。

@doudouOUC

Copy link
Copy Markdown
Collaborator

Closed in favor of #4694 (compacted replay approach).

@doudouOUC doudouOUC closed this Jun 2, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants