Skip to content

Commit 01d908a

Browse files
committed
fix(cron): preserve deferred heartbeat target override
1 parent 4593eb8 commit 01d908a

4 files changed

Lines changed: 23 additions & 6 deletions

File tree

src/cron/service/state.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import type { CronConfig } from "../../config/types.cron.js";
2-
import type { HeartbeatRunResult } from "../../infra/heartbeat-wake.js";
2+
import type { HeartbeatRunResult, HeartbeatWakeRequest } from "../../infra/heartbeat-wake.js";
33
import type {
44
CronDeliveryStatus,
55
CronJob,
@@ -64,7 +64,7 @@ export type CronServiceDeps = {
6464
text: string,
6565
opts?: { agentId?: string; sessionKey?: string; contextKey?: string; trusted?: boolean },
6666
) => void;
67-
requestHeartbeatNow: (opts?: { reason?: string; agentId?: string; sessionKey?: string }) => void;
67+
requestHeartbeatNow: (opts?: HeartbeatWakeRequest) => void;
6868
runHeartbeatOnce?: (opts?: {
6969
reason?: string;
7070
agentId?: string;

src/cron/service/timer.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1220,6 +1220,7 @@ async function executeMainSessionCronJob(
12201220
reason,
12211221
agentId: job.agentId,
12221222
sessionKey: targetMainSessionKey,
1223+
heartbeat: { target: "last" },
12231224
});
12241225
return { status: "ok", summary: text };
12251226
}
@@ -1234,6 +1235,7 @@ async function executeMainSessionCronJob(
12341235
reason,
12351236
agentId: job.agentId,
12361237
sessionKey: targetMainSessionKey,
1238+
heartbeat: { target: "last" },
12371239
});
12381240
return { status: "ok", summary: text };
12391241
}
@@ -1256,6 +1258,7 @@ async function executeMainSessionCronJob(
12561258
reason: `cron:${job.id}`,
12571259
agentId: job.agentId,
12581260
sessionKey: targetMainSessionKey,
1261+
heartbeat: { target: "last" },
12591262
});
12601263
return { status: "ok", summary: text };
12611264
}

src/infra/heartbeat-runner.ts

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ import {
8686
areHeartbeatsEnabled,
8787
type HeartbeatRunResult,
8888
type HeartbeatWakeHandler,
89+
type HeartbeatWakeRequest,
8990
requestHeartbeatNow,
9091
setHeartbeatsEnabled,
9192
setHeartbeatWakeHandler,
@@ -1409,6 +1410,7 @@ export function startHeartbeatRunner(opts: {
14091410
const reason = params?.reason;
14101411
const requestedAgentId = params?.agentId ? normalizeAgentId(params.agentId) : undefined;
14111412
const requestedSessionKey = normalizeOptionalString(params?.sessionKey);
1413+
const requestedHeartbeat = params?.heartbeat;
14121414
const isInterval = reason === "interval";
14131415
const startedAt = Date.now();
14141416
const now = startedAt;
@@ -1428,7 +1430,7 @@ export function startHeartbeatRunner(opts: {
14281430
const res = await runOnce({
14291431
cfg: state.cfg,
14301432
agentId: targetAgent.agentId,
1431-
heartbeat: targetAgent.heartbeat,
1433+
heartbeat: requestedHeartbeat ?? targetAgent.heartbeat,
14321434
reason,
14331435
sessionKey: requestedSessionKey,
14341436
deps: { runtime: state.runtime },
@@ -1496,11 +1498,12 @@ export function startHeartbeatRunner(opts: {
14961498
}
14971499
};
14981500

1499-
const wakeHandler: HeartbeatWakeHandler = async (params) =>
1501+
const wakeHandler: HeartbeatWakeHandler = async (params: HeartbeatWakeRequest) =>
15001502
run({
15011503
reason: params.reason,
15021504
agentId: params.agentId,
15031505
sessionKey: params.sessionKey,
1506+
heartbeat: params.heartbeat,
15041507
});
15051508
const disposeWakeHandler = setHeartbeatWakeHandler(wakeHandler);
15061509
updateConfig(state.cfg);

src/infra/heartbeat-wake.ts

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,14 @@ export type HeartbeatRunResult =
1010
| { status: "skipped"; reason: string }
1111
| { status: "failed"; reason: string };
1212

13-
export type HeartbeatWakeHandler = (opts: {
13+
export type HeartbeatWakeRequest = {
1414
reason?: string;
1515
agentId?: string;
1616
sessionKey?: string;
17-
}) => Promise<HeartbeatRunResult>;
17+
heartbeat?: { target?: string };
18+
};
19+
20+
export type HeartbeatWakeHandler = (opts: HeartbeatWakeRequest) => Promise<HeartbeatRunResult>;
1821

1922
let heartbeatsEnabled = true;
2023

@@ -33,6 +36,7 @@ type PendingWakeReason = {
3336
requestedAt: number;
3437
agentId?: string;
3538
sessionKey?: string;
39+
heartbeat?: { target?: string };
3640
};
3741

3842
let handler: HeartbeatWakeHandler | null = null;
@@ -87,6 +91,7 @@ function queuePendingWakeReason(params?: {
8791
requestedAt?: number;
8892
agentId?: string;
8993
sessionKey?: string;
94+
heartbeat?: { target?: string };
9095
}) {
9196
const requestedAt = params?.requestedAt ?? Date.now();
9297
const normalizedReason = normalizeWakeReason(params?.reason);
@@ -102,6 +107,7 @@ function queuePendingWakeReason(params?: {
102107
requestedAt,
103108
agentId: normalizedAgentId,
104109
sessionKey: normalizedSessionKey,
110+
heartbeat: params?.heartbeat,
105111
};
106112
const previous = pendingWakes.get(wakeTargetKey);
107113
if (!previous) {
@@ -162,6 +168,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
162168
reason: pendingWake.reason ?? undefined,
163169
...(pendingWake.agentId ? { agentId: pendingWake.agentId } : {}),
164170
...(pendingWake.sessionKey ? { sessionKey: pendingWake.sessionKey } : {}),
171+
...(pendingWake.heartbeat ? { heartbeat: pendingWake.heartbeat } : {}),
165172
};
166173
const res = await active(wakeOpts);
167174
if (res.status === "skipped" && res.reason === "requests-in-flight") {
@@ -170,6 +177,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
170177
reason: pendingWake.reason ?? "retry",
171178
agentId: pendingWake.agentId,
172179
sessionKey: pendingWake.sessionKey,
180+
heartbeat: pendingWake.heartbeat,
173181
});
174182
schedule(DEFAULT_RETRY_MS, "retry");
175183
}
@@ -181,6 +189,7 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
181189
reason: pendingWake.reason ?? "retry",
182190
agentId: pendingWake.agentId,
183191
sessionKey: pendingWake.sessionKey,
192+
heartbeat: pendingWake.heartbeat,
184193
});
185194
}
186195
schedule(DEFAULT_RETRY_MS, "retry");
@@ -241,11 +250,13 @@ export function requestHeartbeatNow(opts?: {
241250
coalesceMs?: number;
242251
agentId?: string;
243252
sessionKey?: string;
253+
heartbeat?: { target?: string };
244254
}) {
245255
queuePendingWakeReason({
246256
reason: opts?.reason,
247257
agentId: opts?.agentId,
248258
sessionKey: opts?.sessionKey,
259+
heartbeat: opts?.heartbeat,
249260
});
250261
schedule(opts?.coalesceMs ?? DEFAULT_COALESCE_MS, "normal");
251262
}

0 commit comments

Comments
 (0)