Skip to content

Commit a047890

Browse files
committed
fix(serve): fold-in 4 — qwen-latest review (#4175 Wave 4 PR 17)
Round-4 reviewer adoption (qwen-latest-series-invite-beta-v28): - C1: hoist `persistApprovalMode` guard before the ACP roundtrip so a missing callback no longer leaves the daemon's mode shifted while the caller observes a 500 (httpAcpBridge.ts). - C2: serialize `persistApprovalMode` and `persistDisabledTools` through a per-workspace promise chain (`withSettingsLock`) so concurrent toggles can't lose updates in the read-modify-write window (runQwenServe.ts). - C3: trim `toolName` before persisting in `/workspace/tools/:name/enable` so the write path matches `loadCliConfig`'s `.trim()` on read. Re-validates empty-after-trim with 400 `invalid_tool_name`. - S1: cap `serverName` at `MAX_SERVER_NAME_LENGTH=256` on `/workspace/mcp/:server/restart` for parity with the tool-toggle cap. - S2: when `persist:true` succeeds, mirror `approval_mode_changed` via `broadcastWorkspaceEvent` so peer sessions in the same workspace observe the new default before their next ACP child spawn. - S3: `'noop'` added to `FakeBridge.initWorkspaceImpl` return type. - S5: `qwen-serve-protocol.md` action enumeration now includes `'noop'` and notes how the SSE event mirrors the response action. S4 (sync IO inside async persist callbacks) is acknowledged but deferred — `loadSettings` is the project-wide read path and the H2 fold-in already restricted us to workspace-scope-only consumption, keeping the sync window bounded. Fully eliminating it requires swapping `loadSettings` to async across the CLI, which is out of scope. 7 new tests: - server.test.ts × 3: tool-name trim, whitespace-only 400, server-name 256 cap. - httpAcpBridge.test.ts × 4: pre-call guard ordering for persist:true (no callback), persist:false bypasses guard, persist:true broadcasts to peer sessions, persist:false stays session-scoped. Typecheck clean across cli / sdk-typescript / core. 1599/1599 unit tests pass.
1 parent 015c917 commit a047890

6 files changed

Lines changed: 364 additions & 39 deletions

File tree

docs/developers/qwen-serve-protocol.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1183,7 +1183,7 @@ Response (200):
11831183
{ "path": "/work/bound/QWEN.md", "action": "created" }
11841184
```
11851185

1186-
`action` is `'created'` for fresh creates and whitespace-only overrides; `'overwrote'` when `force: true` replaced non-empty content.
1186+
`action` is `'created'` for fresh creates, `'noop'` when an existing whitespace-only file was left untouched (no write performed), and `'overwrote'` when `force: true` replaced non-empty content. The `workspace_initialized` SSE event mirrors the response action — observers can filter for `action !== 'noop'` to react only to actual on-disk changes.
11871187

11881188
Errors:
11891189

packages/cli/src/serve/httpAcpBridge.test.ts

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ import {
5353
} from './httpAcpBridge.js';
5454
import { createInMemoryChannel } from './inMemoryChannel.js';
5555
import type { BridgeEvent } from './eventBus.js';
56+
import { ApprovalMode } from '@qwen-code/qwen-code-core';
5657

5758
// Workspace fixtures must round-trip through `path.resolve` so the
5859
// expected values match what the bridge canonicalizes internally on
@@ -4279,6 +4280,189 @@ describe('createHttpAcpBridge', () => {
42794280
});
42804281
});
42814282

4283+
describe('setSessionApprovalMode (#4175 Wave 4 PR 17)', () => {
4284+
/**
4285+
* #4282 fold-in 4 (qwen-latest C1). Build a channel factory whose
4286+
* extMethod handler answers `qwen/control/session/approval_mode`
4287+
* with the expected `{previous, current}` shape. Tracks invocations
4288+
* so the guard-ordering tests can assert that the ACP call did NOT
4289+
* happen when the persist contract was already violated upfront.
4290+
*/
4291+
function approvalModeFactoryWithCallTracker(): {
4292+
factory: ChannelFactory;
4293+
getCalls: () => Array<{ method: string }>;
4294+
} {
4295+
const calls: Array<{ method: string }> = [];
4296+
const factory: ChannelFactory = async () => {
4297+
const { clientStream, agentStream } = createInMemoryChannel();
4298+
const agent = new FakeAgent({
4299+
extMethodImpl: (method, params) => {
4300+
calls.push({ method });
4301+
if (method === 'qwen/control/session/approval_mode') {
4302+
return Promise.resolve({
4303+
previous: 'default',
4304+
current: (params as { mode: string }).mode,
4305+
});
4306+
}
4307+
return Promise.resolve({});
4308+
},
4309+
});
4310+
new AgentSideConnection(() => agent as Agent, agentStream);
4311+
return {
4312+
stream: clientStream,
4313+
exited: new Promise<
4314+
| { exitCode: number | null; signalCode: NodeJS.Signals | null }
4315+
| undefined
4316+
>(() => {}),
4317+
kill: async () => {},
4318+
killSync: () => {},
4319+
};
4320+
};
4321+
return { factory, getCalls: () => calls };
4322+
}
4323+
4324+
it('throws BEFORE the ACP roundtrip when persist:true but no callback wired', async () => {
4325+
// The previous post-ACP placement of the persist guard meant a
4326+
// missing callback produced a 500 *after* the ACP child had
4327+
// already applied the mode change — observable to other in-flight
4328+
// requests but invisible to the caller. Pre-call ordering closes
4329+
// that window; assert by checking the ACP `extMethod` was never
4330+
// invoked when the guard fires.
4331+
const { factory, getCalls } = approvalModeFactoryWithCallTracker();
4332+
const bridge = makeBridge({ channelFactory: factory });
4333+
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
4334+
await expect(
4335+
bridge.setSessionApprovalMode(
4336+
session.sessionId,
4337+
ApprovalMode.YOLO,
4338+
{ persist: true },
4339+
undefined,
4340+
),
4341+
).rejects.toThrow(/persistApprovalMode/);
4342+
expect(
4343+
getCalls().some(
4344+
(c) => c.method === 'qwen/control/session/approval_mode',
4345+
),
4346+
).toBe(false);
4347+
await bridge.shutdown();
4348+
});
4349+
4350+
it('persist:false bypasses the guard regardless of callback wiring', async () => {
4351+
// Symmetric coverage for the guard: when `persist` is omitted /
4352+
// false, the missing callback is irrelevant and the ACP call must
4353+
// proceed normally. Without this check, a future regression that
4354+
// moves the guard could over-restrict the no-persist path.
4355+
const { factory } = approvalModeFactoryWithCallTracker();
4356+
const bridge = makeBridge({ channelFactory: factory });
4357+
const session = await bridge.spawnOrAttach({ workspaceCwd: WS_A });
4358+
const res = await bridge.setSessionApprovalMode(
4359+
session.sessionId,
4360+
ApprovalMode.YOLO,
4361+
{ persist: false },
4362+
undefined,
4363+
);
4364+
expect(res.persisted).toBe(false);
4365+
expect(res.mode).toBe('yolo');
4366+
await bridge.shutdown();
4367+
});
4368+
4369+
it('broadcasts approval_mode_changed to peer sessions when persisted (#4282 fold-in 4 S2)', async () => {
4370+
// When `persist:true` succeeds the change becomes the workspace
4371+
// default, so a peer session needs to know its next ACP child
4372+
// will spawn into a different mode. The session-scoped publish
4373+
// remains the authoritative signal for the requester; the
4374+
// workspace broadcast is the informational mirror for peers.
4375+
const { factory } = approvalModeFactoryWithCallTracker();
4376+
const bridge = makeBridge({
4377+
channelFactory: factory,
4378+
persistApprovalMode: async () => {},
4379+
});
4380+
const a = await bridge.spawnOrAttach({
4381+
workspaceCwd: WS_A,
4382+
sessionScope: 'thread',
4383+
});
4384+
const b = await bridge.spawnOrAttach({
4385+
workspaceCwd: WS_A,
4386+
sessionScope: 'thread',
4387+
});
4388+
const aborts = [new AbortController(), new AbortController()];
4389+
const itA = bridge
4390+
.subscribeEvents(a.sessionId, { signal: aborts[0]!.signal })
4391+
[Symbol.asyncIterator]();
4392+
const itB = bridge
4393+
.subscribeEvents(b.sessionId, { signal: aborts[1]!.signal })
4394+
[Symbol.asyncIterator]();
4395+
await bridge.setSessionApprovalMode(
4396+
a.sessionId,
4397+
ApprovalMode.YOLO,
4398+
{ persist: true },
4399+
undefined,
4400+
);
4401+
// Session A receives both the session-scoped event and the
4402+
// workspace-scoped mirror; collect two events.
4403+
const aFirst = await itA.next();
4404+
const aSecond = await itA.next();
4405+
const aTypes = [aFirst.value?.type, aSecond.value?.type];
4406+
expect(aTypes.filter((t) => t === 'approval_mode_changed').length).toBe(
4407+
2,
4408+
);
4409+
// Session B receives only the workspace-scoped mirror.
4410+
const bFirst = await itB.next();
4411+
expect(bFirst.value?.type).toBe('approval_mode_changed');
4412+
expect(bFirst.value?.data).toMatchObject({
4413+
sessionId: a.sessionId,
4414+
previous: 'default',
4415+
next: 'yolo',
4416+
persisted: true,
4417+
});
4418+
aborts.forEach((a) => a.abort());
4419+
await bridge.shutdown();
4420+
});
4421+
4422+
it('does NOT broadcast to peers when persisted is false', async () => {
4423+
// Symmetric coverage: ephemeral changes affect only the
4424+
// requesting session and must not surface on peer SSE buses, or
4425+
// peer UIs would react to a workspace-wide change that didn't
4426+
// happen.
4427+
const { factory } = approvalModeFactoryWithCallTracker();
4428+
const bridge = makeBridge({ channelFactory: factory });
4429+
const a = await bridge.spawnOrAttach({
4430+
workspaceCwd: WS_A,
4431+
sessionScope: 'thread',
4432+
});
4433+
const b = await bridge.spawnOrAttach({
4434+
workspaceCwd: WS_A,
4435+
sessionScope: 'thread',
4436+
});
4437+
const aborts = [new AbortController(), new AbortController()];
4438+
const itA = bridge
4439+
.subscribeEvents(a.sessionId, { signal: aborts[0]!.signal })
4440+
[Symbol.asyncIterator]();
4441+
const itB = bridge
4442+
.subscribeEvents(b.sessionId, { signal: aborts[1]!.signal })
4443+
[Symbol.asyncIterator]();
4444+
await bridge.setSessionApprovalMode(
4445+
a.sessionId,
4446+
ApprovalMode.YOLO,
4447+
{ persist: false },
4448+
undefined,
4449+
);
4450+
const aFirst = await itA.next();
4451+
expect(aFirst.value?.type).toBe('approval_mode_changed');
4452+
// Race the peer subscriber against a 50ms timer. Without a
4453+
// timeout the test would hang because no event is expected.
4454+
const timed = await Promise.race([
4455+
itB.next().then((v) => ({ kind: 'event' as const, v })),
4456+
new Promise((r) => setTimeout(r, 50)).then(() => ({
4457+
kind: 'timeout' as const,
4458+
})),
4459+
]);
4460+
expect(timed.kind).toBe('timeout');
4461+
aborts.forEach((a) => a.abort());
4462+
await bridge.shutdown();
4463+
});
4464+
});
4465+
42824466
describe('setWorkspaceToolEnabled (#4175 Wave 4 PR 17)', () => {
42834467
it('throws when no persistDisabledTools callback is wired', async () => {
42844468
const bridge = makeBridge();

packages/cli/src/serve/httpAcpBridge.ts

Lines changed: 34 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -3792,6 +3792,21 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge {
37923792
entry,
37933793
context?.clientId,
37943794
);
3795+
// #4282 fold-in 4 (qwen-latest C1): validate the persist contract
3796+
// BEFORE the ACP roundtrip changes the in-process mode. The previous
3797+
// post-call placement meant a missing `persistApprovalMode` callback
3798+
// produced a 500 *after* the ACP child had already applied the
3799+
// mode change — observable to other in-flight requests but
3800+
// invisible to the caller. Mirrors the pre-call validation in
3801+
// `setWorkspaceToolEnabled`.
3802+
if (opts.persist && !persistApprovalMode) {
3803+
throw new Error(
3804+
'setSessionApprovalMode called with `persist: true` but no ' +
3805+
'`persistApprovalMode` callback wired in BridgeOptions. ' +
3806+
'runQwenServe wires the production callback; direct embeds ' +
3807+
'and tests must opt in or omit `persist`.',
3808+
);
3809+
}
37953810
let response: { previous: ApprovalMode; current: ApprovalMode };
37963811
try {
37973812
response = (await Promise.race([
@@ -3828,21 +3843,6 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge {
38283843
}
38293844
throw err;
38303845
}
3831-
// #4282 wenshao H3 fold-in: throw clearly when the caller asked
3832-
// for `persist: true` but the bridge wasn't wired with a
3833-
// `persistApprovalMode` callback. The previous silent
3834-
// `persisted: false` was indistinguishable from "hook ran but
3835-
// failed" or genuine `persisted: true` from the caller's point
3836-
// of view, leaving a contract gap. Mirrors the throw in
3837-
// `setWorkspaceToolEnabled` for the same situation.
3838-
if (opts.persist && !persistApprovalMode) {
3839-
throw new Error(
3840-
'setSessionApprovalMode called with `persist: true` but no ' +
3841-
'`persistApprovalMode` callback wired in BridgeOptions. ' +
3842-
'runQwenServe wires the production callback; direct embeds ' +
3843-
'and tests must opt in or omit `persist`.',
3844-
);
3845-
}
38463846
let persisted = false;
38473847
if (opts.persist) {
38483848
try {
@@ -3874,6 +3874,25 @@ export function createHttpAcpBridge(opts: BridgeOptions): HttpAcpBridge {
38743874
} catch {
38753875
/* bus closed */
38763876
}
3877+
// #4282 fold-in 4 (qwen-latest S2): when the change is persisted to
3878+
// workspace settings, the new mode becomes the default for every
3879+
// future session in this workspace. Fan out a workspace-scoped
3880+
// mirror so peer sessions can update their UI before they next
3881+
// spawn an ACP child. The session-scoped publish above remains the
3882+
// authoritative signal for the requesting session (and carries the
3883+
// sessionId in `data`); the workspace mirror is informational.
3884+
if (persisted) {
3885+
broadcastWorkspaceEvent({
3886+
type: 'approval_mode_changed',
3887+
data: {
3888+
sessionId: entry.sessionId,
3889+
previous: response.previous,
3890+
next: response.current,
3891+
persisted,
3892+
},
3893+
...(originatorClientId ? { originatorClientId } : {}),
3894+
});
3895+
}
38773896
return {
38783897
sessionId: entry.sessionId,
38793898
mode: response.current,

packages/cli/src/serve/runQwenServe.ts

Lines changed: 67 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,40 @@ function formatHostForUrl(host: string): string {
4848
return host;
4949
}
5050

51+
/**
52+
* #4282 fold-in 4 (qwen-latest C2). Per-workspace promise chain that
53+
* serializes settings read-modify-write cycles inside this process.
54+
*
55+
* Both `persistApprovalMode` and `persistDisabledTools` re-read
56+
* `tools.disabled` (or `tools.approvalMode`) from disk before writing
57+
* the merged result back, which is a textbook lost-update window if
58+
* two concurrent HTTP requests land at the same workspace. Threading
59+
* each call through this lock collapses the window: the first request
60+
* holds the chain until its `setValue` flush completes, and the second
61+
* sees the post-write state when it runs its own load.
62+
*
63+
* Scope is INTRA-process: a separate `qwen serve` invocation against
64+
* the same workspace would not share the Map, but per-workspace
65+
* single-daemon is the supported deployment shape (see #3803 §02).
66+
* The lock decays naturally — when no callers are queued, the chain
67+
* resolves and stays mounted in the Map; the per-workspace memory
68+
* cost is one settled Promise and one Map entry.
69+
*
70+
* Errors propagate to the caller; the chain advances to the next
71+
* waiter regardless via the `.then(fn, fn)` pattern, so a single
72+
* failed write doesn't permanently stall persistence.
73+
*/
74+
const settingsWriteLocks = new Map<string, Promise<unknown>>();
75+
function withSettingsLock<T>(
76+
workspace: string,
77+
fn: () => Promise<T>,
78+
): Promise<T> {
79+
const prev = settingsWriteLocks.get(workspace) ?? Promise.resolve();
80+
const next = prev.then(fn, fn);
81+
settingsWriteLocks.set(workspace, next);
82+
return next;
83+
}
84+
5185
export interface RunHandle {
5286
server: Server;
5387
url: string;
@@ -285,10 +319,22 @@ export async function runQwenServe(
285319
// another writer (CLI, another daemon, an editor) could have
286320
// touched the file between calls, so the freshest state wins
287321
// over a stale in-memory cache.
288-
persistApprovalMode: async (workspace, mode) => {
289-
const fresh = loadSettings(workspace);
290-
fresh.setValue(SettingScope.Workspace, 'tools.approvalMode', mode);
291-
},
322+
//
323+
// #4282 fold-in 4 (qwen-latest C2): both persist callbacks run
324+
// through `withSettingsLock` — a per-workspace promise chain that
325+
// serializes the read-modify-write cycle. Without the lock, two
326+
// concurrent `POST /workspace/tools/:name/enable` requests could
327+
// both read the same pre-modification state and the second write
328+
// would silently overwrite the first toggle, leaving the disk
329+
// copy out of sync with the SDK reducer's view. The lock costs
330+
// one tick of latency per call but eliminates the lost-update
331+
// window for the entire process; cross-daemon races against the
332+
// same workspace file remain (rare; documented).
333+
persistApprovalMode: (workspace, mode) =>
334+
withSettingsLock(workspace, async () => {
335+
const fresh = loadSettings(workspace);
336+
fresh.setValue(SettingScope.Workspace, 'tools.approvalMode', mode);
337+
}),
292338
// #4175 Wave 4 PR 17: `POST /workspace/tools/:name/enable` writes
293339
// through this callback. Re-reads settings on each call (same
294340
// freshness rationale as `persistApprovalMode`) and merges into
@@ -303,22 +349,23 @@ export async function runQwenServe(
303349
// toggle. Subsequent removals at the originating scope (e.g.
304350
// User) would no longer take effect because the names have been
305351
// baked into the workspace file with no obvious source.
306-
persistDisabledTools: async (workspace, toolName, enabled) => {
307-
const fresh = loadSettings(workspace);
308-
const wsScope = fresh.forScope(SettingScope.Workspace).settings;
309-
const wsDisabled = wsScope.tools?.disabled;
310-
const current = Array.isArray(wsDisabled)
311-
? wsDisabled.filter((v): v is string => typeof v === 'string')
312-
: [];
313-
const next = new Set(current);
314-
if (enabled) next.delete(toolName);
315-
else next.add(toolName);
316-
fresh.setValue(
317-
SettingScope.Workspace,
318-
'tools.disabled',
319-
[...next].sort(),
320-
);
321-
},
352+
persistDisabledTools: (workspace, toolName, enabled) =>
353+
withSettingsLock(workspace, async () => {
354+
const fresh = loadSettings(workspace);
355+
const wsScope = fresh.forScope(SettingScope.Workspace).settings;
356+
const wsDisabled = wsScope.tools?.disabled;
357+
const current = Array.isArray(wsDisabled)
358+
? wsDisabled.filter((v): v is string => typeof v === 'string')
359+
: [];
360+
const next = new Set(current);
361+
if (enabled) next.delete(toolName);
362+
else next.add(toolName);
363+
fresh.setValue(
364+
SettingScope.Workspace,
365+
'tools.disabled',
366+
[...next].sort(),
367+
);
368+
}),
322369
});
323370
let actualPort = opts.port;
324371
// Pass the already-canonical `boundWorkspace` into `createServeApp`

0 commit comments

Comments
 (0)