Skip to content

Commit c0e97ae

Browse files
committed
fix: drain restart queues before sigusr1
1 parent 936c02e commit c0e97ae

25 files changed

Lines changed: 1307 additions & 61 deletions

extensions/discord/src/monitor/message-handler.ts

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ export function createDiscordMessageHandler(
153153
hasDiscordMessageStickers(message),
154154
});
155155
},
156-
onFlush: async (entries) => {
156+
onFlush: async (entries, context) => {
157157
const last = entries.at(-1);
158158
if (!last) {
159159
return;
@@ -187,7 +187,12 @@ export function createDiscordMessageHandler(
187187
}
188188
applyImplicitReplyBatchGate(ctx, params.replyToMode, false);
189189
queueAcceptedDiscordTypingCue(ctx);
190-
messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys }));
190+
const job = buildDiscordInboundJob(ctx, { replayKeys });
191+
if (context?.allowDuringGatewayDrain) {
192+
messageRunQueue.enqueueInternal(job);
193+
} else {
194+
messageRunQueue.enqueue(job);
195+
}
191196
return;
192197
}
193198
const combinedBaseText = entries
@@ -250,7 +255,12 @@ export function createDiscordMessageHandler(
250255
}
251256
}
252257
queueAcceptedDiscordTypingCue(ctx);
253-
messageRunQueue.enqueue(buildDiscordInboundJob(ctx, { replayKeys }));
258+
const job = buildDiscordInboundJob(ctx, { replayKeys });
259+
if (context?.allowDuringGatewayDrain) {
260+
messageRunQueue.enqueueInternal(job);
261+
} else {
262+
messageRunQueue.enqueue(job);
263+
}
254264
} catch (error) {
255265
if (error instanceof DiscordRetryableInboundError) {
256266
releaseDiscordInboundReplay({ replayKeys, error, replayGuard });
@@ -303,7 +313,10 @@ export function createDiscordMessageHandler(
303313
}
304314
};
305315

306-
handler.deactivate = messageRunQueue.deactivate;
316+
handler.deactivate = () => {
317+
debouncer.unregister();
318+
messageRunQueue.deactivate();
319+
};
307320

308321
return handler;
309322
}

extensions/discord/src/monitor/message-run-queue.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ type DiscordMessageRunQueueParams = {
2424

2525
type DiscordMessageRunQueue = {
2626
enqueue: (job: DiscordInboundJob) => void;
27+
enqueueInternal: (job: DiscordInboundJob) => void;
2728
deactivate: () => void;
2829
};
2930

@@ -96,6 +97,17 @@ export function createDiscordMessageRunQueue(
9697
});
9798
});
9899
},
100+
enqueueInternal(job) {
101+
const enqueue = runQueue.enqueueInternal ?? runQueue.enqueue;
102+
enqueue(job.queueKey, async ({ lifecycleSignal }) => {
103+
await processDiscordQueuedMessage({
104+
job,
105+
lifecycleSignal,
106+
replayGuard,
107+
testing: params.__testing,
108+
});
109+
});
110+
},
99111
deactivate: runQueue.deactivate,
100112
};
101113
}

extensions/feishu/src/monitor.reaction.test.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,10 @@ describe("Feishu inbound debounce regressions", () => {
697697
enqueueMock(item);
698698
params.onError?.(new Error("dispatch failed"), [item]);
699699
},
700-
flushKey: async () => {},
700+
flushKey: async (_key: string) => {},
701+
flushKeyWithCount: async (_key: string) => ({ flushed: 0 }),
702+
flushAll: async () => 0,
703+
unregister: () => {},
701704
}),
702705
}),
703706
);

extensions/feishu/src/test-support/lifecycle-test-support.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,10 @@ function createImmediateInboundDebounce() {
8787
params.onError?.(err, [item]);
8888
}
8989
},
90-
flushKey: async () => {},
90+
flushKey: async (_key: string) => {},
91+
flushKeyWithCount: async (_key: string) => ({ flushed: 0 }),
92+
flushAll: async () => 0,
93+
unregister: () => {},
9194
}),
9295
};
9396
}

scripts/e2e/commitments-safety-docker-client.ts

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -151,11 +151,8 @@ async function verifyExtractionStoresMetadataOnly() {
151151

152152
const store = await loadCommitmentStore();
153153
assert(store.commitments.length === 1, `unexpected store size ${store.commitments.length}`);
154-
assert(!("sourceUserText" in store.commitments[0]!), "source user text was persisted");
155-
assert(
156-
!("sourceAssistantText" in store.commitments[0]!),
157-
"source assistant text was persisted",
158-
);
154+
assert(!("sourceUserText" in store.commitments[0]), "source user text was persisted");
155+
assert(!("sourceAssistantText" in store.commitments[0]), "source assistant text was persisted");
159156
const raw = await fs.readFile(resolveCommitmentStorePath(), "utf8");
160157
assert(!raw.includes("CALL_TOOL"), "raw source text leaked into commitment store");
161158
});
@@ -212,8 +209,8 @@ async function verifyLegacySourceIsPrunedOnDueRead() {
212209
nowMs,
213210
});
214211
assert(due.length === 1, `unexpected due count ${due.length}`);
215-
assert(!("sourceUserText" in due[0]!), "legacy source user text surfaced as due");
216-
assert(!("sourceAssistantText" in due[0]!), "legacy source assistant text surfaced as due");
212+
assert(!("sourceUserText" in due[0]), "legacy source user text surfaced as due");
213+
assert(!("sourceAssistantText" in due[0]), "legacy source assistant text surfaced as due");
217214
const raw = await fs.readFile(storePath, "utf8");
218215
assert(!raw.includes("CALL_TOOL"), "legacy source text remained after due read");
219216
});
@@ -273,9 +270,9 @@ async function verifyExpiryTransitionsAndStripsLegacySource() {
273270

274271
const store = await loadCommitmentStore();
275272
assert(store.commitments[0]?.status === "expired", "legacy commitment was not expired");
276-
assert(!("sourceUserText" in store.commitments[0]!), "legacy source user text was retained");
273+
assert(!("sourceUserText" in store.commitments[0]), "legacy source user text was retained");
277274
assert(
278-
!("sourceAssistantText" in store.commitments[0]!),
275+
!("sourceAssistantText" in store.commitments[0]),
279276
"legacy source assistant text was retained",
280277
);
281278
const raw = await fs.readFile(resolveCommitmentStorePath(), "utf8");

src/agents/pi-embedded-runner/run.ts

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -375,14 +375,20 @@ export async function runEmbeddedPiAgent(
375375
const sessionLane = resolveSessionLane(params.sessionKey?.trim() || params.sessionId);
376376
const globalLane = resolveGlobalLane(params.lane);
377377
const laneTaskTimeoutMs = resolveEmbeddedRunLaneTimeoutMs(params.timeoutMs);
378+
const withGatewayDrainOption = (
379+
opts?: CommandQueueEnqueueOptions,
380+
): CommandQueueEnqueueOptions | undefined =>
381+
params.allowGatewayDrain ? { ...opts, allowDuringGatewayDrain: true } : opts;
378382
const withLaneTimeout = (opts?: CommandQueueEnqueueOptions) =>
379-
withEmbeddedRunLaneTimeout(opts, laneTaskTimeoutMs);
383+
withEmbeddedRunLaneTimeout(withGatewayDrainOption(opts), laneTaskTimeoutMs);
380384
const enqueueGlobal = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
381385
params.enqueue
382386
? params.enqueue(task, withLaneTimeout(opts))
383387
: enqueueCommandInLane(globalLane, task, withLaneTimeout(opts));
384388
const enqueueSession = <T>(task: () => Promise<T>, opts?: CommandQueueEnqueueOptions) =>
385-
params.enqueue ? params.enqueue(task, opts) : enqueueCommandInLane(sessionLane, task, opts);
389+
params.enqueue
390+
? params.enqueue(task, withGatewayDrainOption(opts))
391+
: enqueueCommandInLane(sessionLane, task, withGatewayDrainOption(opts));
386392
const channelHint = params.messageChannel ?? params.messageProvider;
387393
const resolvedToolResultFormat =
388394
params.toolResultFormat ??

src/agents/pi-embedded-runner/run/params.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,8 @@ export type RunEmbeddedPiAgentParams = {
100100
forceHeartbeatTool?: boolean;
101101
/** Allow runtime plugins for this run to late-bind the gateway subagent. */
102102
allowGatewaySubagentBinding?: boolean;
103+
/** Internal restart-drain path for work accepted before external ingress closed. */
104+
allowGatewayDrain?: boolean;
103105
sessionFile: string;
104106
workspaceDir: string;
105107
agentDir?: string;

0 commit comments

Comments
 (0)