feat(serve): ACP WebSocket transport (RFD Streamable HTTP phase 2)#4773
Conversation
📋 Review SummaryThis PR implements the ACP Streamable HTTP transport (RFD #721) with SSE-based event streaming, providing a solid foundation for the eventual WebSocket transport mentioned in the title. The implementation is well-architected with clear separation between transport concerns ( 🔍 General Feedback
🎯 Specific Feedback🟡 High
🟢 Medium
🔵 Low
✅ Highlights
|
wenshao
left a comment
There was a problem hiding this comment.
Stacked PR Findings (not in diff — from dependency PRs #4736/#4737)
dispatch.ts:
- [Critical]
sessions/delete(line 1662) accepts anysessionIdsarray without checkingconn.ownsSession(). Compare tosession/close(line 734) which callsrequireOwned(). Any authenticated connection can delete other connections' sessions. - [Critical]
workspace/agents/createandagents/update(lines 1729, 1794) skip input validations enforced by the REST layer:toolselement types, max entries (256), max per-entry length (256 chars),description/systemPromptmax size (256KB), non-empty checks. Also,agents/updateandagents/deletedon't callassertMutableLevel()— builtin/extension/session-level agents can be modified via ACP while REST blocks with 403. - [Critical] 11 tsc errors: dispatcher calls non-existent
HttpAcpBridgemethods (generateSessionRecap,generateSessionBtw,executeShellCommand, etc.). PR does not compile. - [Suggestion]
CONN_ROUTED_METHODS(line 86) andbuildInitializeResultmethods (line 418) maintained separately with no compiler enforcement. Define once and derive both. - [Suggestion]
toRpcError(line 199) forwardsSubagentError.messageverbatim — leaks absolute filesystem paths forFILE_ERRORsubcode. REST API gates behindisServeDebugMode().
server.ts: 25 tsc errors (property mismatches on BridgeSessionSummary, 'err' is of type 'unknown' x19, index signature access).
runQwenServe.ts: 4 tsc errors (missing module @qwen-code/acp-bridge/spawnChannel, implicit any parameters x3).
transport.test.ts: 10 of 56 tests fail — 2 expect status 400 but implementation returns 404 (per RFD); 8 session-extension tests read the wrong SSE frame (connection-level stream receives session/new reply before extension method reply).
Deterministic analysis: tsc = 40 errors total, eslint = 0. Coverage: WsStream (101 lines) and WS upgrade handler (~210 lines) have zero test coverage.
— qwen3.7-max via Qwen Code /review
205c521 to
7fac21b
Compare
0bff239 to
8f02791
Compare
7fac21b to
884e02d
Compare
001e00e to
cb76dd9
Compare
wenshao
left a comment
There was a problem hiding this comment.
[Critical] closeSessionStream (line 270) calls binding.stream?.close() unconditionally. After the type widening to TransportStream, WS transport shares the same WsStream instance between conn.connStream and binding.stream (set via conn.attachSessionStream(sid, conn.connStream!, ac) at index.ts:528). Closing any single session tears down the entire WS connection and all other sessions on it.
(Posting as body-level comment because closeSessionStream is on a line not touched by this PR's diff, but the bug is caused by the type widening in this PR.)
— qwen3.7-max via Qwen Code /review
cb76dd9 to
1e442b7
Compare
wenshao
left a comment
There was a problem hiding this comment.
tsc: 12 compilation errors in dispatch.ts (missing BTW_MAX_INPUT_LENGTH import, 9 missing methods on HttpAcpBridge, 2 type mismatches). The build does not compile.
Test coverage: zero tests for new code paths — WsStream adapter (83 lines, parallel SseStream has 8 test scenarios), toRpcError 7 new error-type branches, sessions/delete batch orchestration.
Needs human review: device_flow/get returns full state to non-initiators (REST redacts), session/shell passes no AbortSignal (commands run 120s after disconnect), tools array skips element type validation.
— qwen3.7-max via Qwen Code /review
6f450e1 to
9a61356
Compare
chiga0
left a comment
There was a problem hiding this comment.
Independent Review at HEAD 9a61356e
PR: feat(serve): ACP WebSocket transport (RFD Streamable HTTP phase 2) — +1345/-21, 9 files, 3 commits
This review validates wenshao's 2 rounds of CHANGES_REQUESTED (15 Critical + 16 Suggestions) and adds 1 additional Critical finding not previously raised.
Additional Finding (not in wenshao's reviews)
| # | Severity | File | Line | Issue |
|---|---|---|---|---|
| NEW-1 | Critical | dispatch.ts |
1057-1061 | session/shell handler references undefined cmd variable. const rawCmd = params['command'] at line 1043, but cmd.slice(0, 120) at line 1057 and cmd at line 1061 — cmd is never declared. This is a ReferenceError at runtime for every shell command. Fix: rename rawCmd → cmd or add const cmd = rawCmd.trim(). |
Verified: grep -n 'cmd\b' dispatch.ts shows cmd used at lines 1057/1061 but never declared. rawCmd declared at line 1043, used only in the validation check.
Validated wenshao Critical findings (independently confirmed)
Security (must-fix before merge)
| # | Finding | Verification |
|---|---|---|
| W-C2 | CSRF via WS upgrade — no Origin check | Confirmed: upgrade handler at index.ts:324 bypasses denyBrowserOriginCors/hostAllowlist/bearerAuth. Browsers send Origin on WS upgrades per RFC 6454. Loopback binding alone is NOT sufficient — malicious webpages can open ws://localhost:<port>/acp. Fix: reject non-loopback origins OR check Origin header against loopback. |
| W-C26 | device_flow/get returns full view without redaction |
Confirmed: dispatch.ts:1468 returns raw DeviceFlowPublicView including userCode/verificationUri/verificationUriComplete to non-initiator callers. REST counterpart (toDeviceFlowStateBody) strips these. Enables cross-client flow monitoring. |
| W-C27 | agents/create lacks BuiltinAgentRegistry shadow check |
Confirmed: dispatch.ts:1713 calls createSubagent without isBuiltinAgent(). ACP client can register "general-purpose" with malicious systemPrompt, shadowing builtins globally. deleteSubagent calls isBuiltinAgent → throws, making backdoor undeletable. |
Reliability (must-fix before merge)
| # | Finding | Verification |
|---|---|---|
| W-C1/C10 | closeSessionStream closes entire WS connection |
Confirmed: teardownBinding at connectionRegistry.ts:288 calls binding.stream?.close(). For WS, binding.stream === conn.connStream (same WsStream). Fix: add if (binding.stream !== this.connStream) guard before close, matching attachSessionStream (line 261). |
| W-C4 | Unserialized ws.on('message', async ...) handlers |
Confirmed: Concurrent async callbacks per frame. Fast clients can interleave session/new and session/prompt. Fix: serialize with async queue or sequential processing. |
| W-C7 | Heartbeat ping without pong listener | Confirmed: setInterval sends ws.ping() every 15s, onHeartbeat calls conn.touch(), but no ws.on('pong') listener. Dead connections undetected + immune to idle sweeper. Fix: track alive flag, set on pong, check on ping interval. |
| W-C8 | writeChain unbounded pending writes |
Confirmed: Slow WS readers accumulate unbounded Promise + JSON string chain. Fix: cap at N pending writes, reject when full. |
| W-C14 | Lazy session attach not atomic | Confirmed: !binding.stream check at index.ts:495 is racy under concurrent handlers (W-C4). Fix: synchronous guard flag binding._pumpStarting. |
| W-C19 | send() returns raw promise, not chained |
Confirmed: return next at wsStream.ts:50 bypasses .catch handler. Fire-and-forget callers leave unhandled rejections → Node 22 --unhandled-rejections=throw crashes daemon. Fix: return this.writeChain. |
| W-C28 | sessions/delete orphaned registry state |
Confirmed: After bridge.closeSession() + svc.removeSessions(), no conn.closeSessionStream(), ownedSessions.delete(), or closingSessions.add() for affected sessions. Registry retains zombie entries. |
Protocol/Correctness
| # | Finding | Verification |
|---|---|---|
| W-C5 | Initialize response fire-and-forget | Confirmed: ws.send(JSON.stringify(...)) without callback at index.ts:449. initialized = true set unconditionally. If send fails → zombie connection occupying maxConnections slot. |
| W-C6 | No timeout for uninitialized connections | Confirmed: After upgrade, server waits indefinitely for initialize. Never enters registry → maxConnections doesn't apply. Fix: setTimeout to close if not initialized within 30s. |
| W-C12 | Non-matching upgrade path leaks FD | Confirmed: if (url.pathname !== path) return at index.ts:330 doesn't destroy socket. Node doesn't auto-destroy upgrade sockets. Fix: socket.destroy() before return. |
| W-C25 | MCP server name validation incomplete | Confirmed: dispatch.ts:1564 only checks `!name |
| W-C22 | configOptions removed from capabilities |
Confirmed: buildInitializeResult dropped configOptions: true, breaking existing SSE clients that feature-detect session/set_config_option. |
Prioritized fix plan
P0 — Security (blocks merge):
- W-C2: Add Origin header check on WS upgrade (CSRF)
- W-C26: Redact device flow verification data for non-initiators
- W-C27: Add
BuiltinAgentRegistry.isBuiltinAgent()shadow check toagents/create
P1 — Reliability (blocks merge):
4. NEW-1: Fix cmd → rawCmd in session/shell handler
5. W-C1/C10: Add connStream identity guard in closeSessionStream
6. W-C7: Add pong listener to WsStream heartbeat
7. W-C19: send() should return this.writeChain not raw next
8. W-C4 + W-C14: Serialize WS message handlers (fixes both concurrent handler and lazy attach race)
9. W-C8: Add pending-write cap to WsStream.writeChain
10. W-C28: Clean up registry state in sessions/delete
P2 — Protocol (should-fix):
11. W-C5/W-C6: Initialize response error callback + uninitialized connection timeout
12. W-C12: socket.destroy() on non-matching upgrade path
13. W-C25: Use validateMcpRuntimeServerName for MCP server names
14. W-C22: Restore configOptions: true or document removal
P3 — Quality (follow-up OK):
15. Suggestions: WeakMap, dispose listener cleanup, loopback DRY, SubagentError codes, agent name trimming, memory constant sharing, etc.
Total: 16 Critical (15 wenshao + 1 new) + 16 Suggestions. Recommend addressing P0+P1 before next review round.
wenshao
left a comment
There was a problem hiding this comment.
[Critical] sessions/delete (line ~1638) 接受最多 100 个 session ID 并通过 closeSession + removeSessions 删除,但从未调用 requireOwned() 过滤。对比 session/close(line 679)在操作前检查 this.requireOwned(conn, sessionId, id)。任何已认证客户端可批量删除其他客户端的 session——包括磁盘上的持久化数据。建议在 Promise.allSettled 前添加:const ids = [...new Set(sessionIds)].filter(sid => conn.ownsSession(sid));
— qwen3.7-max via Qwen Code /review
7193979 to
1486da7
Compare
wenshao
left a comment
There was a problem hiding this comment.
Build: 2 tsc errors (dispatch.ts:1747 TS2345 missing level in SubagentConfig — already reported at this line; server.ts:2889 TS4111 dot-access on index signature — already reported at this line).
Tests: 11 failures — 3 capability registry drift (SERVE_CAPABILITY_REGISTRY gained workspace_agent_generate, session_stats, workspace_mcp_manage but EXPECTED_STAGE1_FEATURES in server.test.ts was not updated); 8 runQwenServe afterEach hook timeouts (10s limit exceeded by accumulated cleanup steps).
dispatch.ts:1623 — sessions/delete allows empty-string sessionId: validation checks typeof s === 'string' but not s.length > 0. An array like ["", "abc"] passes. Add && s.length > 0 to the .every() predicate.
Test coverage: zero tests added for ~1,400 lines of new code. Critical untested paths: WS upgrade handler (auth + protocol handshake), WsStream adapter (writeChain serialization, close idempotency, heartbeat), session/shell (command execution security boundary), sessions/delete (concurrent aggregation), POST /acp content-type/batch guards (415/501). Existing transport.test.ts:359 may be broken by the 400→404 status code split.
— qwen3.7-max via Qwen Code /review
|
All 52 automated review comments are against outdated code (pre-rebase). The WebSocket implementation has been completely rewritten on top of the latest
All threads resolved as outdated. |
wenshao
left a comment
There was a problem hiding this comment.
tsc: 2 compilation errors (same as R5 — dispatch.ts:1747 TS2345 missing level in SubagentConfig; server.ts:2889 TS4111 dot-access on index signature).
Tests: transport.test.ts — 2 tests fail because the PR changed unknown-connection responses from 400→404 (index.ts:174,231) but the test assertions at lines 366 and 1451 still expect 400. Update tests to expect 404.
Most findings from R5 (53 inline comments) remain unresolved at this same commit. Two new findings below.
— qwen3.7-max via Qwen Code /review
1486da7 to
e38d978
Compare
|
[Critical]
if (binding.stream && binding.stream !== this.connStream) {
binding.stream.close();
}But if (prevStream && prevStream !== stream) prevStream.close();In WS mode, Fix: if (prevStream && prevStream !== stream && prevStream !== this.connStream) {
prevStream.close();
}Confirmed by wenshao (review 4474984014) and independently verified. Re-review overview: review 4475067415. This review was generated by QoderWork AI |
|
Re: review 4475067415 + attachSessionStream finding (4678705791) The one remaining Critical identified in the re-review ( // connectionRegistry.ts:260
if (prevStream && prevStream !== stream && prevStream !== this.connStream) {
prevStream.close();
}Also fixed in the same commit: All items from the re-review are now resolved at PR HEAD ( |
…ype cleanup - dispose() now removes the 'upgrade' listener from httpServer, preventing TypeError crash on late-arriving WS upgrades - Refactor middleware to delegate to tryConsume(), eliminating duplicated token-bucket logic - Use exported AcpHttpHandle type instead of inline type shapes in runQwenServe.ts Generated with AI Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
qwen-code-ci-bot
left a comment
There was a problem hiding this comment.
Code Review Summary
Automated review of the ACP WebSocket transport addition. Found 3 Critical and 2 Suggestion findings after verification.
Pre-existing issues excluded: Two SSE permission tests (permission request round-trips at line 473, double-failure permission vote at line 1373) fail on both this branch and the base daemon_mode_b_main — not introduced by this PR.
Prior review findings addressed: Security checks (origin/host/auth), maxPayload, and stream identity guards from the previous review round have all been fixed in subsequent commits.
Critical
- Typecheck error (
index.ts:469) —socket.remoteAddressaccessed onDuplextype. Build is broken. - Message queue deadlock (
index.ts:479) —messageQueueserializes permission votes behind the prompt that awaits them. - Unsafe non-null assertion (
index.ts:624) —conn.connStream!without guard crashes if WS drops between init and session message.
Suggestions
- Duplicate error listener (
index.ts:471) —ws.on('error')here + WsStream constructor = double logging post-init. - Misleading test name (
transport.test.ts:2075) — "rejects" test asserts acceptance (code 101).
— qwen3.7-max via Qwen Code /review
DragonnZhang
left a comment
There was a problem hiding this comment.
All previously reported Critical findings have been addressed in this revision:
- Shared stream close guard:
teardownBindingnow skips closingbinding.streamwhen it equalsconn.connStream, preventing WS connection teardown on session close. - WS upgrade security: Host allowlist, CSRF origin validation, and bearer token auth (with timing-safe comparison) are all enforced before
handleUpgrade. - maxPayload:
WebSocketServernow setsmaxPayload: 10 * 1024 * 1024(10 MB), matching the REST body limit. - Message serialization: Inbound WS messages are serialized via a
messageQueuepromise chain, preventing race conditions between concurrent frames. - Initialize timeout: A 30-second timer closes WS connections that never send
initialize. - dispose() cleanup: The upgrade listener is properly removed on dispose.
- Heartbeat/pong: Dead connection detection via ping/pong with
aliveflag inWsStream.
The TransportStream abstraction is clean, the rate limiter refactor correctly extracts tryConsume for reuse by both Express middleware and WS, and the test coverage is thorough (unit + integration). LGTM.
— qwen3-235b-a22b via Qwen Code /review
ytahdn
left a comment
There was a problem hiding this comment.
Incremental Review (commits b016ba2 → 2fb0b60)
Good progress on addressing the previous review findings. Here's the status:
✅ Fixed
- WS message serialization —
messageQueueper-connection promise chain correctly serializes message processing.handleWsMessageis async and properly chained. ✅ - Rate limiter integration —
tryConsumeextracted and exposed viacheckRate, threaded throughMountAcpHttpOptions. ✅ - Token hashing — SHA-256 pre-hash eliminates byte-length side channel. ✅
- acpHandle.dispose() — Called in shutdown sequence; upgrade listener properly removed. ✅
- connStream guard — Both
attachSessionStreamandteardownBindingnow guard against closing shared WS connStream. ✅ - cleanupSession race — Abort controller identity check (
b?.abort === myAbort) prevents stale cleanup from closing new pumps. ✅ - WS security tests — 10 new tests covering origin check, token auth, maxPayload, protocol state, serialization, and rate limiting. ✅
Remaining from previous review
initTimernot cleared on early disconnect (low confidence — minor timer leak)- No test for connection cap (1013), initTimer timeout, or lazy session stream attachment
New finding
One minor regression in the rateLimit.ts refactor — see inline comment.
— qwen3.7-max via Qwen Code /review
ytahdn
left a comment
There was a problem hiding this comment.
All Critical findings from the initial review have been addressed. LGTM! ✅ — qwen3.7-max via Qwen Code /review
…m guard - Prompt dispatch no longer blocks the message queue, preventing deadlock when a permission vote is queued behind an in-flight prompt - Rate-limit tiers use explicit read-method allowlist instead of prefix match, so session/new|close|cancel are correctly 'mutation' - wsKey uses proper Duplex cast + ::ffff: normalization for IP parity - connStream non-null assertion replaced with isClosed guard - tryConsume fires onError callback on bucket overflow - Test name corrected (accepts → not rejects) Generated with AI Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
78581e0 to
bb051e2
Compare
- connectionRegistry.ts: wrap each teardownBinding() call in try/catch during destroy() so one failing callback cannot leak the remaining sessions' resources (AbortControllers, streams, pending requests) - index.ts: add rate-limit enforcement for ACP HTTP POST path (POST /acp was exempt from Express middleware but had no alternative checkRate call, unlike the WS handler) Generated with AI Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
DragonnZhang
left a comment
There was a problem hiding this comment.
Re-reviewed at bb051e28 (11 commits since last review). No high-confidence issues found.
Summary of changes reviewed:
- WebSocket transport (WsStream) with transport-agnostic
TransportStreaminterface - WS upgrade handler with CSWSH/DNS-rebinding/bearer-auth security
- Message serialization via promise queue (prevents concurrent handler races)
- Prompt fire-and-forget to avoid deadlock on permission votes
- Shared connStream guard in
attachSessionStream/teardownBinding(WS uses one socket for all sessions) - Rate limiter refactored to expose
checkRate()for WS (DRY viatryConsume) - Token pre-hash (SHA-256) before
timingSafeEqualfor constant-time comparison - Explicit read-method allowlist for rate-limit tier classification
dispose()properly removes upgrade listener from httpServer- 17 WsStream unit tests + 10 WS security integration tests
APPROVE.
- upgradeListener: use correct function signature, remove `as any` - connRef: type as `AcpConnection | undefined` instead of `any` - SHA-256 token hash: pre-compute once at setupWebSocket instead of per-upgrade, reuse `expectedTokenHash` for all comparisons - Add `ws` + `@types/ws` to cli package.json dependencies (was only hoisted from plugin-example) - Log WS initialize timeout with source address for diagnostics Generated with AI Co-authored-by: Qwen-Coder <qwen-coder@alibabacloud.com>
chiga0
left a comment
There was a problem hiding this comment.
Re-Review at HEAD c6ded92 (8 new commits since fd6d3027)
New Commits
| # | Commit | Description |
|---|---|---|
| 1 | 41516e02 |
TS4111 bracket notation + IPv6 bracket strip + empty Host guard |
| 2 | b016ba26 |
SHA-256 token hashing, message serialization queue, WS rate limiting |
| 3 | 9139f286 |
10 WS security integration tests + ws error handler |
| 4 | e0742894 |
attachSessionStream shared connStream guard + abort identity guard |
| 5 | 2fb0b600 |
upgrade listener cleanup on dispose + DRY rate limiter + typed connRef |
| 6 | bb051e28 |
WS prompt fire-and-forget, explicit tier sets, IP normalization |
| 7 | 27105e59 |
Exception-safe destroy() + ACP HTTP rate limiting |
| 8 | c6ded929 |
Type safety (no any), token pre-hash, ws dependency, init timeout log |
Reviewer Status at HEAD
| Reviewer | Last Review | Status |
|---|---|---|
| wenshao | e0742894 |
CHANGES_REQUESTED (4474984014) |
| ytahdn | bb051e28 |
DISMISSED (4475998173) — "All Critical findings addressed" |
| DragonnZhang | bb051e28 |
DISMISSED (4476321966) — "No high-confidence issues found" |
| ci-bot | c6ded929 |
CHANGES_REQUESTED (4476035606) |
| doudouOUC | c6ded929 |
CHANGES_REQUESTED (4476337860) |
Cross-Validation Table (Critical findings)
| # | Reviewer | Finding | Status at HEAD |
|---|---|---|---|
| 1 | wenshao/DZ/ytahdn | TS4111 app.locals.acpHandle |
FIXED (commit 1: bracket notation) |
| 2 | DZ/wenshao | Bearer token timing side-channel | FIXED (commit 2: SHA-256 + timingSafeEqual, commit 8: pre-hash) |
| 3 | ci-bot/DZ | IPv6 bracket mismatch in origin check | FIXED (commit 1: `.replace(/^[ |
| 4 | ci-bot | Empty Host short-circuits allowlist | FIXED (commit 1: if (!allowed.has(host)) without host &&) |
| 5 | ytahdn | ws.on('message', async) not serialized |
FIXED (commit 2: messageQueue promise chain) |
| 6 | ytahdn | WS messages bypass rate limiter | FIXED (commit 2: opts.checkRate, commit 7: HTTP parity) |
| 7 | ci-bot/doudouOUC | Rate-limit tier misclassifies session mutations | FIXED (commit 6: explicit WS_READ_METHODS set + WS_EXEMPT_METHODS) |
| 8 | ci-bot/doudouOUC | WS rate-limit key raw remoteAddress |
FIXED (commit 6: ::ffff: prefix strip) |
| 9 | ci-bot | Prompt blocks message queue → deadlock | FIXED (commit 6: prompt fire-and-forget) |
| 10 | wenshao | attachSessionStream shared connStream guard |
FIXED (commit 4: prevStream !== this.connStream + abort identity) |
| 11 | wenshao/ci-bot | conn.connStream! non-null assertion |
FIXED (commit 6: conn.connStream && !conn.connStream.isClosed) |
| 12 | ci-bot/doudouOUC | dispose() doesn't remove upgrade listener |
FIXED (commit 5: upgradeServer.removeListener) |
| 13 | ci-bot | connRef: any type |
FIXED (commit 8: `AcpConnection |
| 14 | doudouOUC | upgradeListener type mismatch |
FIXED (commit 8: proper function signature) |
| 15 | doudouOUC | socket.remoteAddress TS2339 on Duplex |
FIXED (commit 6: proper cast pattern) |
| 16 | doudouOUC | ws not in package.json dependencies |
FIXED (commit 8: "ws": "^8.18.0" + @types/ws) |
| 17 | ci-bot/doudouOUC | tryConsume drops onError callback |
FIXED (commit 6: config.onError?.(...) on overflow) |
| 18 | ci-bot | ACP HTTP POST not rate-limited | FIXED (commit 7: HTTP checkRate path) |
| 19 | ci-bot | destroy() not exception-safe |
FIXED (commit 7: try-catch per teardownBinding) |
| 20 | doudouOUC | Init timeout lacks server-side log | FIXED (commit 8: writeStderrLine before ws.close()) |
| 21 | ytahdn | ACP handle not disposed on shutdown | FIXED (commit 2: acpHandle.dispose() in shutdown) |
Independent Verification at HEAD
Security layer (all 5 layers confirmed):
- Host allowlist: loopback +
host:portmatching ✓ - CSWSH origin: applies to loopback, IPv6 bracket strip ✓
- Bearer auth: SHA-256 pre-hash +
timingSafeEqual, no length leak ✓ - Connection cap:
registry.create()returns null → 1013 close ✓ - Init timeout: 30s with
.unref(), logged on fire ✓
Message processing:
messageQueueserializes all messages via promise chain ✓- Prompt fire-and-forget:
const isPrompt = ...→if (!isPrompt) await dispatchP✓ - Rate limit:
WS_EXEMPT_METHODS(heartbeat/metadata),WS_READ_METHODS(explicit set), elsemutation✓
Connection lifecycle:
connStreamguard:conn.connStream && !conn.connStream.isClosed✓- Abort identity:
myAbortcaptured, cleanup checksb?.abort === myAbort✓ - Dispose: removes upgrade listener, closes WSS, disposes registry ✓
- Destroy: try-catch per
teardownBinding✓
Rate limiter:
tryConsumeshared by Express middleware + WS + HTTP ACP ✓onErrorcallback preserved on bucket overflow ✓- IP normalization:
::ffff:prefix strip ✓ - ACP HTTP POST:
checkRate(httpKey, tier)withRetry-After: 5✓
Remaining Items (all Suggestions, non-blocking)
| # | Finding | Assessment |
|---|---|---|
| 1 | initTimer not cleared on ws error/close before init |
Minor — .unref() prevents exit blocking; fires on closed socket (no-op) |
| 2 | WsStream write chain lacks depth limit |
Pre-existing; same pattern as SSE; low risk in practice |
| 3 | WsStream.close() doesn't remove constructor listeners |
Pre-existing; GC handles after ws close |
| 4 | JSON.stringify in send() can throw synchronously |
Pre-existing; JSON-RPC messages from dispatcher are safe |
| 5 | Various dispatch-layer size limits (agents/create, file/edit, etc.) | Pre-existing REST parity — not WS-specific |
| 6 | WsStream.onClose unconditional registry.delete |
Deferred — WS reconnect not supported |
| 7 | Origin check ignores port | Acceptable for loopback daemon |
Final Verdict — LGTM, Recommend Merge
All 21 Critical findings from 5 reviewers addressed across 8 commits. Security layer is defense-in-depth (host → origin → auth → cap → timeout). Message serialization + prompt fire-and-forget resolves the deadlock. Rate limiting achieves full parity between WS and HTTP transports. Remaining 7 items are all Suggestions (pre-existing or acceptable trade-offs). 10 integration tests cover all security paths.
This re-review was generated by QoderWork AI
ytahdn
left a comment
There was a problem hiding this comment.
Incremental Review (commits 2fb0b60 → c6ded92)
Good set of fixes addressing the previous review and additional hardening. Summary of changes:
✅ Well done
- Prompt fire-and-forget —
session/promptno longer blocks the message queue, preventing deadlock when permission votes or cancel requests are queued behind a long-running prompt. Clear comment explaining the rationale. - Type safety —
connRef: AcpConnection | undefinedreplacesany,upgradeListenerproperly typed,connStream!non-null assertion removed with proper guard. - Token hash pre-computed —
expectedTokenHashcomputed once at setup scope, not per-upgrade. destroy()exception safety —teardownBindingwrapped in try-catch so one session failure doesn't prevent others from being torn down.- Rate limit bucket overflow —
onErrorcallback restored intryConsume, fixing the regression from last round. - Rate limit method classification parity — Explicit
WS_EXEMPT_METHODS/WS_READ_METHODSsets ensure REST and WS rate limiting use the same tier classification. - REST POST rate limiting —
checkRatenow applied to non-initialize POST requests, closing the gap where REST POST had no rate limit. - Init timeout logging — 30s init timeout now logs the source address for debugging.
- Lazy session stream guard —
conn.connStream && !conn.connStream.isClosedprevents attaching to a closed stream. wsKeyIPv6 normalization — MatchesnormalizeIpbehavior used in REST middleware.
Findings
Two minor observations — see inline comments.
— qwen3.7-max via Qwen Code /review
| */ | ||
| const CONN_GRACE_MS = 10_000; | ||
|
|
||
| const WS_EXEMPT_METHODS = new Set([ |
There was a problem hiding this comment.
[Suggestion] WS_EXEMPT_METHODS and WS_READ_METHODS are now also used in the REST POST rate-limit path (line ~232). The WS_ prefix is misleading since these are protocol-agnostic method classification sets. Consider renaming to ACP_EXEMPT_METHODS and ACP_READ_METHODS to reflect the shared usage.
— qwen3.7-max via Qwen Code /review
| : 'mutation'; | ||
| const httpKey = (req.socket?.remoteAddress ?? 'http-unknown').replace( | ||
| /^::ffff:/, | ||
| '', |
There was a problem hiding this comment.
[Suggestion] The REST POST rate-limit key uses req.socket?.remoteAddress (IP only), while the REST middleware (rateLimit.ts createKeyExtractor) may include a clientId from the x-qwen-client-id header (e.g. ip:clientId or cid:clientId). This means the same client could be tracked under different keys in the two paths, reducing rate-limit effectiveness.
Consider extracting the key-building logic into a shared helper:
const httpKey = extractKeyFromReq(req); // reuses same clientId + IP logic— qwen3.7-max via Qwen Code /review
ytahdn
left a comment
There was a problem hiding this comment.
All previous Critical findings resolved. Two minor Suggestions remain (naming + rate-limit key parity) but are non-blocking. LGTM! ✅ — qwen3.7-max via Qwen Code /review
qwen-code-ci-bot
left a comment
There was a problem hiding this comment.
Code Review Summary
Reviewed by 9 parallel agents (correctness, security, code quality, performance, test coverage, 3 undirected personas, build/test) plus 3 rounds of iterative reverse audit.
Scope: +1,088 / -53 across 10 files adding WebSocket transport for the ACP HTTP server.
Findings: 3 Critical, 7 Suggestions
Critical:
- Rate limit bypass via JSON-RPC notifications (no
id= exempt from rate limiter) - Unsynchronized
ws.send()afterWsStreamcreation bypasses write-chain serialization - Notification-form
session/promptdeadlocks the WS message queue
Suggestions:
4. Silent socket.destroy() on all 7 security rejection paths (no logging)
5. initTimer not cleared on protocol-error and connection-cap failure paths
6. WS rate-limit error uses INTERNAL_ERROR (-32603) instead of a rate-limit-specific code
7. Origin check validates hostname only, ignoring port (CSWSH risk in no-token dev mode)
8. Duplicate ws.on('error') handlers in both index.ts and WsStream constructor
9. Tier resolution logic duplicated between HTTP POST and WS handlers
10. Batch JSON-RPC rejection is not valid JSON-RPC 2.0 format
Terminal-only findings (low confidence, not posted as inline comments):
- HTTP POST rate-limit response not in JSON-RPC format
WsStream.send()silently resolves after closerawAddrTDZ fragility ininitTimercallback- Port 80 host allowlist special case not replicated for WS
- Auth token
.trim()vs HTTP middleware's leading-BWS-only strip divergence - Init handler throw between
registry.create()andinitialized = trueleaks slot - Recursive
registry.deleteon WS connections suppresses audit log
| } | ||
| } | ||
|
|
||
| if (opts.checkRate && isRequest(message)) { |
There was a problem hiding this comment.
[Critical — Security] Rate limit bypass via JSON-RPC notifications.
The guard isRequest(message) returns false for JSON-RPC notifications (messages without an id field), so opts.checkRate() is never called. An attacker can send unlimited notification-form requests (e.g., {"jsonrpc":"2.0","method":"session/list"} with no id) to bypass the rate limiter entirely.
The same issue exists on the HTTP POST path at line 231.
Suggested fix: Change the condition to also check notifications, or apply the rate limiter unconditionally (exempting only the WS_EXEMPT_METHODS set):
if (opts.checkRate && !WS_EXEMPT_METHODS.has(message.method)) {
// ... rate limit check ...
}| ); | ||
| conn.attachConnStream(stream); | ||
|
|
||
| ws.send( |
There was a problem hiding this comment.
[Critical — Correctness] Unsynchronized write bypasses WsStream write-chain serialization.
After creating WsStream at line 638 and attaching it to conn, this ws.send() call writes directly to the raw WebSocket. WsStream is designed to serialize writes through its writeChain promise to prevent frame interleaving. Direct ws.send() calls bypass that guarantee.
The same issue appears at line 735 (rate-limit error response) and line 584 (batch JSON-RPC rejection) — all use ws.send() after the WsStream exists or could be racing with WsStream.send() calls.
Suggested fix: Use stream.send(...) instead of ws.send(JSON.stringify(...)) for all writes after WsStream creation, and use void ws.send(...) → await stream.send(...) for pre-init writes if ordering matters.
| // Prompt is long-running (minutes); awaiting it would block | ||
| // permission votes and cancel requests queued behind it → deadlock. | ||
| // Fire-and-forget so the message queue stays unblocked. | ||
| const isPrompt = |
There was a problem hiding this comment.
[Critical — Correctness] Notification-form session/prompt deadlocks the WS message queue.
The isPrompt guard requires isRequest(message) which checks for an id field. A JSON-RPC notification (no id) with method session/prompt has isPrompt = false, so dispatchP is awaited. Since prompts can take minutes and may require permission votes from the same WS connection, this blocks the entire messageQueue — the client's vote/cancel messages queue behind the awaited prompt.
Deadlock scenario: Client sends notification-form session/prompt → prompt requires permission vote → client sends vote as next WS message → vote queued behind awaited prompt → prompt awaits vote → deadlock.
Suggested fix: Check the method name regardless of whether it's a request or notification:
const isPrompt =
message.method === 'session/prompt' ||
message.method === '_qwen/session/prompt';| `http://${req.headers.host ?? 'localhost'}`, | ||
| ); | ||
| } catch { | ||
| socket.destroy(); |
There was a problem hiding this comment.
[Suggestion — Observability] All 7 security rejection paths in the WS upgrade handler (socket.destroy() on invalid URL, wrong path, host mismatch, CSWSH origin, missing auth, bad token, non-loopback without token) silently destroy the socket with no logging.
This makes it difficult to diagnose attack attempts, misconfigured clients, or DNS-rebinding probes in production.
Suggested fix: Add a log line before each socket.destroy():
writeStderrLine(`qwen serve: /acp WS upgrade rejected (host-mismatch): host=${host} from=${socket.remoteAddress}`);
socket.destroy();| ); | ||
| ws.close(1002, 'Protocol error'); | ||
| return; | ||
| } |
There was a problem hiding this comment.
[Suggestion — Resource leak] initTimer (30s timeout) is not cleared on the protocol-error path (ws.close(1002)) or the connection-cap path (ws.close(1013)). After ws.close(), the timer fires and calls ws.close(1002, 'Initialize timeout') on an already-closed socket.
Additionally, if dispatcher.buildInitializeResult() or JSON.stringify() throws after registry.create(), the connection leaks a registry slot for up to 30 seconds.
Suggested fix: Add clearTimeout(initTimer) before each early return, and wrap the post-registry.create() block in a try/catch that cleans up the timer and registry entry.
| rpcError( | ||
| message.id, | ||
| RPC.INTERNAL_ERROR, | ||
| 'Rate limit exceeded', |
There was a problem hiding this comment.
[Suggestion — Protocol] The WS rate-limit error response uses RPC.INTERNAL_ERROR (-32603) with the message "Rate limit exceeded". JSON-RPC 2.0 reserves -32603 for server implementation errors, not client-side throttling. A spec-compliant client may interpret this as a server bug rather than a rate limit.
Suggested fix: Use a custom error code (e.g., a dedicated RATE_LIMIT_EXCEEDED constant) or reuse the HTTP path's approach (a code: 'rate_limit_exceeded' field in the error data).
| const origin = req.headers['origin']; | ||
| if (origin) { | ||
| try { | ||
| const originHost = new URL(origin).hostname.replace(/^\[|\]$/g, ''); |
There was a problem hiding this comment.
[Suggestion — Security] The CSWSH origin check validates only hostname, ignoring the port. In no-token dev mode (opts.token is undefined), a malicious site at http://127.0.0.1:ATTACKER_PORT can initiate a WS upgrade to the daemon with Origin: http://127.0.0.1:ATTACKER_PORT — the hostname 127.0.0.1 passes the check.
While browser CSRF protections limit cross-origin cookie leakage (not relevant for bearer-token auth), this check is the only defense in no-token loopback mode where the daemon accepts any local connection.
Suggested fix: Also validate the origin port matches the server's listening port, or reject origins with non-standard ports.
| ? rawAddr.slice(7) | ||
| : rawAddr; | ||
|
|
||
| ws.on('error', (err) => { |
There was a problem hiding this comment.
[Suggestion — Code quality] Duplicate ws.on('error') handlers. This handler at line 546 and the one in WsStream's constructor (wsStream.ts:22) both listen for 'error' on the same WebSocket. Both log and the WsStream one also calls this.close().
The WsStream is created after handleUpgrade, so there's a window where only this handler is active. Consider consolidating into WsStream or removing the redundant listener here.
| // Rate limit ACP HTTP POST (mirrors the WS checkRate path). | ||
| if (opts.checkRate && isRequest(message)) { | ||
| const m = message.method; | ||
| if (!WS_EXEMPT_METHODS.has(m)) { |
There was a problem hiding this comment.
[Suggestion — Maintainability] The tier resolution logic (prompt → 'prompt', read methods → 'read', rest → 'mutation') is duplicated between the HTTP POST path (here) and the WS path (~line 728). If a new method is added or the tier mapping changes, both locations must be updated.
Suggested fix: Extract into a shared function:
function resolveTier(method: string): RateLimitTier | null {
if (WS_EXEMPT_METHODS.has(method)) return null;
if (method === 'session/prompt' || method === '_qwen/session/prompt') return 'prompt';
if (WS_READ_METHODS.has(method)) return 'read';
return 'mutation';
}|
|
||
| if (Array.isArray(parsed)) { | ||
| ws.send( | ||
| JSON.stringify({ |
There was a problem hiding this comment.
[Suggestion — Protocol compliance] The batch JSON-RPC rejection { error: 'Batch JSON-RPC not supported' } is not a valid JSON-RPC 2.0 response. It lacks jsonrpc: "2.0", the id field, and a structured error: { code, message } object.
A spec-compliant client cannot correlate or classify this response.
Suggested fix:
ws.send(JSON.stringify(
rpcError(null, RPC.INVALID_REQUEST, 'Batch JSON-RPC not supported')
));
Summary
Complete ACP WebSocket transport per RFD. Coexists with SSE.
Depends on: #4827
Implementation
transportStream.tswsStream.tsconnectionRegistry.tsindex.tsserver.tsrunQwenServe.tsdispatch.tsWS Lifecycle
GET /acpwithUpgrade: websocketinitializeas first text frameSecurity
handleUpgradeSee #4782 for tracking.