fix(daemon): restore replay history after SSE ring eviction#4678
Conversation
📋 Review SummaryThis PR implements a critical fix for daemon session recovery by separating full session replay recovery from the bounded SSE ring buffer. The changes introduce a persistent replay log (via 🔍 General Feedback
🎯 Specific Feedback🔴 Critical
🟡 High
🟢 Medium
🔵 Low
✅ Highlights
|
9c4141a to
67ffbbe
Compare
67ffbbe to
7e44574
Compare
doudouOUC
left a comment
There was a problem hiding this comment.
整体方向是对的:把完整历史恢复从有界 SSE ring 里拆出来,改成 loadSession 通过 HTTP replay log + watermark 返回,是解决长 daemon session 刷新后 ring eviction 的正确设计。但当前实现里 HTTP replay 与后续 SSE 订阅的语义还没有完全对齐,下面三个问题会导致 transcript 错误恢复,建议修完再合并。
[P1] HTTP replay seed 后,SSE 的 replay_complete 会把仍在流式中的 assistant 错误标记为完成。
DaemonSessionClient.load() 会把 cursor 初始化到服务端返回的 lastEventId,因此 WebUI 先用 activeSession.replayEvents seed 出 transcript,随后一订阅 SSE 就会收到 EventBus.subscribe() 发送的 replay_complete sentinel(即使 replayedCount: 0)。当前 DaemonSessionProvider 在收到 session.replay_complete 时,只要没有 active prompt,就会 dispatch assistant.done。这会把 replay tail 中没有 turn_complete 的仍在生成中的 assistant block 立即标成 finished。
相关位置:packages/acp-bridge/src/eventBus.ts:495、packages/webui/src/daemon/session/DaemonSessionProvider.tsx:497。现有新增测试的 mock stream 没模拟真实 EventBus 的 replay_complete,所以没覆盖到。建议区分“HTTP snapshot seed 后的 SSE catch-up sentinel”和“旧语义的 replay complete”,或者至少只在确实 replay 了 terminal/catch-up completion 时结束 assistant。
[P1] HTTP replay seed 不能复用 live echo 的 suppressOwnUserEcho 逻辑,否则同 tab 刷新会丢自己的 user message。
WebUI 的 clientId 存在 sessionStorage,页面刷新后通常仍是同一个 clientId。live session replay log 里 bridge echo 的 user_message_chunk 带原来的 originatorClientId。当前 replay seed 调 normalizeDaemonEvent(... suppressOwnUserEcho: true),normalizer 在 originatorClientId === clientId 时会丢掉这条 user message。HTTP replay seed 是从空 store 重建完整历史,不应该沿用 live SSE 去重语义。
相关位置:packages/webui/src/daemon/session/DaemonSessionProvider.tsx:424、packages/sdk-typescript/src/daemon/ui/normalizer.ts:387。建议 replay seed 强制 suppressOwnUserEcho: false,并补一个“同 clientId load/refresh 恢复自己的 prompt”的测试。
[P1/P2] replayEvents 和 lastEventId 不是一致快照,可能导致重复应用事件。
replayFieldsFor() 先读 entry.events.lastEventId,再 await snapshotReplayLog()。如果这段 await 期间有新事件发布,snapshot 可能包含比返回的 lastEventId 更大的事件;客户端先 seed 这些事件,再用旧 watermark 订阅 SSE,ring 会再 replay 一遍,文本/工具状态可能重复。
相关位置:packages/acp-bridge/src/bridge.ts:1771。建议让 replay snapshot 和 watermark 同源,例如返回 replay 数组后用其中最大 event id 作为 cursor,或者在 EventBus 上提供一个原子 snapshotReplayWithWatermark()。需要补并发 publish 的测试,证明不会重复/不丢。
doudouOUC
left a comment
There was a problem hiding this comment.
补充 inline comments,方便逐行处理。总评里的结论不变:方向对,但这几个恢复语义问题需要先修。
| await delay(0, abort.signal); | ||
| } | ||
| } | ||
| setConnection((c) => ({ ...c, catchingUp: undefined })); |
There was a problem hiding this comment.
HTTP replay seed 之后,后续 SSE 订阅仍会收到 EventBus.subscribe() 的 replay_complete sentinel(包括 replayedCount: 0)。当前文件下面的 session.replay_complete 分支会在没有 active prompt 时 dispatch assistant.done,这会把 replay tail 中仍在 streaming、但尚未出现 turn_complete 的 assistant block 错误标成完成。建议让 replay_complete 只表示 transport catch-up drained、只清 catchingUp,turn 结束仍只由 turn_complete / turn_error / cancel / session died 这类业务事件驱动,并补真实 EventBus sentinel 的测试。
| } | ||
| const normalized = normalizeDaemonEvent(replayEvent, { | ||
| clientId: activeSession.clientId, | ||
| suppressOwnUserEcho: eventOptions.suppressOwnUserEcho, |
There was a problem hiding this comment.
HTTP replay seed 是从空 transcript 重建完整历史,不能复用 live SSE 的 own-echo suppression。WebUI clientId 存在 sessionStorage,同一个 tab 刷新后 clientId 通常不变;replay log 里的 bridge-echo user_message_chunk 会带原 originatorClientId,这里传 suppressOwnUserEcho: true 会把自己的历史 user message 丢掉。建议 replay seed 强制 suppressOwnUserEcho: false,live SSE 仍保留当前去重逻辑,并补同 clientId refresh 的测试。
| const replayFieldsFor = async ( | ||
| entry: SessionEntry, | ||
| ): Promise<Pick<BridgeRestoredSession, 'lastEventId' | 'replayEvents'>> => { | ||
| const lastEventId = entry.events.lastEventId; |
There was a problem hiding this comment.
这里的 watermark 和 replayEvents 不是同一个快照:先同步读取 lastEventId,再 await snapshotReplayLog()。如果 await 期间有新事件发布,返回的 replayEvents 可能已经包含比 lastEventId 更大的事件;客户端 seed 后再用旧 cursor 订阅 SSE,会从 ring 里再次 replay 这些事件,导致文本/工具状态重复。建议把 snapshot 和 watermark 做成同源结果,例如从返回的 replayEvents 最大 numeric id 计算 cursor,或在 EventBus 上提供一个原子 snapshotReplayWithWatermark(),并补并发 publish 测试。
|
补充一个非阻断的长会话扩展建议,和上面几个 correctness 问题分开看: 当前 PR 先用 HTTP
这样短期路径保持简单正确:HTTP replay 完整恢复,SSE 追增量;等长会话规模真的压到 load 性能时,再把 HTTP replay 从“原始事件全量”升级为“压缩 transcript snapshot + 分页/流式历史”。 |
| ); | ||
| pendingSessionLoadRef.current = undefined; | ||
| const pendingLoad = pendingSessionLoadRef.current; | ||
| window.setTimeout(() => { |
There was a problem hiding this comment.
[Critical] setTimeout(0) 延迟 reject 在生产环境中会错误地中止 新 session 的 load promise。
时序分析:用户切换 session 时,startSessionSwitch 同步将 pendingSessionLoadRef.current 设置为新 session 的 load 对象,然后 React 批量状态更新触发重渲染。旧 effect 的 cleanup 捕获 pendingLoad = pendingSessionLoadRef.current(此时已是新 session 的对象),安排 setTimeout(0)。新 effect 的 run() 开始执行并 await client.capabilities()(HTTP 调用,需要 >1ms)。
setTimeout(0) 是宏任务,在微任务队列清空后的下一个事件循环迭代触发。此时 HTTP 响应尚未返回,pendingSessionLoadRef.current === pendingLoad 为 true(新 effect 尚未 resolve),guard 条件不生效,新 session 的 load promise 被错误 reject 为 AbortError。
为什么测试通过:mock 的 DaemonSessionClient 在微任务时间内解析(快于 setTimeout 宏任务),所以 pendingLoad.resolve() 在 setTimeout 触发前就已执行,pendingSessionLoadRef.current 已被清空,guard 条件生效跳过 reject。
| window.setTimeout(() => { | |
| if (pendingSessionLoadRef.current) { | |
| const pendingLoad = pendingSessionLoadRef.current; | |
| // Check if this pending load belongs to the session being torn down | |
| // (not a newer session switch that already superseded it). | |
| // Use sessionId matching instead of setTimeout to avoid the | |
| // macrotask/microtask race that causes false rejections in production. | |
| if (pendingLoad.sessionId === restoreSessionId) { | |
| clearTimeout(pendingLoad.timeout); | |
| pendingSessionLoadRef.current = undefined; | |
| pendingLoad.reject(new DOMException('Aborted', 'AbortError')); | |
| } | |
| } |
— qwen3.7-max via Qwen Code /review
| } | ||
|
|
||
| const restoreEvents = new EventBus(eventRingSize); | ||
| const restoreEvents = createSessionEventBus(req.sessionId); |
There was a problem hiding this comment.
[Critical] restoreEvents 现在通过 createSessionEventBus(req.sessionId) 创建,其 FileReplayStore 的文件路径 fileReplayStorePath(replayStoreRoot, sessionId) 只按 sessionId 区分,且 deleteOnClose: true。在下方 racedEntry 分支(约 1965-1981 行)里,当 byId.get(req.sessionId) 命中一个并发已注册的存活会话时,会先 restoreEvents.close()(1967 行)——它删掉的正是与存活的 racedEntry.events 完全相同的那个 replay 文件——紧接着 1980 行又用 replayFieldsFor(racedEntry) 去读这个刚被删的文件。
后果:① 本次返回的 replayEvents 只剩内存残余;② 更严重的是 racedEntry.events 仍持有已打开的 append 写流,文件被 unlink 后它继续写入“孤儿 inode”,而 snapshot() 里 fs.access(filePath) 之后一直 ENOENT —— 该会话此后所有 /load 都只能拿到内存尾巴,磁盘历史被静默永久破坏,正好抵消本 PR 要解决的“ring 淘汰后恢复完整历史”。
这是本 PR 引入的回归:改动前两边都是纯内存 new EventBus(eventRingSize),loser 的 close 不影响 winner。触发概率较低(需 doSpawn 的 newSession 与并发 load 撞上同一个 sessionId),但一旦命中即静默不可恢复的数据丢失,且该 race 分支当前无测试覆盖。
建议:race-loser 分支只关闭 loser 的 bus 而不删共享文件(给 EventBus.close()/FileReplayStore 增加“close 但不 delete”模式,在 1967 处使用);或让 replay 文件名带上每个 bus 自己的唯一后缀,不要仅以 sessionId 命名。
— claude-opus-4-8 via Claude Code /qreview
|
|
||
| private failWrites(error: unknown): void { | ||
| if (this.failed || this.closed) return; | ||
| this.failed = true; |
There was a problem hiding this comment.
[Suggestion] this.failed 一旦置 true 就永不复位(全文件没有 failed = false)。此后每次 append() 都把事件推进 fallbackEvents(97/220 行),而 fallbackEvents 没有任何容量上限,会在整个 session 生命周期里无界增长——恰好把这个 PR 想消灭的“无界内存 replay 数组”又带了回来,且只在首次失败时打印一行 warn。进程退出时这些 fallback 事件(以及本应落盘的历史)只在内存里,全部丢失。
触发需要一次写失败(ENOSPC / EMFILE / 临时目录被清理等降级场景),但触发后是永久退化,在繁忙 daemon 上表现为“无声的内存增长 + 重启后 /load 拿到空 transcript”,凌晨排障时极难定位。
建议:① 给 fallbackEvents 设上限(超过后丢弃最旧并打印一次“replay history truncated”),避免拖垮 daemon;② 可考虑带退避地周期性重试 ensureStream()(重置 failed),让瞬时错误自愈。
— claude-opus-4-8 via Claude Code /qreview
| includeRawEvent: eventOptions.includeRawEvent, | ||
| }); | ||
| if (normalized.length > 0) { | ||
| store.dispatch(normalized); |
There was a problem hiding this comment.
[Suggestion] seed 循环对每个 replay 事件单独调用一次 store.dispatch(normalized),每次 dispatch 都跑一遍 reducer,做一次 Object.freeze(blocks)(O(B))和一次 takeBlocksOwnership 的 [...state.blocks] 拷贝(O(B));而 webui 把 maxBlocks 设成 200_000(本文件 ~115 行),B 实际不会被裁剪小,于是整段 seed 是 O(N·B)。实测 N=10000 时主线程 ≈ 930ms,改成单次批量 dispatch ≈ 0.27ms。
雪上加霜:433-435 行“每 100 个事件让一次”用的是 delay(0),而 delay(0) 返回 Promise.resolve()(timing.ts:54),是 microtask,并不会把主线程让给浏览器渲染/输入(那需要 setTimeout/MessageChannel/scheduler.yield 这类 macrotask)。所以整段 seed 在一个 macrotask 内跑完,页面全程冻结。
这正好打在本 PR 的目标场景上——长会话 ring 淘汰后恢复时 transcript 最大、seed 最慢,且每次 ring_evicted 重载都会重新 seed 一遍。
建议:seed 改成单次(或每 500-1000 个)批量 store.dispatch(allUiEvents)(reducer 每次只 freeze/拷贝一次,与长度无关),遇到 turn_complete/turn_error 时先 flush 累积批次;并把周期性让出改成真正的 macrotask(await new Promise(r => setTimeout(r, 0)),受 abort 信号保护)。
— claude-opus-4-8 via Claude Code /qreview
| const events: BridgeEvent[] = []; | ||
| try { | ||
| await fs.promises.access(this.filePath); | ||
| const raw = await fs.promises.readFile(this.filePath, 'utf8'); |
There was a problem hiding this comment.
[Suggestion] replay JSONL 文件在 server 端没有任何大小/条数上限或轮转:SSE ring 有界(默认 8000),但该文件按设计是“无界权威源”,整个 session 期间每事件追加一行。而 snapshot() 在 /load 时用 fs.promises.readFile(filePath, 'utf8') 把整个文件一次性读进内存再逐行 JSON.parse 成数组,最后整包塞进 /load 的 HTTP 响应(bridge.ts:1778)。
后果:长 / 高频会话(大量工具输出、思考流)会把 os.tmpdir() 撑满;且每次 /load/attach 都把整文件读进 daemon RSS 并序列化成可能数百 MB 的响应,每个 attach 的客户端都重复付出。这是 server 端的磁盘 + 内存维度,与已讨论的“浏览器侧 payload 过大”是不同的面。
建议:给 replay store 设字节/条数上限并丢弃/压缩最旧内容(SSE ring 本就接受“很旧的事件掉出快速 replay”,HTTP replay 可采用同样的有界策略);snapshot/响应改为按行流式而非整文件缓冲;至少先 fs.stat,超阈值就拒绝整读。
— claude-opus-4-8 via Claude Code /qreview
| ); | ||
| } | ||
| const ownsReplayStoreRoot = opts.replayStoreDir === undefined; | ||
| const replayStoreRoot = |
There was a problem hiding this comment.
[Suggestion] replayStoreRoot 形如 os.tmpdir()/qwen-code-daemon-replay/<sha256(boundWorkspace)[:16]>/<pid>-<uuid>,只有最后一段 <pid>-<uuid> 不可预测,父目录 qwen-code-daemon-replay/<hash> 是可预测的(hash 只是工作区路径的 sha256,不含秘密)。FileReplayStore 用 mkdirSync(dir, {recursive, mode:0o700}) + chmodSync 再 createWriteStream(..., {flags:'a'})。实测两点:① mkdirSync(recursive, 0o700) 不会收紧已存在的祖先目录(预置 0777 仍是 0777);② mkdirSync/createWriteStream 都跟随软链接。于是在多用户共享 /tmp 上,本地攻击者抢先把可预测的父目录建成软链接(或宽权限真目录),即可把 daemon 的 replay 写入重定向到可控位置,或预占路径造成启动 DoS。
本仓库其它地方(text-buffer.ts、mermaidImageRenderer.ts)用的是 fs.mkdtempSync(path.join(os.tmpdir(), 'qwen-...'))——原子创建唯一的 0700 目录,正好规避此类问题;本 PR 偏离了该惯例。
建议:用 fs.mkdtempSync 创建 root,而非确定性的 <hash>/<pid>-<uuid>。若必须保留确定性布局,则对每级祖先 lstat 校验属主与非软链接,并以 O_NOFOLLOW 打开写流。(威胁模型限定在共享 /tmp 的本地多用户场景。)
— claude-opus-4-8 via Claude Code /qreview
| inFlightChannelAwait, | ||
| ]); | ||
| if (ownsReplayStoreRoot) { | ||
| await fs.rm(replayStoreRoot, { recursive: true, force: true }); |
There was a problem hiding this comment.
[Suggestion] replayStoreRoot 只在优雅关闭路径里清理(此处 fs.rm,且仅当 ownsReplayStoreRoot),per-session 文件也只在 EventBus.close() / 批量删除时清。一旦 daemon 被 SIGKILL、OOM、崩溃或断电,整个 <pid>-<uuid>/ 子树(含所有存活 session 的 replay 日志)就留在 tmp 里;下次启动用新的 pid+uuid 目录,没有任何启动期清扫去回收死 pid 的旧目录 —— 崩溃 / 频繁重启会让这些目录无限累积。
这些文件是明文的会话 UI 事件投影(用户 prompt、工具参数与输出、权限请求等)。改动前 replay 只在内存(InMemoryReplayStore),所以这是本 PR 新引入的“落盘敏感数据 + 磁盘泄漏”。(文件 0600 / 叶子目录 0700,正常多用户机器上别的用户读不到,故是“残留 + 磁盘占用”而非跨用户泄露。)
建议:daemon 启动时扫一遍 qwen-code-daemon-replay/<hash>/,best-effort 删掉 pid 已不存在(或超 TTL)的 <pid>-<uuid> 目录;并把 root 清理挂到 process.on('exit') / 现有 SIGINT/SIGTERM 处理上作为兜底。
— claude-opus-4-8 via Claude Code /qreview
chiga0
left a comment
There was a problem hiding this comment.
Code Review Overview (AI Generated)
PR: #4678 fix(daemon): restore replay history after SSE ring eviction
Type: Bug Fix (architecture-level recovery path redesign)
Change size: +1083/-85 across 13 files
Design Assessment
The overall design is sound: decoupling full transcript recovery from the bounded SSE ring and using an HTTP-delivered replay log + watermark is the correct architectural direction for solving the long-session ring eviction problem. The FileReplayStore abstraction with write-queue/flush/drain/fallback is well-engineered, and the new test coverage (bridge, eventBus, DaemonSessionProvider) is thorough.
Findings Summary (synthesized from doudouOUC + wenshao reviews + independent analysis)
- Critical: 2 items — restoreEvents/racedEntry file collision (wenshao), replay_complete incorrectly dispatches assistant.done (doudouOUC)
- Major: 2 items — suppressOwnUserEcho drops user messages on refresh (doudouOUC), replayFieldsFor TOCTOU may duplicate events (doudouOUC)
- Minor: 3 items — unbounded fallbackEvents on write failure (wenshao), O(N·B) seed dispatch (wenshao), stale tmp dirs after crash (wenshao)
Independent Verification of Key Issues
F1 (restoreEvents file collision — wenshao Critical): Confirmed. createSessionEventBus(req.sessionId) at bridge.ts:1863 creates a FileReplayStore with the same path as the live session's store. When the racedEntry branch hits, restoreEvents.close() with deleteOnClose: true deletes the shared .jsonl file. The live racedEntry.events FileReplayStore now has its file gone and pendingStartIndex advanced past persisted events — subsequent snapshotReplayLog() returns only events from pendingStartIndex onward, losing the full history. Fix: give the restore-scoped EventBus a different directory (e.g. append -restore-{uuid}), or set deleteOnClose: false and clean up manually after the racedEntry check.
F2 (replay_complete → assistant.done — doudouOUC P1): Confirmed. Verified at DaemonSessionProvider.tsx:497-511. After HTTP replay seed (lines 396-438), the SSE subscription receives replay_complete from EventBus.subscribe(). The existing handler dispatches assistant.done when no active prompt exists. If the replay seed ended with a still-streaming assistant (no terminal turn_complete/turn_error), the next replay_complete would incorrectly mark it as done. The test "keeps replay-seeded assistant streaming when no terminal replay event exists" uses a mock that never yields replay_complete, so this path isn't exercised.
F3 (suppressOwnUserEcho in seed — doudouOUC P1): Confirmed. At DaemonSessionProvider.tsx:426, the seed loop passes eventOptions.suppressOwnUserEcho to normalizeDaemonEvent. Since clientId persists in sessionStorage across tab refresh, replay seed with this flag drops the user's own historical user messages. Fix: force suppressOwnUserEcho: false during replay seed.
F4 (replayFieldsFor TOCTOU — doudouOUC P1/P2): Partially mitigated but not fully resolved. Independent trace: lastEventId is read synchronously, then snapshotReplayLog() awaits FileReplayStore.snapshot(). If a concurrent publish() completes its file write during the await, readFile returns the file including that event, but lastEventId is stale by 1. appendUniqueEvents dedup works for events still in pendingEvents, but NOT for events whose file writes already completed (they're past pendingStartIndex and in the file). Client seeds from replayEvents (includes new events), subscribes SSE from stale lastEventId → ring replays the overlapping events → duplicate text.
Other Observations
wenshao's remaining suggestions (unbounded fallbackEvents, O(N·B) dispatch, tmp dir cleanup, predictable tmp path) are all valid engineering concerns. The O(N·B) batch dispatch and tmp dir security are particularly important for production deployment.
Overall: the architecture is correct, but the 4 confirmed issues (F1-F4) need to be addressed before merge.
This review was generated by QoderWork AI
| const replayFieldsFor = async ( | ||
| entry: SessionEntry, | ||
| ): Promise<Pick<BridgeRestoredSession, 'lastEventId' | 'replayEvents'>> => { | ||
| const lastEventId = entry.events.lastEventId; |
There was a problem hiding this comment.
[Major] replayFieldsFor TOCTOU — appendUniqueEvents partial mitigation
Independent trace confirms appendUniqueEvents in FileReplayStore.snapshot() does NOT fully close the TOCTOU gap doudouOUC identified.
The race window: lastEventId (synchronous getter) is captured, then snapshotReplayLog() awaits FileReplayStore.snapshot(). During the await, if EventBus.publish() completes a file write (advancing pendingStartIndex) for a new event, readFile returns the file including that event, but lastEventId is stale by 1.
appendUniqueEvents dedup only covers events still in pendingEvents (past pendingStartIndex). Events whose file writes completed between lastEventId read and readFile are past the pendingStartIndex boundary — they're in the file AND the replay, but the watermark doesn't cover them.
Concrete fix: capture lastEventId AFTER snapshotReplayLog() returns, from the max event id in the returned array:
const replayEvents = action === 'load'
? await entry.events.snapshotReplayLog()
: undefined;
const lastEventId = replayEvents?.length
? Math.max(...replayEvents.map(e => typeof e.id === 'number' ? e.id : 0))
: entry.events.lastEventId;Or provide an atomic snapshotReplayWithWatermark() on EventBus that captures both under a single synchronous barrier.
This review was generated by QoderWork AI
| clientId, | ||
| createdAt: racedEntry.createdAt, | ||
| state: racedEntry.restoreState ?? {}, | ||
| ...(await replayFieldsFor(racedEntry)), |
There was a problem hiding this comment.
[Critical] restoreEvents.close() deletes racedEntry's live replay file
Both restoreEvents (line 1863) and racedEntry.events use createSessionEventBus(sessionId) → FileReplayStore({dir: replayStoreRoot, sessionId}) → same file path replayStoreRoot/<safeSessionId>.jsonl.
When this racedEntry branch is taken:
restoreEvents.close()(line ~1967) →deleteFileReplayStore()→ deletes<sessionId>.jsonl- But
racedEntry.events(the LIVE session's EventBus) still has its FileReplayStore pointing to the same path replayFieldsFor(racedEntry)here →racedEntry.events.snapshotReplayLog()→readFileENOENT → falls back topendingEvents[pendingStartIndex:]- If file writes were flushed (
pendingStartIndexadvanced), those events are lost from the snapshot
This corrupts the live session's replay capability for the rest of its lifetime.
Fix options:
- Give restore-scoped EventBus a unique dir:
new FileReplayStore({dir: path.join(replayStoreRoot, 'restore-${randomUUID()}'), sessionId, deleteOnClose: true}) - Or set
deleteOnClose: falsefor restore-scoped stores and clean up the file manually after the racedEntry check
This review was generated by QoderWork AI
| } | ||
| const normalized = normalizeDaemonEvent(replayEvent, { | ||
| clientId: activeSession.clientId, | ||
| suppressOwnUserEcho: eventOptions.suppressOwnUserEcho, |
There was a problem hiding this comment.
[Major] HTTP replay seed must not suppress own-user echo
Confirmed doudouOUC's analysis. clientId persists in sessionStorage across tab refresh, so the replay log's bridge-echoed user_message_chunk events carry the same originatorClientId as the refreshing client. Passing suppressOwnUserEcho: eventOptions.suppressOwnUserEcho here drops the user's own historical user messages during replay seed.
This is semantically different from live SSE dedup: replay seed rebuilds transcript from empty state — there's no "own echo" to suppress, it's the authoritative history.
Suggested fix:
const normalized = normalizeDaemonEvent(replayEvent, {
clientId: activeSession.clientId,
suppressOwnUserEcho: false, // replay seed: always include own messages
includeRawEvent: eventOptions.includeRawEvent,
});Live SSE (below the seed loop) should keep the original eventOptions.suppressOwnUserEcho.
This review was generated by QoderWork AI
| replaySeededSessionsRef.current.add(activeSession); | ||
| const eventOptions = eventOptionsRef.current; | ||
| let seededEventCount = 0; | ||
| for (const replayEvent of activeSession.replayEvents) { |
There was a problem hiding this comment.
[Critical] Replay seeding loop lacks per-event error boundary
The seeding loop (lines 410–441) processes each replay event without a try/catch, unlike the SSE event loop below (line 451+) which wraps every event in try { ... } catch that dispatches a recoverable error action.
If normalizeDaemonEvent or store.dispatch throws on any replay event (e.g., deeply nested structures exceeding call stack, corrupted JSONL entry), the exception propagates to the outer catch block which treats it as a connection failure and triggers a full reconnect. On reconnect, the same replay events are returned from FileReplayStore, causing the same throw — creating an infinite reconnect loop with exponential backoff.
| for (const replayEvent of activeSession.replayEvents) { | |
| if ( | |
| !replaySeededSessionsRef.current.has(activeSession) && | |
| activeSession.replayEvents.length > 0 | |
| ) { | |
| replaySeededSessionsRef.current.add(activeSession); | |
| const eventOptions = eventOptionsRef.current; | |
| let seededEventCount = 0; | |
| for (const replayEvent of activeSession.replayEvents) { | |
| try { | |
| if (replayEvent.type === 'turn_complete') { | |
| const stopReason = | |
| (replayEvent.data as DaemonTurnCompleteData | undefined) | |
| ?.stopReason ?? 'replay_complete'; | |
| store.dispatch({ type: 'assistant.done', reason: stopReason }); | |
| setPromptStatus('idle'); | |
| continue; | |
| } | |
| if (replayEvent.type === 'turn_error') { | |
| store.dispatch({ type: 'assistant.done', reason: 'error' }); | |
| setPromptStatus('idle'); | |
| continue; | |
| } | |
| const normalized = normalizeDaemonEvent(replayEvent, { | |
| clientId: activeSession.clientId, | |
| suppressOwnUserEcho: eventOptions.suppressOwnUserEcho, | |
| includeRawEvent: eventOptions.includeRawEvent, | |
| }); | |
| if (normalized.length > 0) { | |
| store.dispatch(normalized); | |
| } | |
| } catch (error) { | |
| const message = | |
| error instanceof Error ? error.message : String(error); | |
| store.dispatch({ | |
| type: 'error', | |
| text: `Skipped malformed replay event: ${message}`, | |
| recoverable: true, | |
| }); | |
| } | |
| seededEventCount++; | |
| if (seededEventCount % 100 === 0) { | |
| if (disposed || abort.signal.aborted) return; | |
| await delay(0, abort.signal); | |
| } | |
| } | |
| setConnection((c) => ({ ...c, catchingUp: undefined })); | |
| } |
— qwen3.7-max via Qwen Code /review
| } | ||
|
|
||
| /** Defensive copy of all events currently retained in the ring. */ | ||
| snapshotRing(): BridgeEvent[] { |
There was a problem hiding this comment.
[Suggestion] snapshotRing() is dead public API
This method has zero production callers — all 7 references are in eventBus.test.ts. Production code uses snapshotReplayLog() instead. Adding a public method to a core class with no production consumer increases API surface and invites confusion about which snapshot method to use.
Consider either removing snapshotRing() and having tests access the ring directly (via a test-only getter or @internal annotation), or marking it @internal to signal it's not part of the public contract.
— qwen3.7-max via Qwen Code /review
| const replayFieldsFor = async ( | ||
| entry: SessionEntry, | ||
| ): Promise<Pick<BridgeRestoredSession, 'lastEventId' | 'replayEvents'>> => { | ||
| const lastEventId = entry.events.lastEventId; |
There was a problem hiding this comment.
[Medium] lastEventId 在 snapshotReplayLog() 之前捕获。snapshot() 内部 await readFile() 让出事件循环后,新的 publish() 调用会把事件追加到 pendingEvents,后续 .slice(pendingStartIndex) 会把这些新事件包进返回值。
结果:返回的 replayEvents 包含 ID > lastEventId 的事件。客户端拿 lastEventId 做 SSE cursor,SSE 从 lastEventId + 1 开始 replay → 这些事件被 SSE 再发一遍 → transcript 出现重复 chunk。
触发路径:prompt 进行中刷新页面(loadSession 期间有活跃事件发布)。窗口很窄但可复现。
建议修复:先 snapshot,再取 lastEventId;或 snapshot 结果 filter 掉 ID > watermark 的事件。
| .filter((id): id is number => typeof id === 'number'), | ||
| ); | ||
| for (const event of volatileEvents) { | ||
| if (typeof event.id === 'number') { |
There was a problem hiding this comment.
[Low] 当 JSONL 中某行损坏被 skip 且该事件仍在 pendingEvents 中时,dedup 后追加到末尾会导致乱序。
例:file = [1, 2, <malformed>, 4, 5],pending 有 [3, 4, 5]。去重后 volatile 只剩 [3],追加到 out 末尾 → [1, 2, 4, 5, 3]。
概率极低(需磁盘 I/O 恰好损坏特定行 + 该事件仍在 pending),但如果发生会导致 transcript 事件错乱。
如果要修可以在返回前按 event.id 排序,或在 append 时做插入排序。非阻断。
| @@ -685,6 +692,15 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge { | |||
| `Must be a positive integer in [1, ${MAX_EVENT_RING_SIZE}].`, | |||
| ); | |||
There was a problem hiding this comment.
[Low] 进程 crash(SIGKILL / OOM)时 shutdown() 不会执行,replay 文件永久残留在 os.tmpdir() 下。每次 crash 留一个 {pid}-{uuid} 目录。macOS 的 /private/tmp 不像 Linux 重启清空。
建议 follow-up:bridge 启动时扫描同 workspacePathHash 下的旧目录,用 process.kill(pid, 0) 检查 PID 是否存活,不存活的删掉。
| ); | ||
| pendingSessionLoadRef.current = undefined; | ||
| const pendingLoad = pendingSessionLoadRef.current; | ||
| window.setTimeout(() => { |
There was a problem hiding this comment.
[Low] 极端场景下 promise 可能 never settle:
loadSession('B')→pendingLoad₁存入 ref- cleanup 调度
setTimeout准备 reject - timeout 执行前用户又调
loadSession('C')→ ref 被替换为pendingLoad₂ - timeout 执行,
ref !== pendingLoad₁,退出不 reject pendingLoad₁.promise永远 unsettled,调用者await永远 hang
触发概率极低(需毫秒级连续切 session),非阻断。如果要修,可以在条件不满足分支也 reject(用 AbortError)。
长会话性能 follow-up 建议当前方案对日常会话长度(< 5K events)没有问题。但超长 session(3h+ 重度使用,10K-50K+ events)下,
最大的放大因素是 streaming chunk 未合并 — 一次 2000 token 的 assistant response 存为 ~200 个独立 建议优先级
不阻断本 PR,先保证正确性;等长会话规模真的压到 load 性能时再按上面路径演进。 |
|
Superseded by #4694. 新 PR 用 turn-boundary compaction 替代原始事件全量存储,解决了本 PR review 中识别的几个问题:
负载对比:典型 3h 重度会话从 ~50MB 降到 ~2MB。 |
|
Closed in favor of #4694 (compacted replay approach). |
概要
loadSession返回replayEvents和lastEventId用于重建页面历史;SSE ring 只负责短暂断线后的增量追赶。Last-Event-ID: 0订阅 SSE,服务端会因为早期事件已被 ring 淘汰而返回ring_evicted,导致页面无法恢复完整 transcript。resumeSession不再返回完整 replay payload,以及ring_evicted后 WebUI 会重新走 load 恢复状态。验证
loadSession返回的 replay events 恢复完整历史,并从返回的 watermark 后继续接收实时增量。npm run build成功完成。构建过程中仅出现已有的 VS Code companion lint warning 和 Browserslist 数据过期提示,没有错误。replayEvents、lastEventId、resume和ring_evicted恢复路径。packages/acp-bridge:254 个测试通过。packages/sdk-typescript:220 个测试通过。packages/webui:99 个测试通过。npm run build成功。范围 / 风险
loadSession现在会携带用于历史重建的 replay events,超长会话的 load 响应会更大;resumeSession已避免返回这部分完整 replay payload。测试矩阵
测试矩阵说明:
npx vitest命令和npm run build。关联 Issue / Bug