Skip to content

Commit acb9cbb

Browse files
0xRainElonitoTakhoffman
authored
fix(gateway): drain active turns before restart to prevent message loss (#13931)
* fix(gateway): drain active turns before restart to prevent message loss On SIGUSR1 restart, the gateway now waits up to 30s for in-flight agent turns to complete before tearing down the server. This prevents buffered messages from being dropped when config.patch or update triggers a restart while agents are mid-turn. Changes: - command-queue.ts: add getActiveTaskCount() and waitForActiveTasks() helpers to track and wait on active lane tasks - run-loop.ts: on restart signal, drain active tasks before server.close() with a 30s timeout; extend force-exit timer accordingly - command-queue.test.ts: update imports for new exports Fixes #13883 * fix(queue): snapshot active tasks for restart drain --------- Co-authored-by: Elonito <0xRaini@users.noreply.github.com> Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
1 parent f7e05d0 commit acb9cbb

3 files changed

Lines changed: 205 additions & 2 deletions

File tree

src/cli/gateway-cli/run-loop.ts

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
isGatewaySigusr1RestartExternallyAllowed,
77
} from "../../infra/restart.js";
88
import { createSubsystemLogger } from "../../logging/subsystem.js";
9+
import { getActiveTaskCount, waitForActiveTasks } from "../../process/command-queue.js";
910

1011
const gatewayLog = createSubsystemLogger("gateway");
1112

@@ -26,6 +27,9 @@ export async function runGatewayLoop(params: {
2627
process.removeListener("SIGUSR1", onSigusr1);
2728
};
2829

30+
const DRAIN_TIMEOUT_MS = 30_000;
31+
const SHUTDOWN_TIMEOUT_MS = 5_000;
32+
2933
const request = (action: GatewayRunSignalAction, signal: string) => {
3034
if (shuttingDown) {
3135
gatewayLog.info(`received ${signal} during shutdown; ignoring`);
@@ -35,14 +39,33 @@ export async function runGatewayLoop(params: {
3539
const isRestart = action === "restart";
3640
gatewayLog.info(`received ${signal}; ${isRestart ? "restarting" : "shutting down"}`);
3741

42+
// Allow extra time for draining active turns on restart.
43+
const forceExitMs = isRestart ? DRAIN_TIMEOUT_MS + SHUTDOWN_TIMEOUT_MS : SHUTDOWN_TIMEOUT_MS;
3844
const forceExitTimer = setTimeout(() => {
3945
gatewayLog.error("shutdown timed out; exiting without full cleanup");
4046
cleanupSignals();
4147
params.runtime.exit(0);
42-
}, 5000);
48+
}, forceExitMs);
4349

4450
void (async () => {
4551
try {
52+
// On restart, wait for in-flight agent turns to finish before
53+
// tearing down the server so buffered messages are delivered.
54+
if (isRestart) {
55+
const activeTasks = getActiveTaskCount();
56+
if (activeTasks > 0) {
57+
gatewayLog.info(
58+
`draining ${activeTasks} active task(s) before restart (timeout ${DRAIN_TIMEOUT_MS}ms)`,
59+
);
60+
const { drained } = await waitForActiveTasks(DRAIN_TIMEOUT_MS);
61+
if (drained) {
62+
gatewayLog.info("all active tasks drained");
63+
} else {
64+
gatewayLog.warn("drain timeout reached; proceeding with restart");
65+
}
66+
}
67+
}
68+
4669
await server?.close({
4770
reason: isRestart ? "gateway restarting" : "gateway stopping",
4871
restartExpectedMs: isRestart ? 1500 : null,

src/process/command-queue.test.ts

Lines changed: 110 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,14 @@ vi.mock("../logging/diagnostic.js", () => ({
1616
diagnosticLogger: diagnosticMocks.diag,
1717
}));
1818

19-
import { enqueueCommand, getQueueSize } from "./command-queue.js";
19+
import {
20+
enqueueCommand,
21+
enqueueCommandInLane,
22+
getActiveTaskCount,
23+
getQueueSize,
24+
setCommandLaneConcurrency,
25+
waitForActiveTasks,
26+
} from "./command-queue.js";
2027

2128
describe("command queue", () => {
2229
beforeEach(() => {
@@ -85,4 +92,106 @@ describe("command queue", () => {
8592
expect(waited as number).toBeGreaterThanOrEqual(5);
8693
expect(queuedAhead).toBe(0);
8794
});
95+
96+
it("getActiveTaskCount returns count of currently executing tasks", async () => {
97+
let resolve1!: () => void;
98+
const blocker = new Promise<void>((r) => {
99+
resolve1 = r;
100+
});
101+
102+
const task = enqueueCommand(async () => {
103+
await blocker;
104+
});
105+
106+
// Give the event loop a tick for the task to start.
107+
await new Promise((r) => setTimeout(r, 5));
108+
expect(getActiveTaskCount()).toBe(1);
109+
110+
resolve1();
111+
await task;
112+
expect(getActiveTaskCount()).toBe(0);
113+
});
114+
115+
it("waitForActiveTasks resolves immediately when no tasks are active", async () => {
116+
const { drained } = await waitForActiveTasks(1000);
117+
expect(drained).toBe(true);
118+
});
119+
120+
it("waitForActiveTasks waits for active tasks to finish", async () => {
121+
let resolve1!: () => void;
122+
const blocker = new Promise<void>((r) => {
123+
resolve1 = r;
124+
});
125+
126+
const task = enqueueCommand(async () => {
127+
await blocker;
128+
});
129+
130+
// Give the task a tick to start.
131+
await new Promise((r) => setTimeout(r, 5));
132+
133+
const drainPromise = waitForActiveTasks(5000);
134+
135+
// Resolve the blocker after a short delay.
136+
setTimeout(() => resolve1(), 50);
137+
138+
const { drained } = await drainPromise;
139+
expect(drained).toBe(true);
140+
141+
await task;
142+
});
143+
144+
it("waitForActiveTasks returns drained=false on timeout", async () => {
145+
let resolve1!: () => void;
146+
const blocker = new Promise<void>((r) => {
147+
resolve1 = r;
148+
});
149+
150+
const task = enqueueCommand(async () => {
151+
await blocker;
152+
});
153+
154+
await new Promise((r) => setTimeout(r, 5));
155+
156+
const { drained } = await waitForActiveTasks(50);
157+
expect(drained).toBe(false);
158+
159+
resolve1();
160+
await task;
161+
});
162+
163+
it("waitForActiveTasks ignores tasks that start after the call", async () => {
164+
const lane = `drain-snapshot-${Date.now()}-${Math.random().toString(16).slice(2)}`;
165+
setCommandLaneConcurrency(lane, 2);
166+
167+
let resolve1!: () => void;
168+
const blocker1 = new Promise<void>((r) => {
169+
resolve1 = r;
170+
});
171+
let resolve2!: () => void;
172+
const blocker2 = new Promise<void>((r) => {
173+
resolve2 = r;
174+
});
175+
176+
const first = enqueueCommandInLane(lane, async () => {
177+
await blocker1;
178+
});
179+
await new Promise((r) => setTimeout(r, 5));
180+
181+
const drainPromise = waitForActiveTasks(2000);
182+
183+
// Starts after waitForActiveTasks snapshot and should not block drain completion.
184+
const second = enqueueCommandInLane(lane, async () => {
185+
await blocker2;
186+
});
187+
await new Promise((r) => setTimeout(r, 5));
188+
expect(getActiveTaskCount()).toBeGreaterThanOrEqual(2);
189+
190+
resolve1();
191+
const { drained } = await drainPromise;
192+
expect(drained).toBe(true);
193+
194+
resolve2();
195+
await Promise.all([first, second]);
196+
});
88197
});

src/process/command-queue.ts

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,13 @@ type LaneState = {
1919
lane: string;
2020
queue: QueueEntry[];
2121
active: number;
22+
activeTaskIds: Set<number>;
2223
maxConcurrent: number;
2324
draining: boolean;
2425
};
2526

2627
const lanes = new Map<string, LaneState>();
28+
let nextTaskId = 1;
2729

2830
function getLaneState(lane: string): LaneState {
2931
const existing = lanes.get(lane);
@@ -34,6 +36,7 @@ function getLaneState(lane: string): LaneState {
3436
lane,
3537
queue: [],
3638
active: 0,
39+
activeTaskIds: new Set(),
3740
maxConcurrent: 1,
3841
draining: false,
3942
};
@@ -59,19 +62,23 @@ function drainLane(lane: string) {
5962
);
6063
}
6164
logLaneDequeue(lane, waitedMs, state.queue.length);
65+
const taskId = nextTaskId++;
6266
state.active += 1;
67+
state.activeTaskIds.add(taskId);
6368
void (async () => {
6469
const startTime = Date.now();
6570
try {
6671
const result = await entry.task();
6772
state.active -= 1;
73+
state.activeTaskIds.delete(taskId);
6874
diag.debug(
6975
`lane task done: lane=${lane} durationMs=${Date.now() - startTime} active=${state.active} queued=${state.queue.length}`,
7076
);
7177
pump();
7278
entry.resolve(result);
7379
} catch (err) {
7480
state.active -= 1;
81+
state.activeTaskIds.delete(taskId);
7582
const isProbeLane = lane.startsWith("auth-probe:") || lane.startsWith("session:probe-");
7683
if (!isProbeLane) {
7784
diag.error(
@@ -158,3 +165,67 @@ export function clearCommandLane(lane: string = CommandLane.Main) {
158165
state.queue.length = 0;
159166
return removed;
160167
}
168+
169+
/**
170+
* Returns the total number of actively executing tasks across all lanes
171+
* (excludes queued-but-not-started entries).
172+
*/
173+
export function getActiveTaskCount(): number {
174+
let total = 0;
175+
for (const s of lanes.values()) {
176+
total += s.active;
177+
}
178+
return total;
179+
}
180+
181+
/**
182+
* Wait for all currently active tasks across all lanes to finish.
183+
* Polls at a short interval; resolves when no tasks are active or
184+
* when `timeoutMs` elapses (whichever comes first).
185+
*
186+
* New tasks enqueued after this call are ignored — only tasks that are
187+
* already executing are waited on.
188+
*/
189+
export function waitForActiveTasks(timeoutMs: number): Promise<{ drained: boolean }> {
190+
const POLL_INTERVAL_MS = 250;
191+
const deadline = Date.now() + timeoutMs;
192+
const activeAtStart = new Set<number>();
193+
for (const state of lanes.values()) {
194+
for (const taskId of state.activeTaskIds) {
195+
activeAtStart.add(taskId);
196+
}
197+
}
198+
199+
return new Promise((resolve) => {
200+
const check = () => {
201+
if (activeAtStart.size === 0) {
202+
resolve({ drained: true });
203+
return;
204+
}
205+
206+
let hasPending = false;
207+
for (const state of lanes.values()) {
208+
for (const taskId of state.activeTaskIds) {
209+
if (activeAtStart.has(taskId)) {
210+
hasPending = true;
211+
break;
212+
}
213+
}
214+
if (hasPending) {
215+
break;
216+
}
217+
}
218+
219+
if (!hasPending) {
220+
resolve({ drained: true });
221+
return;
222+
}
223+
if (Date.now() >= deadline) {
224+
resolve({ drained: false });
225+
return;
226+
}
227+
setTimeout(check, POLL_INTERVAL_MS);
228+
};
229+
check();
230+
});
231+
}

0 commit comments

Comments
 (0)