Skip to content

Commit 6c00c17

Browse files
committed
fix(gateway): keep in-process dispatch timeout budget
1 parent 2b66fd1 commit 6c00c17

2 files changed

Lines changed: 84 additions & 15 deletions

File tree

src/gateway/server-plugins.test.ts

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -804,6 +804,62 @@ describe("loadGatewayPlugins", () => {
804804
).resolves.toEqual({ status: "accepted", runId: "run-accepted" });
805805
});
806806

807+
test("uses one timeout budget across accepted and final in-process responses", async () => {
808+
vi.useFakeTimers();
809+
try {
810+
serverPluginsModule.setFallbackGatewayContext(createTestContext("single-final-deadline"));
811+
handleGatewayRequest.mockImplementationOnce(async (opts: HandleGatewayRequestOptions) => {
812+
setTimeout(() => {
813+
opts.respond(true, { status: "accepted", runId: "run-deadline" });
814+
}, 7);
815+
setTimeout(() => {
816+
opts.respond(true, { status: "ok", runId: "run-deadline" });
817+
}, 13);
818+
await new Promise((resolve) => setTimeout(resolve, 13));
819+
});
820+
821+
const result = expect(
822+
serverPluginsModule.dispatchGatewayMethodInProcess(
823+
"agent",
824+
{ sessionKey: "s-deadline" },
825+
{ expectFinal: true, timeoutMs: 10 },
826+
),
827+
).rejects.toThrow("gateway request timeout for agent");
828+
829+
await vi.advanceTimersByTimeAsync(10);
830+
await result;
831+
await vi.advanceTimersByTimeAsync(10);
832+
} finally {
833+
vi.useRealTimers();
834+
}
835+
});
836+
837+
test("clears final-response timeout when handler rejects after accepted response", async () => {
838+
vi.useFakeTimers();
839+
try {
840+
serverPluginsModule.setFallbackGatewayContext(createTestContext("accepted-then-error"));
841+
handleGatewayRequest.mockImplementationOnce(async (opts: HandleGatewayRequestOptions) => {
842+
opts.respond(true, { status: "accepted", runId: "run-error-after-accepted" });
843+
await new Promise((resolve) => setTimeout(resolve, 5));
844+
throw new Error("handler failed after accepted");
845+
});
846+
847+
const result = expect(
848+
serverPluginsModule.dispatchGatewayMethodInProcess(
849+
"agent",
850+
{ sessionKey: "s-error-after-accepted" },
851+
{ expectFinal: true, timeoutMs: 1_000 },
852+
),
853+
).rejects.toThrow("handler failed after accepted");
854+
855+
await vi.advanceTimersByTimeAsync(5);
856+
await result;
857+
expect(vi.getTimerCount()).toBe(0);
858+
} finally {
859+
vi.useRealTimers();
860+
}
861+
});
862+
807863
test("filters connected plugin nodes locally without sending unsupported node.list params", async () => {
808864
loadOpenClawPlugins.mockReturnValue(createRegistry([]));
809865
loadGatewayStartupPluginsForTest();

src/gateway/server-plugins.ts

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -340,13 +340,24 @@ function resolveInProcessDispatchTimeoutMs(timeoutMs?: number): number | undefin
340340
: undefined;
341341
}
342342

343+
function resolveInProcessDispatchDeadlineMs(timeoutMs?: number): number | undefined {
344+
const safeTimeoutMs = resolveInProcessDispatchTimeoutMs(timeoutMs);
345+
return safeTimeoutMs === undefined ? undefined : Date.now() + safeTimeoutMs;
346+
}
347+
348+
function resolveRemainingInProcessDispatchTimeoutMs(deadlineMs?: number): number | undefined {
349+
return deadlineMs === undefined
350+
? undefined
351+
: resolveSafeTimeoutDelayMs(deadlineMs - Date.now(), { minMs: 0 });
352+
}
353+
343354
async function waitForInProcessDispatch<T>(
344355
method: string,
345356
promise: Promise<T>,
346-
timeoutMs?: number,
357+
deadlineMs?: number,
347358
): Promise<T> {
348-
const safeTimeoutMs = resolveInProcessDispatchTimeoutMs(timeoutMs);
349-
if (safeTimeoutMs === undefined) {
359+
const remainingTimeoutMs = resolveRemainingInProcessDispatchTimeoutMs(deadlineMs);
360+
if (remainingTimeoutMs === undefined) {
350361
return await promise;
351362
}
352363
let timeout: NodeJS.Timeout | undefined;
@@ -356,7 +367,7 @@ async function waitForInProcessDispatch<T>(
356367
new Promise<never>((_resolve, reject) => {
357368
timeout = setTimeout(() => {
358369
reject(new Error(`gateway request timeout for ${method}`));
359-
}, safeTimeoutMs);
370+
}, remainingTimeoutMs);
360371
}),
361372
]);
362373
} finally {
@@ -396,6 +407,7 @@ export async function dispatchGatewayMethodInProcessRaw(
396407
resolveFirstResponse = resolve;
397408
rejectFirstResponse = reject;
398409
});
410+
const deadlineMs = resolveInProcessDispatchDeadlineMs(options?.timeoutMs);
399411
const { handleGatewayRequest } = await import("./server-methods.js");
400412
const pluginRuntimeOwnerId =
401413
typeof options?.pluginRuntimeOwnerId === "string" && options.pluginRuntimeOwnerId.trim()
@@ -462,7 +474,7 @@ export async function dispatchGatewayMethodInProcessRaw(
462474
rejectFinalResponse?.(error);
463475
});
464476

465-
firstResponse = await waitForInProcessDispatch(method, firstResponsePromise, options?.timeoutMs);
477+
firstResponse = await waitForInProcessDispatch(method, firstResponsePromise, deadlineMs);
466478
const firstPayload = firstResponse.payload as { status?: unknown } | undefined;
467479
if (options?.expectFinal !== true || firstPayload?.status !== "accepted") {
468480
return firstResponse;
@@ -474,32 +486,33 @@ export async function dispatchGatewayMethodInProcessRaw(
474486
finalResponse ??
475487
(await new Promise<GatewayMethodDispatchResponse>((resolve, reject) => {
476488
resolveFinalResponse = resolve;
477-
rejectFinalResponse = reject;
478-
const timeoutMs = resolveInProcessDispatchTimeoutMs(options.timeoutMs);
489+
const timeoutMs = resolveRemainingInProcessDispatchTimeoutMs(deadlineMs);
479490
const timeout =
480491
timeoutMs === undefined
481492
? undefined
482493
: setTimeout(() => {
483494
reject(new Error(`gateway request timeout for ${method}`));
484495
}, timeoutMs);
485-
if (postFirstResponseError) {
496+
const clearFinalTimeout = () => {
486497
if (timeout) {
487498
clearTimeout(timeout);
488499
}
489-
reject(postFirstResponseError);
500+
};
501+
rejectFinalResponse = (err) => {
502+
clearFinalTimeout();
503+
reject(err);
504+
};
505+
if (postFirstResponseError) {
506+
rejectFinalResponse(postFirstResponseError);
490507
return;
491508
}
492509
if (finalResponse) {
493-
if (timeout) {
494-
clearTimeout(timeout);
495-
}
510+
clearFinalTimeout();
496511
resolve(finalResponse);
497512
return;
498513
}
499514
resolveFinalResponse = (response) => {
500-
if (timeout) {
501-
clearTimeout(timeout);
502-
}
515+
clearFinalTimeout();
503516
resolve(response);
504517
};
505518
}));

0 commit comments

Comments
 (0)