Skip to content

Commit 6ead753

Browse files
committed
fix: preserve embedded dispatcher timeouts
1 parent 497b1a6 commit 6ead753

5 files changed

Lines changed: 95 additions & 28 deletions

File tree

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,14 @@
11
import {
22
DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
3+
ensureGlobalUndiciDispatcherStreamTimeouts,
34
ensureGlobalUndiciEnvProxyDispatcher,
4-
ensureGlobalUndiciStreamTimeouts,
55
} from "../../../infra/net/undici-global-dispatcher.js";
66

77
export function configureEmbeddedAttemptHttpRuntime(params: { timeoutMs: number }): void {
88
// Proxy bootstrap must happen before timeout tuning so the timeouts wrap the
99
// active EnvHttpProxyAgent instead of being replaced by a bare proxy dispatcher.
1010
ensureGlobalUndiciEnvProxyDispatcher();
11-
ensureGlobalUndiciStreamTimeouts({
11+
ensureGlobalUndiciDispatcherStreamTimeouts({
1212
timeoutMs: Math.max(params.timeoutMs, DEFAULT_UNDICI_STREAM_TIMEOUT_MS),
1313
});
1414
}

src/agents/pi-embedded-runner/run/attempt.spawn-workspace.test-support.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type AttemptSpawnWorkspaceHoisted = {
6060
sessionManagerOpenMock: UnknownMock;
6161
resolveSandboxContextMock: UnknownMock;
6262
ensureGlobalUndiciEnvProxyDispatcherMock: UnknownMock;
63+
ensureGlobalUndiciDispatcherStreamTimeoutsMock: UnknownMock;
6364
ensureGlobalUndiciStreamTimeoutsMock: UnknownMock;
6465
buildEmbeddedMessageActionDiscoveryInputMock: UnknownMock;
6566
createOpenClawCodingToolsMock: UnknownMock;
@@ -125,6 +126,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
125126
const sessionManagerOpenMock = vi.fn();
126127
const resolveSandboxContextMock = vi.fn();
127128
const ensureGlobalUndiciEnvProxyDispatcherMock = vi.fn();
129+
const ensureGlobalUndiciDispatcherStreamTimeoutsMock = vi.fn();
128130
const ensureGlobalUndiciStreamTimeoutsMock = vi.fn();
129131
const buildEmbeddedMessageActionDiscoveryInputMock = vi.fn((params: unknown) => params);
130132
const createOpenClawCodingToolsMock = vi.fn(() => []);
@@ -193,6 +195,7 @@ const hoisted = vi.hoisted((): AttemptSpawnWorkspaceHoisted => {
193195
sessionManagerOpenMock,
194196
resolveSandboxContextMock,
195197
ensureGlobalUndiciEnvProxyDispatcherMock,
198+
ensureGlobalUndiciDispatcherStreamTimeoutsMock,
196199
ensureGlobalUndiciStreamTimeoutsMock,
197200
buildEmbeddedMessageActionDiscoveryInputMock,
198201
createOpenClawCodingToolsMock,
@@ -287,6 +290,8 @@ vi.mock("../../../infra/net/undici-global-dispatcher.js", () => ({
287290
DEFAULT_UNDICI_STREAM_TIMEOUT_MS: 120_000,
288291
ensureGlobalUndiciEnvProxyDispatcher: (...args: unknown[]) =>
289292
hoisted.ensureGlobalUndiciEnvProxyDispatcherMock(...args),
293+
ensureGlobalUndiciDispatcherStreamTimeouts: (...args: unknown[]) =>
294+
hoisted.ensureGlobalUndiciDispatcherStreamTimeoutsMock(...args),
290295
ensureGlobalUndiciStreamTimeouts: (...args: unknown[]) =>
291296
hoisted.ensureGlobalUndiciStreamTimeoutsMock(...args),
292297
}));
@@ -791,6 +796,7 @@ export function resetEmbeddedAttemptHarness(
791796
hoisted.sessionManagerOpenMock.mockReset().mockReturnValue(hoisted.sessionManager);
792797
hoisted.resolveSandboxContextMock.mockReset();
793798
hoisted.ensureGlobalUndiciEnvProxyDispatcherMock.mockReset();
799+
hoisted.ensureGlobalUndiciDispatcherStreamTimeoutsMock.mockReset();
794800
hoisted.ensureGlobalUndiciStreamTimeoutsMock.mockReset();
795801
hoisted.buildEmbeddedMessageActionDiscoveryInputMock
796802
.mockReset()

src/agents/pi-embedded-runner/run/attempt.spawn-workspace.timeout.test.ts

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,29 +2,29 @@ import { beforeEach, describe, expect, it, vi } from "vitest";
22

33
const mocks = vi.hoisted(() => ({
44
DEFAULT_UNDICI_STREAM_TIMEOUT_MS: 30 * 60 * 1000,
5+
ensureGlobalUndiciDispatcherStreamTimeouts: vi.fn(),
56
ensureGlobalUndiciEnvProxyDispatcher: vi.fn(),
6-
ensureGlobalUndiciStreamTimeouts: vi.fn(),
77
}));
88

99
vi.mock("../../../infra/net/undici-global-dispatcher.js", () => ({
1010
DEFAULT_UNDICI_STREAM_TIMEOUT_MS: mocks.DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
11+
ensureGlobalUndiciDispatcherStreamTimeouts: mocks.ensureGlobalUndiciDispatcherStreamTimeouts,
1112
ensureGlobalUndiciEnvProxyDispatcher: mocks.ensureGlobalUndiciEnvProxyDispatcher,
12-
ensureGlobalUndiciStreamTimeouts: mocks.ensureGlobalUndiciStreamTimeouts,
1313
}));
1414

1515
import { configureEmbeddedAttemptHttpRuntime } from "./attempt-http-runtime.js";
1616

1717
describe("runEmbeddedAttempt undici timeout wiring", () => {
1818
beforeEach(() => {
1919
mocks.ensureGlobalUndiciEnvProxyDispatcher.mockReset();
20-
mocks.ensureGlobalUndiciStreamTimeouts.mockReset();
20+
mocks.ensureGlobalUndiciDispatcherStreamTimeouts.mockReset();
2121
});
2222

2323
it("does not lower global undici stream tuning below the shared default", () => {
2424
configureEmbeddedAttemptHttpRuntime({ timeoutMs: 123_456 });
2525

2626
expect(mocks.ensureGlobalUndiciEnvProxyDispatcher).toHaveBeenCalledOnce();
27-
expect(mocks.ensureGlobalUndiciStreamTimeouts).toHaveBeenCalledWith({
27+
expect(mocks.ensureGlobalUndiciDispatcherStreamTimeouts).toHaveBeenCalledWith({
2828
timeoutMs: mocks.DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
2929
});
3030
});
@@ -35,7 +35,7 @@ describe("runEmbeddedAttempt undici timeout wiring", () => {
3535
configureEmbeddedAttemptHttpRuntime({ timeoutMs });
3636

3737
expect(mocks.ensureGlobalUndiciEnvProxyDispatcher).toHaveBeenCalledOnce();
38-
expect(mocks.ensureGlobalUndiciStreamTimeouts).toHaveBeenCalledWith({
38+
expect(mocks.ensureGlobalUndiciDispatcherStreamTimeouts).toHaveBeenCalledWith({
3939
timeoutMs,
4040
});
4141
});

src/infra/net/undici-global-dispatcher.test.ts

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,7 @@ vi.mock("../wsl.js", () => ({
7979
import { isWSL2Sync } from "../wsl.js";
8080
import { hasEnvHttpProxyAgentConfigured, resolveEnvHttpProxyAgentOptions } from "./proxy-env.js";
8181
let DEFAULT_UNDICI_STREAM_TIMEOUT_MS: typeof import("./undici-global-dispatcher.js").DEFAULT_UNDICI_STREAM_TIMEOUT_MS;
82+
let ensureGlobalUndiciDispatcherStreamTimeouts: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciDispatcherStreamTimeouts;
8283
let ensureGlobalUndiciEnvProxyDispatcher: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciEnvProxyDispatcher;
8384
let ensureGlobalUndiciStreamTimeouts: typeof import("./undici-global-dispatcher.js").ensureGlobalUndiciStreamTimeouts;
8485
let forceResetGlobalDispatcher: typeof import("./undici-global-dispatcher.js").forceResetGlobalDispatcher;
@@ -90,6 +91,7 @@ describe("ensureGlobalUndiciStreamTimeouts", () => {
9091
undiciGlobalDispatcherModule = await import("./undici-global-dispatcher.js");
9192
({
9293
DEFAULT_UNDICI_STREAM_TIMEOUT_MS,
94+
ensureGlobalUndiciDispatcherStreamTimeouts,
9395
ensureGlobalUndiciEnvProxyDispatcher,
9496
ensureGlobalUndiciStreamTimeouts,
9597
forceResetGlobalDispatcher,
@@ -150,6 +152,26 @@ describe("ensureGlobalUndiciStreamTimeouts", () => {
150152
expect(output.trim()).toBe("ok");
151153
});
152154

155+
it("explicitly tunes the global dispatcher when requested for embedded attempts", () => {
156+
getDefaultAutoSelectFamily.mockReturnValue(false);
157+
158+
ensureGlobalUndiciDispatcherStreamTimeouts({ timeoutMs: 1_900_000 });
159+
160+
expect(loadUndiciGlobalDispatcherDeps).toHaveBeenCalledTimes(1);
161+
expect(setGlobalDispatcher).toHaveBeenCalledTimes(1);
162+
const next = getCurrentDispatcher() as { options?: Record<string, unknown> };
163+
expect(next).toBeInstanceOf(Agent);
164+
expect(next.options).toEqual({
165+
bodyTimeout: 1_900_000,
166+
headersTimeout: 1_900_000,
167+
connect: {
168+
autoSelectFamily: false,
169+
autoSelectFamilyAttemptTimeout: 300,
170+
},
171+
});
172+
expect(undiciGlobalDispatcherModule._globalUndiciStreamTimeoutMs).toBe(1_900_000);
173+
});
174+
153175
it("replaces EnvHttpProxyAgent dispatcher while preserving env-proxy mode", () => {
154176
getDefaultAutoSelectFamily.mockReturnValue(false);
155177
vi.mocked(hasEnvHttpProxyAgentConfigured).mockReturnValue(true);

src/infra/net/undici-global-dispatcher.ts

Lines changed: 60 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,17 @@ function resolveDispatcherKey(params: {
4949
return `${params.kind}:${params.timeoutMs}:${autoSelectToken}`;
5050
}
5151

52+
function resolveStreamTimeoutMs(opts?: { timeoutMs?: number }): number | null {
53+
const timeoutMsRaw = opts?.timeoutMs ?? DEFAULT_UNDICI_STREAM_TIMEOUT_MS;
54+
if (!Number.isFinite(timeoutMsRaw)) {
55+
return null;
56+
}
57+
return Math.max(DEFAULT_UNDICI_STREAM_TIMEOUT_MS, Math.floor(timeoutMsRaw));
58+
}
59+
5260
function resolveCurrentDispatcherKind(
5361
runtime: Pick<UndiciGlobalDispatcherDeps, "getGlobalDispatcher">,
54-
): DispatcherKind | null {
62+
): Exclude<DispatcherKind, "unsupported"> | null {
5563
let dispatcher: unknown;
5664
try {
5765
dispatcher = runtime.getGlobalDispatcher();
@@ -92,19 +100,54 @@ export function ensureGlobalUndiciEnvProxyDispatcher(): void {
92100
}
93101
}
94102

103+
function applyGlobalDispatcherStreamTimeouts(params: {
104+
runtime: UndiciGlobalDispatcherDeps;
105+
kind: Exclude<DispatcherKind, "unsupported">;
106+
timeoutMs: number;
107+
}): void {
108+
const { runtime, kind, timeoutMs } = params;
109+
const autoSelectFamily = resolveUndiciAutoSelectFamily();
110+
const nextKey = resolveDispatcherKey({ kind, timeoutMs, autoSelectFamily });
111+
if (lastAppliedTimeoutKey === nextKey) {
112+
return;
113+
}
114+
115+
const connect = createUndiciAutoSelectFamilyConnectOptions(autoSelectFamily);
116+
try {
117+
if (kind === "env-proxy") {
118+
const proxyOptions = {
119+
...resolveEnvHttpProxyAgentOptions(),
120+
bodyTimeout: timeoutMs,
121+
headersTimeout: timeoutMs,
122+
...(connect ? { connect } : {}),
123+
} as ConstructorParameters<UndiciGlobalDispatcherDeps["EnvHttpProxyAgent"]>[0];
124+
runtime.setGlobalDispatcher(new runtime.EnvHttpProxyAgent(proxyOptions));
125+
} else {
126+
runtime.setGlobalDispatcher(
127+
new runtime.Agent({
128+
bodyTimeout: timeoutMs,
129+
headersTimeout: timeoutMs,
130+
...(connect ? { connect } : {}),
131+
}),
132+
);
133+
}
134+
lastAppliedTimeoutKey = nextKey;
135+
} catch {
136+
// Best-effort hardening only.
137+
}
138+
}
139+
95140
export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }): void {
96-
const timeoutMsRaw = opts?.timeoutMs ?? DEFAULT_UNDICI_STREAM_TIMEOUT_MS;
97-
if (!Number.isFinite(timeoutMsRaw)) {
141+
const timeoutMs = resolveStreamTimeoutMs(opts);
142+
if (timeoutMs === null) {
98143
return;
99144
}
100-
const timeoutMs = Math.max(DEFAULT_UNDICI_STREAM_TIMEOUT_MS, Math.floor(timeoutMsRaw));
101145
_globalUndiciStreamTimeoutMs = timeoutMs;
102146
if (!hasEnvHttpProxyAgentConfigured()) {
103147
lastAppliedTimeoutKey = null;
104148
return;
105149
}
106150
const runtime = loadUndiciGlobalDispatcherDeps();
107-
const { EnvHttpProxyAgent, setGlobalDispatcher } = runtime;
108151
const kind = resolveCurrentDispatcherKind(runtime);
109152
if (kind === null) {
110153
return;
@@ -113,25 +156,21 @@ export function ensureGlobalUndiciStreamTimeouts(opts?: { timeoutMs?: number }):
113156
return;
114157
}
115158

116-
const autoSelectFamily = resolveUndiciAutoSelectFamily();
117-
const nextKey = resolveDispatcherKey({ kind, timeoutMs, autoSelectFamily });
118-
if (lastAppliedTimeoutKey === nextKey) {
159+
applyGlobalDispatcherStreamTimeouts({ runtime, kind, timeoutMs });
160+
}
161+
162+
export function ensureGlobalUndiciDispatcherStreamTimeouts(opts?: { timeoutMs?: number }): void {
163+
const timeoutMs = resolveStreamTimeoutMs(opts);
164+
if (timeoutMs === null) {
119165
return;
120166
}
121-
122-
const connect = createUndiciAutoSelectFamilyConnectOptions(autoSelectFamily);
123-
try {
124-
const proxyOptions = {
125-
...resolveEnvHttpProxyAgentOptions(),
126-
bodyTimeout: timeoutMs,
127-
headersTimeout: timeoutMs,
128-
...(connect ? { connect } : {}),
129-
} as ConstructorParameters<UndiciGlobalDispatcherDeps["EnvHttpProxyAgent"]>[0];
130-
setGlobalDispatcher(new EnvHttpProxyAgent(proxyOptions));
131-
lastAppliedTimeoutKey = nextKey;
132-
} catch {
133-
// Best-effort hardening only.
167+
_globalUndiciStreamTimeoutMs = timeoutMs;
168+
const runtime = loadUndiciGlobalDispatcherDeps();
169+
const kind = resolveCurrentDispatcherKind(runtime);
170+
if (kind === null) {
171+
return;
134172
}
173+
applyGlobalDispatcherStreamTimeouts({ runtime, kind, timeoutMs });
135174
}
136175

137176
export function resetGlobalUndiciStreamTimeoutsForTests(): void {

0 commit comments

Comments
 (0)