Skip to content

Commit a4f0e50

Browse files
snowzlmsnowzlmclawsweeper[bot]Takhoffman
authored
fix(gateway): preserve stale channel restart diagnostics (#90937)
Summary: - This PR sanitizes status patches from aborted channel tasks in the gateway manager and adds regression tests for stale restart diagnostics. - PR surface: Source +56, Tests +78. Total +134 across 2 files. - Reproducibility: yes. Source inspection and the PR's before-fix regression show the sequence: non-manual sto ... while the stale task remains, then a late `connected=true` / `lastError=null` status patch on current main. Automerge notes: - PR branch already contained follow-up commit before automerge: fix(gateway): preserve stale restart diagnostics - PR branch already contained follow-up commit before automerge: fix(gateway): preserve stale channel restart diagnostics Validation: - ClawSweeper review passed for head 53b37e5. - Required merge gates passed before the squash merge. Prepared head SHA: 53b37e5 Review: #90937 (comment) Co-authored-by: snowzlm <snowzlm@noreply.codeberg.org> Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: takhoffman Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
1 parent b8adc11 commit a4f0e50

2 files changed

Lines changed: 151 additions & 17 deletions

File tree

src/gateway/server-channels.test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -300,6 +300,40 @@ describe("server-channels auto restart", () => {
300300
expect(account?.lastError).toBeNull();
301301
});
302302

303+
it("lets stop hooks update status after aborting the running task", async () => {
304+
const startAccount = vi.fn(async (ctx: ChannelGatewayContext<TestAccount>) => {
305+
ctx.setStatus({
306+
accountId: DEFAULT_ACCOUNT_ID,
307+
running: true,
308+
connected: true,
309+
lastError: "startup warning",
310+
});
311+
await new Promise<void>((resolve) => {
312+
ctx.abortSignal.addEventListener("abort", () => resolve(), { once: true });
313+
});
314+
});
315+
const stopAccount = vi.fn(async (ctx: ChannelGatewayContext<TestAccount>) => {
316+
ctx.setStatus({
317+
accountId: DEFAULT_ACCOUNT_ID,
318+
connected: false,
319+
lastError: null,
320+
});
321+
});
322+
installTestRegistry(createTestPlugin({ startAccount, stopAccount }));
323+
const manager = createManager();
324+
325+
await manager.startChannels();
326+
await flushMicrotasks();
327+
await manager.stopChannel("discord", DEFAULT_ACCOUNT_ID);
328+
329+
const snapshot = manager.getRuntimeSnapshot();
330+
const account = snapshot.channelAccounts.discord?.[DEFAULT_ACCOUNT_ID];
331+
expect(stopAccount).toHaveBeenCalledTimes(1);
332+
expect(account?.running).toBe(false);
333+
expect(account?.connected).toBe(false);
334+
expect(account?.lastError).toBeNull();
335+
});
336+
303337
it("does not enumerate configured accounts when stopping a never-started channel", async () => {
304338
const listAccountIds = vi.fn(() => [DEFAULT_ACCOUNT_ID]);
305339
const resolveAccount = vi.fn(() => ({ enabled: true, configured: true }));
@@ -433,6 +467,50 @@ describe("server-channels auto restart", () => {
433467
expect(hoisted.sleepWithAbort).not.toHaveBeenCalled();
434468
});
435469

470+
it("keeps recovery timeout diagnostics when a stale task reports connected after abort", async () => {
471+
let emitLateStatus: (() => void) | undefined;
472+
const startAccount = vi.fn(async (ctx: ChannelGatewayContext<TestAccount>) => {
473+
ctx.setStatus({
474+
accountId: DEFAULT_ACCOUNT_ID,
475+
connected: true,
476+
lastError: null,
477+
});
478+
await new Promise<void>(() => {
479+
ctx.abortSignal.addEventListener(
480+
"abort",
481+
() => {
482+
emitLateStatus = () =>
483+
ctx.setStatus({
484+
accountId: DEFAULT_ACCOUNT_ID,
485+
connected: true,
486+
lastError: null,
487+
});
488+
},
489+
{ once: true },
490+
);
491+
});
492+
});
493+
installTestRegistry(createTestPlugin({ startAccount }));
494+
const manager = createManager();
495+
496+
await manager.startChannels();
497+
const recoveryStopTask = manager.stopChannel("discord", DEFAULT_ACCOUNT_ID, {
498+
manual: false,
499+
});
500+
await vi.advanceTimersByTimeAsync(5_000);
501+
await recoveryStopTask;
502+
await manager.startChannel("discord", DEFAULT_ACCOUNT_ID);
503+
504+
emitLateStatus?.();
505+
const account = manager.getRuntimeSnapshot().channelAccounts.discord?.[DEFAULT_ACCOUNT_ID];
506+
expect(startAccount).toHaveBeenCalledTimes(1);
507+
expect(account?.running).toBe(false);
508+
expect(account?.connected).toBe(false);
509+
expect(account?.restartPending).toBe(true);
510+
expect(account?.reconnectAttempts).toBe(0);
511+
expect(account?.lastError).toContain("channel stop timed out");
512+
});
513+
436514
it("restarts immediately when recovery stop timeout settles with an error", async () => {
437515
const rejectFirstTask = createDeferred();
438516
let startCount = 0;

src/gateway/server-channels.ts

Lines changed: 73 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,37 @@ type ChannelRuntimeStore = {
5252
runtimes: Map<string, ChannelAccountSnapshot>;
5353
};
5454

55+
function sanitizeAbortedTaskStatusPatch(
56+
patch: ChannelAccountSnapshot,
57+
current: ChannelAccountSnapshot,
58+
): ChannelAccountSnapshot {
59+
const next = { ...patch };
60+
delete next.running;
61+
delete next.restartPending;
62+
delete next.reconnectAttempts;
63+
delete next.lastStartAt;
64+
delete next.lastStopAt;
65+
66+
// A stale task may still emit a late "connected" heartbeat after the gateway
67+
// has already aborted it and marked restart recovery pending. Do not let that
68+
// old task make the stopped runtime look connected again.
69+
if (next.connected === true) {
70+
delete next.connected;
71+
delete next.lastConnectedAt;
72+
delete next.lastEventAt;
73+
delete next.lastTransportActivityAt;
74+
}
75+
76+
// Preserve actionable lifecycle diagnostics (for example a stop-timeout
77+
// recovery error) against late stale-task status patches that merely clear
78+
// plugin transport errors.
79+
if (next.lastError === null && current.lastError) {
80+
delete next.lastError;
81+
}
82+
83+
return next;
84+
}
85+
5586
type HealthMonitorConfig = {
5687
healthMonitor?: {
5788
enabled?: boolean;
@@ -336,6 +367,32 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
336367
return next;
337368
};
338369

370+
const setRuntimeFromTaskStatus = (
371+
channelId: ChannelId,
372+
accountId: string,
373+
patch: ChannelAccountSnapshot,
374+
abortSignal?: AbortSignal,
375+
): ChannelAccountSnapshot => {
376+
const safePatch = abortSignal?.aborted
377+
? sanitizeAbortedTaskStatusPatch(patch, getRuntime(channelId, accountId))
378+
: patch;
379+
return setRuntime(channelId, accountId, safePatch);
380+
};
381+
382+
const setStoppedRuntime = (
383+
channelId: ChannelId,
384+
accountId: string,
385+
patch: Omit<ChannelAccountSnapshot, "accountId" | "running"> = {},
386+
): ChannelAccountSnapshot => {
387+
const current = getRuntime(channelId, accountId);
388+
return setRuntime(channelId, accountId, {
389+
accountId,
390+
running: false,
391+
...(typeof current.connected === "boolean" ? { connected: false } : {}),
392+
...patch,
393+
});
394+
};
395+
339396
const getChannelRuntime = async (): Promise<PluginRuntimeChannel | undefined> => {
340397
if (channelRuntime) {
341398
return channelRuntime;
@@ -492,9 +549,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
492549
}
493550

494551
if (abort.signal.aborted || manuallyStopped.has(rKey)) {
495-
setRuntime(channelId, id, {
496-
accountId: id,
497-
running: false,
552+
setStoppedRuntime(channelId, id, {
498553
restartPending: false,
499554
lastStopAt: Date.now(),
500555
});
@@ -559,7 +614,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
559614
abortSignal: abort.signal,
560615
log,
561616
getStatus: () => getRuntime(channelId, id),
562-
setStatus: (next) => setRuntime(channelId, id, next),
617+
setStatus: (next) => setRuntimeFromTaskStatus(channelId, id, next, abort.signal),
563618
...(channelRuntimeForTask ? { channelRuntime: channelRuntimeForTask } : {}),
564619
});
565620
const routeRegistry = getPluginHttpRouteRegistry?.();
@@ -588,9 +643,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
588643
})
589644
.then(async () => {
590645
await cleanupTaskScopedApprovalRuntime("channel cleanup failed");
591-
setRuntime(channelId, id, {
592-
accountId: id,
593-
running: false,
646+
setStoppedRuntime(channelId, id, {
594647
lastStopAt: Date.now(),
595648
});
596649
})
@@ -689,9 +742,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
689742
store.tasks.set(id, trackedPromise);
690743
} catch (error) {
691744
if (!handedOffTask) {
692-
setRuntime(channelId, id, {
693-
accountId: id,
694-
running: false,
745+
setStoppedRuntime(channelId, id, {
695746
restartPending: false,
696747
lastError: formatErrorMessage(error),
697748
});
@@ -785,12 +836,19 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
785836
log.warn?.(
786837
`[${id}] channel stop exceeded ${CHANNEL_STOP_ABORT_TIMEOUT_MS}ms after abort; continuing shutdown`,
787838
);
788-
setRuntime(channelId, id, {
789-
accountId: id,
790-
running: manual,
839+
const stoppedPatch = {
791840
restartPending: !manual,
792841
lastError: `channel stop timed out after ${CHANNEL_STOP_ABORT_TIMEOUT_MS}ms`,
793-
});
842+
};
843+
if (manual) {
844+
setRuntime(channelId, id, {
845+
accountId: id,
846+
running: true,
847+
...stoppedPatch,
848+
});
849+
} else {
850+
setStoppedRuntime(channelId, id, stoppedPatch);
851+
}
794852
if (!manual) {
795853
recoveryStopTimedOut.add(rKey);
796854
}
@@ -800,9 +858,7 @@ export function createChannelManager(opts: ChannelManagerOptions): ChannelManage
800858
recoveryStartRequested.delete(rKey);
801859
store.aborts.delete(id);
802860
store.tasks.delete(id);
803-
setRuntime(channelId, id, {
804-
accountId: id,
805-
running: false,
861+
setStoppedRuntime(channelId, id, {
806862
restartPending: false,
807863
lastStopAt: Date.now(),
808864
});

0 commit comments

Comments
 (0)