Skip to content

Commit 556af3f

Browse files
authored
fix(cron): cancel timed-out runs before side effects (#22411) thanks @Takhoffman
Verified:
- pnpm check
- pnpm vitest run src/memory/qmd-manager.test.ts src/cron/service.issue-regressions.test.ts src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts --maxWorkers=1

Co-authored-by: Takhoffman <781889+Takhoffman@users.noreply.github.com>
Co-authored-by: Tak Hoffman <781889+Takhoffman@users.noreply.github.com>
1 parent 64b273a commit 556af3f

5 files changed

Lines changed: 195 additions & 8 deletions

File tree

src/agents/subagent-announce.ts

Lines changed: 35 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -556,7 +556,11 @@ async function maybeQueueSubagentAnnounce(params: {
556556
triggerMessage: string;
557557
summaryLine?: string;
558558
requesterOrigin?: DeliveryContext;
559+
signal?: AbortSignal;
559560
}): Promise<"steered" | "queued" | "none"> {
561+
if (params.signal?.aborted) {
562+
return "none";
563+
}
560564
const { cfg, entry } = loadRequesterSessionEntry(params.requesterSessionKey);
561565
const canonicalKey = resolveRequesterStoreKey(cfg, params.requesterSessionKey);
562566
const sessionId = entry?.sessionId;
@@ -637,7 +641,14 @@ async function sendSubagentAnnounceDirectly(params: {
637641
completionDirectOrigin?: DeliveryContext;
638642
directOrigin?: DeliveryContext;
639643
requesterIsSubagent: boolean;
644+
signal?: AbortSignal;
640645
}): Promise<SubagentAnnounceDeliveryResult> {
646+
if (params.signal?.aborted) {
647+
return {
648+
delivered: false,
649+
path: "none",
650+
};
651+
}
641652
const cfg = loadConfig();
642653
const announceTimeoutMs = resolveSubagentAnnounceTimeoutMs(cfg);
643654
const canonicalRequesterSessionKey = resolveRequesterStoreKey(
@@ -691,6 +702,12 @@ async function sendSubagentAnnounceDirectly(params: {
691702
completionDirectOrigin?.threadId != null && completionDirectOrigin.threadId !== ""
692703
? String(completionDirectOrigin.threadId)
693704
: undefined;
705+
if (params.signal?.aborted) {
706+
return {
707+
delivered: false,
708+
path: "none",
709+
};
710+
}
694711
await callGateway({
695712
method: "send",
696713
params: {
@@ -717,6 +734,12 @@ async function sendSubagentAnnounceDirectly(params: {
717734
directOrigin?.threadId != null && directOrigin.threadId !== ""
718735
? String(directOrigin.threadId)
719736
: undefined;
737+
if (params.signal?.aborted) {
738+
return {
739+
delivered: false,
740+
path: "none",
741+
};
742+
}
720743
await callGateway({
721744
method: "agent",
722745
params: {
@@ -761,7 +784,14 @@ async function deliverSubagentAnnouncement(params: {
761784
completionRouteMode?: "bound" | "fallback" | "hook";
762785
spawnMode?: SpawnSubagentMode;
763786
directIdempotencyKey: string;
787+
signal?: AbortSignal;
764788
}): Promise<SubagentAnnounceDeliveryResult> {
789+
if (params.signal?.aborted) {
790+
return {
791+
delivered: false,
792+
path: "none",
793+
};
794+
}
765795
// Non-completion mode mirrors historical behavior: try queued/steered delivery first,
766796
// then (only if not queued) attempt direct delivery.
767797
if (!params.expectsCompletionMessage) {
@@ -771,6 +801,7 @@ async function deliverSubagentAnnouncement(params: {
771801
triggerMessage: params.triggerMessage,
772802
summaryLine: params.summaryLine,
773803
requesterOrigin: params.requesterOrigin,
804+
signal: params.signal,
774805
});
775806
const queued = queueOutcomeToDeliveryResult(queueOutcome);
776807
if (queued.delivered) {
@@ -791,6 +822,7 @@ async function deliverSubagentAnnouncement(params: {
791822
directOrigin: params.directOrigin,
792823
requesterIsSubagent: params.requesterIsSubagent,
793824
expectsCompletionMessage: params.expectsCompletionMessage,
825+
signal: params.signal,
794826
});
795827
if (direct.delivered || !params.expectsCompletionMessage) {
796828
return direct;
@@ -804,6 +836,7 @@ async function deliverSubagentAnnouncement(params: {
804836
triggerMessage: params.triggerMessage,
805837
summaryLine: params.summaryLine,
806838
requesterOrigin: params.requesterOrigin,
839+
signal: params.signal,
807840
});
808841
if (queueOutcome === "steered" || queueOutcome === "queued") {
809842
return queueOutcomeToDeliveryResult(queueOutcome);
@@ -956,6 +989,7 @@ export async function runSubagentAnnounceFlow(params: {
956989
announceType?: SubagentAnnounceType;
957990
expectsCompletionMessage?: boolean;
958991
spawnMode?: SpawnSubagentMode;
992+
signal?: AbortSignal;
959993
}): Promise<boolean> {
960994
let didAnnounce = false;
961995
const expectsCompletionMessage = params.expectsCompletionMessage === true;
@@ -1216,6 +1250,7 @@ export async function runSubagentAnnounceFlow(params: {
12161250
completionRouteMode: completionResolution.routeMode,
12171251
spawnMode: params.spawnMode,
12181252
directIdempotencyKey,
1253+
signal: params.signal,
12191254
});
12201255
didAnnounce = delivery.delivered;
12211256
if (!delivery.delivered && delivery.path === "direct" && delivery.error) {

src/cron/isolated-agent.delivers-response-has-heartbeat-ok-but-includes.test.ts

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,4 +133,56 @@ describe("runCronIsolatedAgentTurn", () => {
133133
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
134134
});
135135
});
136+
137+
it("skips structured outbound delivery when timeout abort is already set", async () => {
138+
await withTempCronHome(async (home) => {
139+
const storePath = await writeSessionStore(home, {
140+
lastProvider: "telegram",
141+
lastChannel: "telegram",
142+
lastTo: "123",
143+
});
144+
const deps: CliDeps = {
145+
sendMessageSlack: vi.fn(),
146+
sendMessageWhatsApp: vi.fn(),
147+
sendMessageTelegram: vi.fn().mockResolvedValue({
148+
messageId: "t1",
149+
chatId: "123",
150+
}),
151+
sendMessageDiscord: vi.fn(),
152+
sendMessageSignal: vi.fn(),
153+
sendMessageIMessage: vi.fn(),
154+
};
155+
const controller = new AbortController();
156+
controller.abort("cron: job execution timed out");
157+
158+
vi.mocked(runEmbeddedPiAgent).mockResolvedValue({
159+
payloads: [{ text: "HEARTBEAT_OK", mediaUrl: "https://example.com/img.png" }],
160+
meta: {
161+
durationMs: 5,
162+
agentMeta: { sessionId: "s", provider: "p", model: "m" },
163+
},
164+
});
165+
166+
const res = await runCronIsolatedAgentTurn({
167+
cfg: makeCfg(home, storePath),
168+
deps,
169+
job: {
170+
...makeJob({
171+
kind: "agentTurn",
172+
message: "do it",
173+
}),
174+
delivery: { mode: "announce", channel: "telegram", to: "123" },
175+
},
176+
message: "do it",
177+
sessionKey: "cron:job-1",
178+
signal: controller.signal,
179+
lane: "cron",
180+
});
181+
182+
expect(res.status).toBe("error");
183+
expect(res.error).toContain("timed out");
184+
expect(deps.sendMessageTelegram).not.toHaveBeenCalled();
185+
expect(runSubagentAnnounceFlow).not.toHaveBeenCalled();
186+
});
187+
});
136188
});

src/cron/isolated-agent/run.ts

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,10 +156,19 @@ export async function runCronIsolatedAgentTurn(params: {
156156
job: CronJob;
157157
message: string;
158158
abortSignal?: AbortSignal;
159+
signal?: AbortSignal;
159160
sessionKey: string;
160161
agentId?: string;
161162
lane?: string;
162163
}): Promise<RunCronAgentTurnResult> {
164+
const abortSignal = params.abortSignal ?? params.signal;
165+
const isAborted = () => abortSignal?.aborted === true;
166+
const abortReason = () => {
167+
const reason = abortSignal?.reason;
168+
return typeof reason === "string" && reason.trim()
169+
? reason.trim()
170+
: "cron: job execution timed out";
171+
};
163172
const isFastTestEnv = process.env.OPENCLAW_TEST_FAST === "1";
164173
const defaultAgentId = resolveDefaultAgentId(params.cfg);
165174
const requestedAgentId =
@@ -473,8 +482,8 @@ export async function runCronIsolatedAgentTurn(params: {
473482
agentDir,
474483
fallbacksOverride: resolveAgentModelFallbacksOverride(params.cfg, agentId),
475484
run: (providerOverride, modelOverride) => {
476-
if (params.abortSignal?.aborted) {
477-
throw new Error("cron: isolated run aborted");
485+
if (abortSignal?.aborted) {
486+
throw new Error(abortReason());
478487
}
479488
if (isCliProvider(providerOverride, cfgWithAgentDefaults)) {
480489
const cliSessionId = getCliSessionId(cronSession.sessionEntry, providerOverride);
@@ -517,7 +526,7 @@ export async function runCronIsolatedAgentTurn(params: {
517526
runId: cronSession.sessionEntry.sessionId,
518527
requireExplicitMessageTarget: true,
519528
disableMessageTool: deliveryRequested,
520-
abortSignal: params.abortSignal,
529+
abortSignal,
521530
});
522531
},
523532
});
@@ -529,6 +538,10 @@ export async function runCronIsolatedAgentTurn(params: {
529538
return withRunSession({ status: "error", error: String(err) });
530539
}
531540

541+
if (isAborted()) {
542+
return withRunSession({ status: "error", error: abortReason() });
543+
}
544+
532545
const payloads = runResult.payloads ?? [];
533546

534547
// Update token+model fields in the session store.
@@ -584,6 +597,10 @@ export async function runCronIsolatedAgentTurn(params: {
584597
}
585598
await persistSessionEntry();
586599
}
600+
601+
if (isAborted()) {
602+
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
603+
}
587604
const firstText = payloads[0]?.text ?? "";
588605
let summary = pickSummaryFromPayloads(payloads) ?? pickSummaryFromOutput(firstText);
589606
let outputText = pickLastNonEmptyTextFromPayloads(payloads);
@@ -672,6 +689,9 @@ export async function runCronIsolatedAgentTurn(params: {
672689
? [{ text: synthesizedText }]
673690
: [];
674691
if (payloadsForDelivery.length > 0) {
692+
if (isAborted()) {
693+
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
694+
}
675695
const deliveryResults = await deliverOutboundPayloads({
676696
cfg: cfgWithAgentDefaults,
677697
channel: resolvedDelivery.channel,
@@ -683,6 +703,7 @@ export async function runCronIsolatedAgentTurn(params: {
683703
identity,
684704
bestEffort: deliveryBestEffort,
685705
deps: createOutboundSendDeps(params.deps),
706+
abortSignal,
686707
});
687708
delivered = deliveryResults.length > 0;
688709
}
@@ -765,6 +786,9 @@ export async function runCronIsolatedAgentTurn(params: {
765786
return withRunSession({ status: "ok", summary, outputText, delivered: true, ...telemetry });
766787
}
767788
try {
789+
if (isAborted()) {
790+
return withRunSession({ status: "error", error: abortReason(), ...telemetry });
791+
}
768792
const didAnnounce = await runSubagentAnnounceFlow({
769793
childSessionKey: agentSessionKey,
770794
childRunId: `${params.job.id}:${runSessionId}`,
@@ -785,6 +809,7 @@ export async function runCronIsolatedAgentTurn(params: {
785809
endedAt: runEndedAt,
786810
outcome: { status: "ok" },
787811
announceType: "cron job",
812+
signal: abortSignal,
788813
});
789814
if (didAnnounce) {
790815
delivered = true;

src/cron/service.issue-regressions.test.ts

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -731,6 +731,60 @@ describe("Cron issue regressions", () => {
731731
expect(job?.state.lastError).toContain("timed out");
732732
});
733733

734+
it("suppresses isolated follow-up side effects after timeout", async () => {
735+
vi.useRealTimers();
736+
const store = await makeStorePath();
737+
const scheduledAt = Date.parse("2026-02-15T13:00:00.000Z");
738+
const enqueueSystemEvent = vi.fn();
739+
740+
const cronJob = createIsolatedRegressionJob({
741+
id: "timeout-side-effects",
742+
name: "timeout side effects",
743+
scheduledAt,
744+
schedule: { kind: "every", everyMs: 60_000, anchorMs: scheduledAt },
745+
payload: { kind: "agentTurn", message: "work", timeoutSeconds: 0.01 },
746+
state: { nextRunAtMs: scheduledAt },
747+
});
748+
await writeCronJobs(store.storePath, [cronJob]);
749+
750+
let now = scheduledAt;
751+
const state = createCronServiceState({
752+
cronEnabled: true,
753+
storePath: store.storePath,
754+
log: noopLogger,
755+
nowMs: () => now,
756+
enqueueSystemEvent,
757+
requestHeartbeatNow: vi.fn(),
758+
runIsolatedAgentJob: vi.fn(async (params) => {
759+
const abortSignal = params.abortSignal;
760+
await new Promise<void>((resolve, reject) => {
761+
const onAbort = () => {
762+
abortSignal?.removeEventListener("abort", onAbort);
763+
now += 100;
764+
reject(new Error("aborted"));
765+
};
766+
abortSignal?.addEventListener("abort", onAbort, { once: true });
767+
});
768+
return {
769+
status: "ok" as const,
770+
summary: "late-summary",
771+
delivered: false,
772+
error:
773+
abortSignal?.aborted && typeof abortSignal.reason === "string"
774+
? abortSignal.reason
775+
: undefined,
776+
};
777+
}),
778+
});
779+
780+
await onTimer(state);
781+
782+
const jobAfterTimeout = state.store?.jobs.find((j) => j.id === "timeout-side-effects");
783+
expect(jobAfterTimeout?.state.lastStatus).toBe("error");
784+
expect(jobAfterTimeout?.state.lastError).toContain("timed out");
785+
expect(enqueueSystemEvent).not.toHaveBeenCalled();
786+
});
787+
734788
it("applies timeoutSeconds to manual cron.run isolated executions", async () => {
735789
vi.useRealTimers();
736790
const store = await makeStorePath();

0 commit comments

Comments
 (0)