feat(serve): SSE replay sizing + slow_client_warning backpressure (#4175 Wave 2.5 PR 10) #4237
#4175 Wave 2.5 PR 10. Closes the SSE replay / backpressure knobs called out in #3803 §02 so chatty Stage 1 sessions get an honest reconnect window and operators get a heads-up signal before clients are summarily evicted.

- **`DEFAULT_RING_SIZE` 4000 → 8000.** Per-session replay ring depth now matches the #3803 §02 target for chatty sessions.
- **`--event-ring-size <n>`** CLI flag (default 8000) lets operators tune the ring per daemon. Threaded `ServeOptions` → `BridgeOptions.eventRingSize` → both `new EventBus()` construction sites (fresh sessions + restore path). Validation is fail-CLOSED (positive finite integer; 0 / NaN / negative throw at boot).
- **`slow_client_warning` SSE frame.** When a subscriber's queue crosses 75% full, the bus force-pushes a synthetic `slow_client_warning` to that subscriber once per overflow episode, carrying `{queueSize, maxQueued, lastEventId}`. The flag re-arms after the queue drains below 37.5% (hysteresis, no flap near threshold). If the queue actually overflows after the warning, the existing `client_evicted` terminal frame path still fires. Like `client_evicted`, the warning has no `id` (synthetic frame; must not burn a sequence slot for other subscribers).
- **`?maxQueued=N`** query param on `GET /session/:id/events` (range `[16, 2048]`, default 256). Lets cold reconnect clients pre-size their per-subscriber backlog so a large `Last-Event-ID: 0` replay doesn't trip the warning on the first publish. Range rationale: lower bound 16 (smaller is useless for any replay); upper bound 2048 (so a single subscriber can't pin ~1 MB just by asking). Out-of-range / non-decimal returns `400 invalid_max_queued` BEFORE opening the SSE stream: a clean 4xx beats half-opening a stream and emitting a `stream_error` (which EventSource would auto-reconnect on).
- **`slow_client_warning` capability tag**: single source of truth for the warning frame + `?maxQueued` query param + ring-size knob. Old daemons silently lack all of these; pre-flight via `caps.features`.
- **SDK extensions** (`@qwen-code/sdk`): typed `DaemonSlowClientWarningEvent` (added to known event union and `DaemonStreamLifecycleEvent`); schema-validated by a new `isSlowClientWarningData` predicate; reducer (`reduceDaemonSessionEvent`) increments `slowClientWarningCount` + stores `lastSlowClientWarning`. Warning is **non-terminal**: `alive` stays true (only `client_evicted` / `stream_error` / `session_died` close the stream). Re-exported from the public SDK entry.
- **Docs**: `qwen-serve-protocol.md` updates the features list (adds `slow_client_warning` and the previously-missing `client_identity` to match reality post-#4231), documents the `?maxQueued` query param, adds the warning frame to the event table, and notes the new default ring size. `qwen-serve.md` adds the `--event-ring-size` flag row.

Tests: 19 eventBus (4 new: warning at 75%, once per episode, no `id` on the synthetic frame, hysteresis re-arm), 106 bridge (2 new: validate eventRingSize accept/reject), 111 server (4 new: ?maxQueued accept/absent/non-decimal/out-of-range + EXPECTED_STAGE1_FEATURES update), 14 SDK daemonEvents (2 new: schema validation + non-terminal reducer behavior). 321 focused tests total, all green.

🤖 Generated with [Qwen Code](https://github.com/QwenLM/qwen-code)
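The non-terminal reducer behaviour described in the SDK bullet can be pictured with a minimal sketch. The state shape, the `StreamEvent` union, and the `reduce` function below are hypothetical simplifications for illustration; they are not the SDK's actual `reduceDaemonSessionEvent` signature or types.

```typescript
// Hypothetical, simplified types; the real SDK event and state types are richer.
interface SlowClientWarningData {
  queueSize: number;
  maxQueued: number;
  lastEventId: number;
}

type StreamEvent =
  | { type: 'slow_client_warning'; data: SlowClientWarningData }
  | { type: 'client_evicted' }
  | { type: 'stream_error' }
  | { type: 'session_died' };

interface SessionState {
  alive: boolean;
  slowClientWarningCount: number;
  lastSlowClientWarning?: SlowClientWarningData;
}

const initialState: SessionState = { alive: true, slowClientWarningCount: 0 };

function reduce(state: SessionState, event: StreamEvent): SessionState {
  switch (event.type) {
    case 'slow_client_warning':
      // Non-terminal: record the warning but leave `alive` untouched.
      return {
        ...state,
        slowClientWarningCount: state.slowClientWarningCount + 1,
        lastSlowClientWarning: event.data,
      };
    case 'client_evicted':
    case 'stream_error':
    case 'session_died':
      // Terminal frames close the stream.
      return { ...state, alive: false };
    default:
      return state;
  }
}
```

A client could watch `slowClientWarningCount` to decide whether to reconnect with a larger `?maxQueued`, while only the terminal frames flip `alive`.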
📋 Review Summary
This PR (#4237) implements SSE replay/backpressure improvements for chatty Stage 1 sessions, introducing three coordinated features: (1) increased default ring size (4000 → 8000), (2)
🔍 General Feedback
🎯 Specific Feedback
🟡 High
🟢 Medium
🔵 Low
✅ Highlights
Pull request overview
Adds configurable SSE replay/backpressure controls for qwen serve, plus SDK typing and documentation for the new slow-client warning event.
Changes:
- Raises/defaults replay ring sizing and adds `--event-ring-size`.
- Adds `?maxQueued=N` SSE subscription sizing and `slow_client_warning` frames.
- Extends SDK event typing/reducer state and updates serve docs/tests.
Reviewed changes
Copilot reviewed 16 out of 16 changed files in this pull request and generated 8 comments.
Show a summary per file
| File | Description |
|---|---|
| packages/sdk-typescript/test/unit/daemonEvents.test.ts | Tests warning event recognition and reducer behavior. |
| packages/sdk-typescript/src/index.ts | Re-exports warning event SDK types. |
| packages/sdk-typescript/src/daemon/index.ts | Re-exports daemon warning event types. |
| packages/sdk-typescript/src/daemon/events.ts | Adds warning event schema/types and reducer state. |
| packages/cli/src/serve/types.ts | Adds ServeOptions.eventRingSize. |
| packages/cli/src/serve/server.ts | Parses/validates ?maxQueued and forwards it to subscriptions. |
| packages/cli/src/serve/server.test.ts | Adds capability and maxQueued route coverage. |
| packages/cli/src/serve/runQwenServe.ts | Threads ring size into bridge creation. |
| packages/cli/src/serve/httpAcpBridge.ts | Validates ring size and uses it for EventBus creation. |
| packages/cli/src/serve/httpAcpBridge.test.ts | Tests bridge ring-size validation. |
| packages/cli/src/serve/eventBus.ts | Raises default ring size and emits slow-client warnings. |
| packages/cli/src/serve/eventBus.test.ts | Tests warning, eviction, hysteresis, and default ring behavior. |
| packages/cli/src/serve/capabilities.ts | Advertises slow_client_warning. |
| packages/cli/src/commands/serve.ts | Adds CLI flag for event ring size. |
| docs/users/qwen-serve.md | Documents serve flag and backpressure behavior. |
| docs/developers/qwen-serve-protocol.md | Documents protocol capability, query param, warning frame, and replay size. |
Address the actionable items from the Qwen Code review bot's pass on PR #4237:

- Pre-compute `warnThreshold` / `warnResetThreshold` per `InternalSub` at `subscribe()` time so `publish()`'s per-event hot path is one integer compare per subscriber instead of a multiply + compare. The `!warned` short-circuit still collapses the steady state to a single boolean read; this just shaves a multiply when the threshold check actually fires.
- Document the back-of-queue ordering choice for the synthetic `slow_client_warning` frame in `EventBus.publish()`: front-push was considered, but mid-stream front-insertion would mis-count `forcedInBuf` in `BoundedAsyncQueue.next()`, and `forcePush` already short-circuits via `resolvers.shift()` for the active-consumer case. The back-of-queue path only matters for stalled consumers, who can't drain regardless of warning position.
- Reuse the existing `collect()` helper in the "default ring size 8000" test for consistency with the rest of the file; the new test also tightens the assertion by checking that the first retained event id is 2 (id=1 dropped by the ring) and the last is 8001.
- Soften the "~500 B per session" magic number in `BridgeOptions.eventRingSize`'s JSDoc to a qualitative description (each retained `BridgeEvent` is a reference plus its serialized payload; ceiling scales as `ringSize × average-event-size`).

Rejected:

- Bot's claim that the error JSON contains `\`...\`` escape sequences: the bot misread the JS template-literal source as the wire output; `JSON.stringify` does not escape backticks, and the existing `cwd` error messages use the same style.
- Bot's "use `Record<string, never>` instead of `[key: string]: unknown`" suggestion on `DaemonSlowClientWarningData`: every other event-data type in `sdk-typescript/src/daemon/events.ts` carries the same index signature for additive-field compatibility.
- Bot's "features list breaks alphabetical order": the capability list is grouped by protocol lifecycle (health → capabilities → session lifecycle → events → permissions), not alphabetical.

Tests: 139 focused tests across eventBus + httpAcpBridge + SDK daemon events, all passing. Behavior unchanged; this is hot-path micro-opt + comment polish only.
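The pre-computed thresholds and the warn / re-arm hysteresis from this commit can be sketched roughly as follows. This is an illustration under stated assumptions, not the code in `eventBus.ts`: `SubState`, `makeSub`, `observe`, and `WARN_RESET_RATIO` are hypothetical names, and the `Math.ceil` / `Math.floor` rounding is an assumption (the source only states the 75% and 37.5% ratios and that 6 of 8 trips the warning).

```typescript
const WARN_THRESHOLD_RATIO = 0.75; // warn once the queue reaches 75% of its cap
const WARN_RESET_RATIO = 0.375; // hypothetical name: re-arm once below 37.5%

interface SubState {
  maxQueued: number;
  warnThreshold: number; // pre-computed at subscribe time
  warnResetThreshold: number; // pre-computed at subscribe time
  warned: boolean;
}

function makeSub(maxQueued: number): SubState {
  return {
    maxQueued,
    // Rounding direction is an assumption made for this sketch.
    warnThreshold: Math.ceil(maxQueued * WARN_THRESHOLD_RATIO),
    warnResetThreshold: Math.floor(maxQueued * WARN_RESET_RATIO),
    warned: false,
  };
}

// Returns true when a slow_client_warning should be emitted for this observation.
function observe(sub: SubState, queueSize: number): boolean {
  if (!sub.warned) {
    // Hot path: a single integer compare, no multiply.
    if (queueSize >= sub.warnThreshold) {
      sub.warned = true;
      return true;
    }
    return false;
  }
  // Already warned in this episode: only re-arm once the queue has drained
  // below the lower threshold, so the flag does not flap near 75%.
  if (queueSize < sub.warnResetThreshold) {
    sub.warned = false;
  }
  return false;
}
```

With `maxQueued = 8` the sketch warns at a depth of 6 and re-arms only after the depth falls below 3, which keeps a queue hovering around the 75% mark from emitting repeated warnings.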
Addressed the actionable items from the bot's review summary in ✅ Adopted
❌ Rejected
Verification: 139 focused tests (eventBus 19 / bridge 106 / SDK daemonEvents 14) all green. Behavior unchanged from |
Code Coverage Summary
CLI Package - Full Text Report
Core Package - Full Text Report
For detailed HTML reports, please see the 'coverage-reports-22.x-ubuntu-latest' artifact from the main CI run.
Address both P2 findings from the Codex review pass on PR #4237.

**Bug 1: `BoundedAsyncQueue.forcedInBuf` position-invariant break**

The previous `forcedInBuf` counter only tracked LIVE-vs-FORCED correctly when all forced entries lived at the FRONT of the buffer (subscribe-time `Last-Event-ID` replay). The new mid-stream `slow_client_warning` path force-pushes to the BACK of the queue while the queue is still open, which the existing accounting was not designed for:

- publish 6 events at maxQueued=8 → 75% threshold trips → force-push warning at the back → buf=[1..6, warning], forcedInBuf=1.
- consumer shifts `1` → forcedInBuf decremented to 0 (incorrect: `1` was a live frame, not the forced one).
- consumer drains 2..6 + warning → buf=[], forcedInBuf=0, true live count = 0, but the `size` getter and `push()` cap check then use `buf.length - forcedInBuf`, which drifts over subsequent refills, causing premature warn / eviction before the cap is actually reached.

Replace the position-dependent counter with a per-entry `{value, forced}` tag. `liveCount` is incremented in `push()` / decremented in `next()` only when the shifted entry was non-forced, so position becomes irrelevant. The `size` getter returns `liveCount` directly. The class doc comment is rewritten to call out that the new tag is the position-independent replacement for the old "forced frames must stay at the front" invariant. A regression test in `eventBus.test.ts` reproduces the codex trace (warn at 75%, drain past warning, refill to cap) and asserts no premature eviction.

**Bug 2: SDK does not expose `?maxQueued`**

`docs/users/qwen-serve.md` and `docs/developers/qwen-serve-protocol.md` both document `?maxQueued=N` as something SDK clients can request, but `SubscribeOptions` on `DaemonClient` only declared `lastEventId` + `signal`, and `subscribeEvents()` always fetched `/events` without a query string. Typed-SDK consumers had no way to opt in without hand-crafting URLs.

- Add `SubscribeOptions.maxQueued?: number` with JSDoc noting the daemon range `[16, 2048]` and the pre-flight requirement on `caps.features.slow_client_warning`.
- `DaemonClient.subscribeEvents` builds the URL with an optional `?maxQueued=<n>` segment. No client-side range validation: the daemon's `parseMaxQueuedQuery` is the source of truth and returns a structured `400 invalid_max_queued`; duplicating the bounds in two layers would diverge on the next tweak.
- `DaemonSessionSubscribeOptions extends SubscribeOptions` so the new field flows through `DaemonSessionClient` automatically.

Three new SDK tests:

- subscribeEvents appends `?maxQueued=N` when set
- omits the query string when absent (existing behavior preserved)
- propagates a `400 invalid_max_queued` unchanged

Tests: 214 focused tests across eventBus / bridge / SDK DaemonClient / DaemonSessionClient / daemonEvents, plus 111 in the server suite. All green; the new eventBus regression case proves the position-invariant fix.
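To make the Bug 1 fix concrete, here is a minimal sketch of per-entry tagging with a separate live counter. `TaggedQueue` is a hypothetical, synchronous simplification: the real `BoundedAsyncQueue` is asynchronous and also manages pending resolvers, which this sketch leaves out.

```typescript
interface Entry<T> {
  value: T;
  forced: boolean; // true for replay / synthetic frames that bypass the cap
}

class TaggedQueue<T> {
  private buf: Entry<T>[] = [];
  private liveCount = 0;
  private readonly maxQueued: number;

  constructor(maxQueued: number) {
    this.maxQueued = maxQueued;
  }

  // Number of live (non-forced) entries currently buffered.
  get size(): number {
    return this.liveCount;
  }

  // Live push: rejected once the live count reaches the cap.
  push(value: T): boolean {
    if (this.liveCount >= this.maxQueued) return false;
    this.buf.push({ value, forced: false });
    this.liveCount++;
    return true;
  }

  // Forced push: always accepted and never counted against the cap.
  forcePush(value: T): void {
    this.buf.push({ value, forced: true });
  }

  // Only a non-forced entry decrements the live count, so it does not
  // matter where in the buffer the forced entries sit.
  shift(): T | undefined {
    const entry = this.buf.shift();
    if (entry === undefined) return undefined;
    if (!entry.forced) this.liveCount--;
    return entry.value;
  }
}
```

Replaying the trace from the commit message (six live pushes at a cap of 8, a forced warning at the back, a full drain, then a refill) leaves `size` at 0 after the drain and lets exactly eight live pushes succeed afterwards.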
Address 6 of 8 copilot-reviewer findings on PR #4237; the other 2 (#1 forcedInBuf live-size corruption, #5 SDK lacks maxQueued) were already fixed in bae42c8, and I replied on the threads with the commit hash.

- **[2] server.ts:1068**: `?maxQueued=` (present-but-empty) now fails closed with `400 invalid_max_queued` instead of silently falling back to the default queue cap. The API documents fail-closed for any malformed value before opening SSE, so an empty string is unambiguously malformed. New server.test.ts case locks this in.
- **[3] commands/serve.ts:93**: CLI help text for `--event-ring-size` no longer mis-shapes `Last-Event-ID` as a query parameter. It is an HTTP header, and the daemon's SSE route does not parse a `?Last-Event-ID=` query.
- **[4] docs/developers/qwen-serve-protocol.md:351**: clarify that `?maxQueued=N` controls the LIVE-event backlog cap. Replay frames are force-pushed and exempt from the cap; what consumes it is live events that arrive while the subscriber is still draining a cold-reconnect replay. Bumping for cold reconnects is still the right answer, but for the live tail, not for the replay frames themselves.
- **[6] eventBus.ts:214**: stale `ringSize=4000` performance comment updated to the new `ringSize=8000` default with a note about the O(n) `shift()` cost scaling.
- **[7] sdk-typescript events.ts:492**: `isSlowClientWarningData` now uses the existing `isFiniteNumber` helper instead of bare `typeof === 'number'`. Mirrors the sibling predicates and rejects `NaN` / `Infinity` payloads as schema garbage. New daemonEvents.test.ts assertions cover both.
- **[8] server.ts:127**: `createServeApp`'s default-bridge construction now also forwards `opts.eventRingSize` to `createHttpAcpBridge`, symmetric with the `runQwenServe.ts` path. Direct embeds / tests that called `createServeApp` without supplying their own bridge but did pass `ServeOptions.eventRingSize` were silently getting the default 8000 ring.

Tests: 326 focused tests across eventBus / bridge / SDK DaemonClient / DaemonSessionClient / daemonEvents / server. All green; the new server.test.ts case + the extended daemonEvents.test.ts assertions cover the tightened guards.
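The fail-closed `?maxQueued` handling described in item [2] can be sketched as below. This is an assumption-laden illustration rather than the real `parseMaxQueuedQuery`: the function name `parseMaxQueued`, the `ParseResult` shape, and the regular expression are hypothetical, while the range `[16, 2048]`, the error code, and the two error messages are taken from this PR's description and verification report.

```typescript
const MIN_MAX_QUEUED = 16;
const MAX_MAX_QUEUED = 2048;

// Hypothetical result shape for this sketch.
type ParseResult =
  | { ok: true; value: number | undefined }
  | { ok: false; status: 400; code: 'invalid_max_queued'; error: string };

function parseMaxQueued(raw: string | undefined): ParseResult {
  // Param absent: the caller falls back to the default cap (256).
  if (raw === undefined) return { ok: true, value: undefined };

  // Present-but-empty, signed, fractional, or non-numeric values all fail closed.
  if (!/^\d+$/.test(raw)) {
    return {
      ok: false,
      status: 400,
      code: 'invalid_max_queued',
      error: '`maxQueued` must be a decimal integer',
    };
  }

  const n = Number(raw);
  if (n < MIN_MAX_QUEUED || n > MAX_MAX_QUEUED) {
    return {
      ok: false,
      status: 400,
      code: 'invalid_max_queued',
      error: `\`maxQueued\` must be in [${MIN_MAX_QUEUED}, ${MAX_MAX_QUEUED}]`,
    };
  }
  return { ok: true, value: n };
}
```

Running the check before any SSE headers are written is what lets the route answer with a plain 400 instead of a half-opened stream.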
wenshao
left a comment
Three findings from review (0 Critical, 3 Suggestion). Full review report with low-confidence findings: see terminal output.
Note: the integration test at integration-tests/cli/qwen-serve-routes.test.ts is outside this PR's scope, but its expected capabilities array will need slow_client_warning added when it's next touched — otherwise the capability assertion will fail.
wenshao
left a comment
Nit pass — five non-blocking suggestions. Overall this is a high-quality PR; the liveCount refactor in BoundedAsyncQueue is a thoughtful pre-emptive fix the previous forcedInBuf counter couldn't have survived once slow_client_warning started force-pushing mid-stream. Tests, validation posture, hysteresis design, capability gating, and SDK typing are all in good shape.
Six adopted findings from @wenshao's second review pass on PR #4237. The seventh ([10] forcedInBuf 3rd case invariant) was already fixed in bae42c8; I replied on that thread.

- **[9] + [14] server.ts**: Sanitize attacker-controlled values before stderr interpolation in both `parseMaxQueuedQuery` and `parseLastEventId`. New `safeLogValue()` helper uses `JSON.stringify` to escape control characters (`\n`/`\r`/…) so a URL-encoded newline in `?maxQueued=%0a` can't inject extra log lines into journald/Loki/Splunk pipelines. Matches the `workspace_mismatch` sanitization style in `sendBridgeError`. Fixed in both helpers (the sibling pre-existing `parseLastEventId` had the same shape) so the file stays consistent.
- **[11] httpAcpBridge.ts**: `!Number.isFinite(eventRingSize)` was redundant: `Number.isInteger(NaN)` and `Number.isInteger(Infinity)` both return `false`, so the sibling `!Number.isInteger` already catches both. Drop the dead guard.
- **[12] httpAcpBridge.ts**: Add soft upper bound `MAX_EVENT_RING_SIZE = 1_000_000` on `eventRingSize` to catch operator typos (`--event-ring-size 80000000` vs `8000000`). At ~500 B per `BridgeEvent` a 1M-frame ring already pins ~500 MB per session, well past any realistic workload. Not a security boundary (operator-controlled flag), pure typo defense. Existing bridge construction test extended with an `80_000_000` case.
- **[13] commands/serve.ts**: CLI `--event-ring-size` flag now sources its default from `DEFAULT_RING_SIZE` (imported from `serve/eventBus.js`) instead of the hardcoded literal `8000`. Without this, a future bump of the bus default would silently not take effect for daemons launched through the CLI because the flag always overrides; a single source of truth fixes that.
- **[15] eventBus.ts**: Drop the unreachable `event.id ?? this.lastEventId` fallback in the `slow_client_warning` frame. `event` is locally constructed at the top of `publish()` with `id: this.nextId++` and is guaranteed defined. Use `event.id as number` directly + an inline note about the invariant.

Tests: 197 (eventBus 20 / bridge 107 / SDK DaemonClient 57 / SDK daemonEvents 14) + 112 server. All green; the new upper-bound bridge case + the existing log assertions pin the changed behaviors.
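Items [9]/[14], [11], and [12] can be illustrated with a short sketch. The `safeLogValue` body follows the expression quoted in the verification report (`JSON.stringify(String(raw)).slice(0, 82)`) and the error text mirrors the boot-time message quoted there; `validateEventRingSize` is a hypothetical name for the bridge-side check, which in the PR lives inside `httpAcpBridge.ts`.

```typescript
const MAX_EVENT_RING_SIZE = 1_000_000;

// Quote and escape an untrusted value so control characters such as \n and \r
// show up as visible escapes on a single log line, then cap the length.
function safeLogValue(raw: unknown): string {
  return JSON.stringify(String(raw)).slice(0, 82);
}

// Hypothetical helper name. Number.isInteger already returns false for NaN and
// Infinity, so no separate Number.isFinite guard is needed.
function validateEventRingSize(eventRingSize: number): number {
  if (
    !Number.isInteger(eventRingSize) ||
    eventRingSize < 1 ||
    eventRingSize > MAX_EVENT_RING_SIZE
  ) {
    throw new Error(
      `Invalid eventRingSize: ${eventRingSize}. ` +
        `Must be a positive integer in [1, ${MAX_EVENT_RING_SIZE}].`,
    );
  }
  return eventRingSize;
}
```

Because the escaped value never contains a raw line break, a line-oriented log shipper sees one entry per rejection regardless of what the client sent.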
wenshao
left a comment
— DeepSeek/deepseek-v4-pro via Qwen Code /review
    asKnownDaemonEvent({
      v: 1,
      type: 'slow_client_warning',
      data: { queueSize: 'lots', maxQueued: 256, lastEventId: 42 },

[Suggestion] TS2345: the intentionally-wrong test value { queueSize: 'lots', ... } doesn't match the DaemonEvent type, causing tsc --noEmit to fail. This test validates schema rejection of wrong types, which is correct behavior; it just needs a TS escape hatch.

    -   data: { queueSize: 'lots', maxQueued: 256, lastEventId: 42 },
    + // @ts-expect-error: intentionally wrong type for schema validation test
    + asKnownDaemonEvent({
    +   v: 1,
    +   type: 'slow_client_warning',
    +   data: { queueSize: 'lots', maxQueued: 256, lastEventId: 42 },
    + }),
— DeepSeek/deepseek-v4-pro via Qwen Code /review
Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
wenshao
left a comment
[Suggestion] Docs still reference "default 4000 frames" in 5 places that should say 8000 now that DEFAULT_RING_SIZE changed:
- `docs/users/qwen-serve.md` lines 67, 207, 215, 233
- `docs/developers/qwen-serve-protocol.md` line 232
Line 233's roadmap item 5 ("default 4000 covers short drops; mobile / chatty-turn workloads need 8000+") is now factually incorrect since the default IS 8000 — consider removing or marking as done.
— glm-5.1 via Qwen Code /review
        lastEventId: event.id ?? this.lastEventId,
      },
    };
    sub.queue.forcePush(warningFrame);
[Suggestion] The slow_client_warning and client_evicted force-push paths produce no daemon-side stderr output. Other lifecycle events (permission timeout, maxQueued rejection) log via writeStderrLine.
During a 3 AM investigation of SSE client lag or disconnections, operators cannot correlate backpressure events with daemon memory/CPU spikes from logs alone — they'd need client-side access or a network tap.
Consider adding a rate-limited writeStderrLine here (gated by the existing !sub.warned flag so it fires once per overflow episode).
— glm-5.1 via Qwen Code /review
Verification report (HEAD

| # | Finding | Where it landed | How it was verified |
|---|---|---|---|
| 1 | stderr log injection on the `?maxQueued` rejection path | `safeLogValue()` @ `server.ts:1125`, using `JSON.stringify(String(raw)).slice(0, 82)` | `curl '?maxQueued=%0aqwen+serve%3a+FORGED+LOG+LINE'`: stderr shows `rejected ?maxQueued "\nqwen serve: FORGED LOG LINE" (not a decimal integer)`. The newline is escaped to a visible `\n`, so the forged line does not escape. |
| 2 | Sibling `parseLastEventId` has the same flaw | `server.ts:1141, 1153` go through the same helper | Measured: `rejected Last-Event-ID "1.5e10z"` is likewise quoted and escaped |
| 3 | `forcePush` appending `slow_client_warning` at the back of the queue makes the `forcedInBuf` count drift | `forcedInBuf` removed entirely, replaced with a per-entry `forced: boolean` plus a separate `liveCount` (`eventBus.ts:472-490, 567`) | Locked in by the regression test `warn-at-back forced frame does NOT skew the live cap for subsequent publishes (codex P2)` |
| 4 | Redundant `!Number.isFinite(eventRingSize)` guard | Removed; a comment notes that `Number.isInteger` already covers NaN/Infinity (`httpAcpBridge.ts:1235-1240`) | `tsc` clean |
| 5 | No upper bound on `eventRingSize` | `MAX_EVENT_RING_SIZE = 1_000_000` typo defense | At boot, `--event-ring-size 80000000` fails CLOSED and exits 1; stderr: `Invalid eventRingSize: 80000000. Must be a positive integer in [1, 1000000].` |
| 6 | `DEFAULT_RING_SIZE` hardcoded in two places | `commands/serve.ts:14` has `import { DEFAULT_RING_SIZE } from '../serve/eventBus.js'`, used directly as the yargs default | Single source of truth |
| 7 | Dead code `event.id ?? this.lastEventId` | Changed to `event.id as number` plus a comment explaining that `event` is constructed at the top of `publish()` with `id: this.nextId++` | `tsc` clean |
Local tests

5 focused specs:

    $ npx vitest run \
        packages/cli/src/serve/eventBus.test.ts \
        packages/cli/src/serve/httpAcpBridge.test.ts \
        packages/sdk-typescript/test/unit/daemonEvents.test.ts \
        packages/sdk-typescript/test/unit/DaemonClient.test.ts \
        packages/sdk-typescript/test/unit/DaemonSessionClient.test.ts
    Test Files 5 passed (5)
    Tests 214 passed (214)

server.test.ts (run from the cli workspace):

    $ cd packages/cli && npx vitest run src/serve/server.test.ts
    Test Files 1 passed (1)
    Tests 112 passed (112)

326 passed in total. `npx tsc --build packages/cli` also reports 0 errors (the batch of `@google/genai`-missing errors under `acp-integration/` mentioned in the PR description is pre-existing and unrelated to this PR).
Real HTTP smoke test (`node packages/cli/dist/index.js serve ...`)

`/capabilities.features` contains both `slow_client_warning` and `client_identity` (the PR description's item about retroactively listing `client_identity` is delivered as well).

`?maxQueued` validation order (verifying that fail-CLOSED happens before the session lookup):

| Request | Expected | Actual |
|---|---|---|
| `?maxQueued=` (present-but-empty) | 400 `invalid_max_queued` | ✅ `` {"error":"`maxQueued` must be a decimal integer","code":"invalid_max_queued"} `` |
| `?maxQueued=abc` | 400 | ✅ |
| `?maxQueued=15` | 400 outside range | ✅ `` {"error":"`maxQueued` must be in [16, 2048]","code":"invalid_max_queued"} `` |
| `?maxQueued=2049` | 400 outside range | ✅ |
| `?maxQueued=16` (lower bound) | passes validation → 404 session | ✅ `{"error":"No session with id \"nonexistent\""...}` |
| `?maxQueued=256` | passes validation → 404 session | ✅ |
| `?maxQueued=2048` (upper bound) | passes validation → 404 session | ✅ |

Log-injection adversarial test:

    $ curl '?maxQueued=%0aqwen+serve%3a+FORGED+LOG+LINE' → HTTP 400
    # stderr:
    qwen serve: rejected ?maxQueued "\nqwen serve: FORGED LOG LINE" (not a decimal integer)
    $ curl '?maxQueued=%0d%0aFORGED2' → HTTP 400
    # stderr:
    qwen serve: rejected ?maxQueued "\r\nFORGED2" (not a decimal integer)

Both attack vectors are wrapped by `JSON.stringify` into visible escapes on a single line, so a line-oriented log shipper will not treat the forged string as a separate entry.

Boot-time `--event-ring-size` validation:

| Value | Expected | Actual |
|---|---|---|
| `0` | fail-CLOSED | exit 1, `Invalid eventRingSize: 0. Must be a positive integer in [1, 1000000].` |
| `-1` | fail-CLOSED | exit 1 |
| `1.5` | fail-CLOSED (`Number.isInteger`) | exit 1 |
| `80000000` (typo defense) | fail-CLOSED (`MAX_EVENT_RING_SIZE`) | exit 1 |
| `1000000` (= upper bound) | accepted, listening | ✅ |
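One doc nit (non-blocking)

In the comment at `eventBus.ts:269-280` that explains why the warning is pushed to the back of the queue rather than the front, argument (a) still cites the now-removed `forcedInBuf` invariant:

> the forward-position invariant in `BoundedAsyncQueue.next()`'s `forcedInBuf` accounting is sized for "replay at front, live at back"

In fact the code now uses a per-entry `forced` tag plus a separate `liveCount`, which is position-independent: inserting at the front would be counted correctly too. Argument (b), about the `resolvers.shift()` fast path, still holds. Lines 462-470 of the same file already carry the up-to-date design notes, so later readers won't be misled; this can be cleaned up in passing after merge.

Conclusion

All 7 round-2 findings are honest fixes (not paper-overs), the key regressions (queue accounting + log injection) are locked in by tests, and the real daemon's behavior matches the docs. OK to approve.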
…ventBus changes (#4245)

- qwen-serve-routes.test.ts: expand expected features list to 24, adding slow_client_warning (#4237) and workspace_mcp/workspace_skills/workspace_providers/session_context/session_supported_commands (#4241). Matches EXPECTED_STAGE1_FEATURES in server.test.ts:76-101.
- qwen-serve-baseline.test.ts: update SSE backpressure assertion from 3 to 4 frames (tick, tick, slow_client_warning, client_evicted). PR #4237 changed EventBus to force-push a slow_client_warning synthetic frame when the per-subscriber queue reaches the 75% warn threshold, before the client_evicted terminal frame fires on overflow. Mirrors the unit test at eventBus.test.ts:103-122.

Both integration mirrors drifted because integration tests only run on schedule / workflow_dispatch (release.yml:4-9), not PR CI. Fixes the release run 25992130532 failure in both Docker and No-Sandbox jobs.
Summary
#4175 Wave 2.5 PR 10. Closes the SSE replay / backpressure knobs called out in #3803 §02 so chatty Stage 1 sessions get an honest reconnect window and operators get a heads-up signal before clients are summarily evicted.
- `DEFAULT_RING_SIZE` 4000 → 8000. Per-session replay ring depth now matches the #3803 ("Daemon mode (qwen serve): proposal & open decisions") §02 target for chatty sessions.
- `--event-ring-size <n>` CLI flag (default 8000) lets operators tune the ring per daemon. Threaded `ServeOptions` → `BridgeOptions.eventRingSize` → both `new EventBus()` construction sites (fresh sessions + restore path). Validation is fail-CLOSED (positive finite integer; `0` / `NaN` / negative throw at boot).
- `slow_client_warning` SSE frame. When a subscriber's queue crosses 75% full, the bus force-pushes a synthetic `slow_client_warning` to that subscriber once per overflow episode, carrying `{queueSize, maxQueued, lastEventId}`. The flag re-arms after the queue drains below 37.5% (hysteresis, no flap near threshold). If the queue actually overflows after the warning, the existing `client_evicted` terminal frame path still fires. Like `client_evicted`, the warning has no `id` (synthetic frame; must not burn a sequence slot for other subscribers).
- `?maxQueued=N` query param on `GET /session/:id/events` (range `[16, 2048]`, default 256). Lets cold reconnect clients pre-size their per-subscriber backlog so a large `Last-Event-ID: 0` replay doesn't trip the warning on the first publish. Out-of-range / non-decimal returns `400 invalid_max_queued` before opening the SSE stream: a clean 4xx beats half-opening + emitting a `stream_error` that `EventSource` would auto-reconnect on.
- `slow_client_warning` capability tag: single source of truth for the warning frame + `?maxQueued` query param + ring-size knob. Pre-flight via `caps.features` before opting in.
- SDK extensions (`@qwen-code/sdk`): typed `DaemonSlowClientWarningEvent` (added to known event union and `DaemonStreamLifecycleEvent`); schema-validated by a new `isSlowClientWarningData` predicate; reducer (`reduceDaemonSessionEvent`) increments `slowClientWarningCount` + stores `lastSlowClientWarning`. Warning is non-terminal: `alive` stays true (only `client_evicted` / `stream_error` / `session_died` close the stream). Re-exported from the public SDK entry.
- Docs: `qwen-serve-protocol.md` adds the new feature tag and the previously-missing `client_identity` (post-#4231, "feat(serve): add daemon-stamped client identity") to the features list, documents the `?maxQueued` query param, adds the warning frame to the event table, and notes the new default ring size. `qwen-serve.md` adds the `--event-ring-size` row.

Validation
    npx vitest run packages/cli/src/serve/eventBus.test.ts \
      packages/cli/src/serve/httpAcpBridge.test.ts \
      packages/sdk-typescript/test/unit/daemonEvents.test.ts \
      packages/sdk-typescript/test/unit/DaemonClient.test.ts \
      packages/sdk-typescript/test/unit/DaemonSessionClient.test.ts
    (cd packages/cli && npx vitest run src/serve/server.test.ts)
    npx tsc --build packages/cli

- … `serve/eventBus*`, `serve/httpAcpBridge*`, `serve/server*`, `serve/capabilities`, `serve/types`, `serve/runQwenServe`, `commands/serve`, `sdk-typescript/src/daemon/events`, `sdk-typescript/src/daemon/index`, `sdk-typescript/src/index`).
- `tsc --build packages/cli` reports zero errors in any file I touched (only pre-existing `@google/genai`-missing errors in `acp-integration/` show up locally).
- … `packages/cli/src/serve/eventBus.ts` for the `WARN_THRESHOLD_RATIO` + hysteresis logic and `packages/cli/src/serve/server.ts` for the new `parseMaxQueuedQuery`.

Scope / Risk

- … `EventBus.publish()` so it's on the per-event hot path. The check is one O(1) ratio compare guarded by the `!sub.warned` short-circuit, which collapses to a single boolean read on the steady state. Hysteresis is symmetric (warn at 75%, reset at 37.5%) so no flap near threshold.
- … `?maxQueued` see no behavior change (default queue cap stays 256).
- … `slow_client_warning` ignore unknown event types: `parseTypedDaemonEvent` already falls through unknown types and the SDK's `isKnownDaemonEvent` returns `undefined` for them.
- … `--event-ring-size 4000` to restore prior behavior.
- … `qwen --acp` child under sustained 100% queue fill (relies on agent-side throughput characteristics that vary by model). The unit tests cover the synthetic warning + eviction emit; production behavior is the same code path.

Testing Matrix
Validated locally on macOS via the focused vitest commands above. Windows / Linux paths not run locally — CI covers.
Linked Issues / Bugs
Refs #4175 (Wave 2.5 PR 10), #3803 (§02 SSE backpressure target)