Skip to content

Commit 064a307

Browse files
vignesh07steipete
authored andcommitted
Heartbeat: queue pending wakes per target
1 parent a7c25f2 commit 064a307

2 files changed

Lines changed: 88 additions & 36 deletions

File tree

src/infra/heartbeat-wake.test.ts

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -279,4 +279,41 @@ describe("heartbeat-wake", () => {
279279
sessionKey: "agent:ops:discord:channel:alerts",
280280
});
281281
});
282+
283+
it("executes distinct targeted wakes queued in the same coalescing window", async () => {
284+
vi.useFakeTimers();
285+
const handler = vi.fn().mockResolvedValue({ status: "ran", durationMs: 1 });
286+
setHeartbeatWakeHandler(handler);
287+
288+
requestHeartbeatNow({
289+
reason: "cron:job-a",
290+
agentId: "ops",
291+
sessionKey: "agent:ops:discord:channel:alerts",
292+
coalesceMs: 100,
293+
});
294+
requestHeartbeatNow({
295+
reason: "cron:job-b",
296+
agentId: "main",
297+
sessionKey: "agent:main:telegram:group:-1001",
298+
coalesceMs: 100,
299+
});
300+
301+
await vi.advanceTimersByTimeAsync(100);
302+
303+
expect(handler).toHaveBeenCalledTimes(2);
304+
expect(handler.mock.calls.map((call) => call[0])).toEqual(
305+
expect.arrayContaining([
306+
{
307+
reason: "cron:job-a",
308+
agentId: "ops",
309+
sessionKey: "agent:ops:discord:channel:alerts",
310+
},
311+
{
312+
reason: "cron:job-b",
313+
agentId: "main",
314+
sessionKey: "agent:main:telegram:group:-1001",
315+
},
316+
]),
317+
);
318+
});
282319
});

src/infra/heartbeat-wake.ts

Lines changed: 51 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ type PendingWakeReason = {
2020

2121
let handler: HeartbeatWakeHandler | null = null;
2222
let handlerGeneration = 0;
23-
let pendingWake: PendingWakeReason | null = null;
23+
const pendingWakes = new Map<string, PendingWakeReason>();
2424
let scheduled = false;
2525
let running = false;
2626
let timer: NodeJS.Timeout | null = null;
@@ -67,6 +67,12 @@ function normalizeWakeTarget(value?: string): string | undefined {
6767
return trimmed || undefined;
6868
}
6969

70+
function getWakeTargetKey(params: { agentId?: string; sessionKey?: string }) {
71+
const agentId = normalizeWakeTarget(params.agentId);
72+
const sessionKey = normalizeWakeTarget(params.sessionKey);
73+
return `${agentId ?? ""}::${sessionKey ?? ""}`;
74+
}
75+
7076
function queuePendingWakeReason(params?: {
7177
reason?: string;
7278
requestedAt?: number;
@@ -75,23 +81,30 @@ function queuePendingWakeReason(params?: {
7581
}) {
7682
const requestedAt = params?.requestedAt ?? Date.now();
7783
const normalizedReason = normalizeWakeReason(params?.reason);
84+
const normalizedAgentId = normalizeWakeTarget(params?.agentId);
85+
const normalizedSessionKey = normalizeWakeTarget(params?.sessionKey);
86+
const wakeTargetKey = getWakeTargetKey({
87+
agentId: normalizedAgentId,
88+
sessionKey: normalizedSessionKey,
89+
});
7890
const next: PendingWakeReason = {
7991
reason: normalizedReason,
8092
priority: resolveReasonPriority(normalizedReason),
8193
requestedAt,
82-
agentId: normalizeWakeTarget(params?.agentId),
83-
sessionKey: normalizeWakeTarget(params?.sessionKey),
94+
agentId: normalizedAgentId,
95+
sessionKey: normalizedSessionKey,
8496
};
85-
if (!pendingWake) {
86-
pendingWake = next;
97+
const previous = pendingWakes.get(wakeTargetKey);
98+
if (!previous) {
99+
pendingWakes.set(wakeTargetKey, next);
87100
return;
88101
}
89-
if (next.priority > pendingWake.priority) {
90-
pendingWake = next;
102+
if (next.priority > previous.priority) {
103+
pendingWakes.set(wakeTargetKey, next);
91104
return;
92105
}
93-
if (next.priority === pendingWake.priority && next.requestedAt >= pendingWake.requestedAt) {
94-
pendingWake = next;
106+
if (next.priority === previous.priority && next.requestedAt >= previous.requestedAt) {
107+
pendingWakes.set(wakeTargetKey, next);
95108
}
96109
}
97110

@@ -131,38 +144,40 @@ function schedule(coalesceMs: number, kind: WakeTimerKind = "normal") {
131144
return;
132145
}
133146

134-
const reason = pendingWake?.reason;
135-
const agentId = pendingWake?.agentId;
136-
const sessionKey = pendingWake?.sessionKey;
137-
pendingWake = null;
147+
const pendingBatch = Array.from(pendingWakes.values());
148+
pendingWakes.clear();
138149
running = true;
139150
try {
140-
const wakeOpts = {
141-
reason: reason ?? undefined,
142-
...(agentId ? { agentId } : {}),
143-
...(sessionKey ? { sessionKey } : {}),
144-
};
145-
const res = await active(wakeOpts);
146-
if (res.status === "skipped" && res.reason === "requests-in-flight") {
147-
// The main lane is busy; retry soon.
148-
queuePendingWakeReason({
149-
reason: reason ?? "retry",
150-
agentId,
151-
sessionKey,
152-
});
153-
schedule(DEFAULT_RETRY_MS, "retry");
151+
for (const pendingWake of pendingBatch) {
152+
const wakeOpts = {
153+
reason: pendingWake.reason ?? undefined,
154+
...(pendingWake.agentId ? { agentId: pendingWake.agentId } : {}),
155+
...(pendingWake.sessionKey ? { sessionKey: pendingWake.sessionKey } : {}),
156+
};
157+
const res = await active(wakeOpts);
158+
if (res.status === "skipped" && res.reason === "requests-in-flight") {
159+
// The main lane is busy; retry this wake target soon.
160+
queuePendingWakeReason({
161+
reason: pendingWake.reason ?? "retry",
162+
agentId: pendingWake.agentId,
163+
sessionKey: pendingWake.sessionKey,
164+
});
165+
schedule(DEFAULT_RETRY_MS, "retry");
166+
}
154167
}
155168
} catch {
156169
// Error is already logged by the heartbeat runner; schedule a retry.
157-
queuePendingWakeReason({
158-
reason: reason ?? "retry",
159-
agentId,
160-
sessionKey,
161-
});
170+
for (const pendingWake of pendingBatch) {
171+
queuePendingWakeReason({
172+
reason: pendingWake.reason ?? "retry",
173+
agentId: pendingWake.agentId,
174+
sessionKey: pendingWake.sessionKey,
175+
});
176+
}
162177
schedule(DEFAULT_RETRY_MS, "retry");
163178
} finally {
164179
running = false;
165-
if (pendingWake || scheduled) {
180+
if (pendingWakes.size > 0 || scheduled) {
166181
schedule(delay, "normal");
167182
}
168183
}
@@ -197,7 +212,7 @@ export function setHeartbeatWakeHandler(next: HeartbeatWakeHandler | null): () =
197212
running = false;
198213
scheduled = false;
199214
}
200-
if (handler && pendingWake) {
215+
if (handler && pendingWakes.size > 0) {
201216
schedule(DEFAULT_COALESCE_MS, "normal");
202217
}
203218
return () => {
@@ -231,7 +246,7 @@ export function hasHeartbeatWakeHandler() {
231246
}
232247

233248
export function hasPendingHeartbeatWake() {
234-
return pendingWake !== null || Boolean(timer) || scheduled;
249+
return pendingWakes.size > 0 || Boolean(timer) || scheduled;
235250
}
236251

237252
export function resetHeartbeatWakeStateForTests() {
@@ -241,7 +256,7 @@ export function resetHeartbeatWakeStateForTests() {
241256
timer = null;
242257
timerDueAt = null;
243258
timerKind = null;
244-
pendingWake = null;
259+
pendingWakes.clear();
245260
scheduled = false;
246261
running = false;
247262
handlerGeneration += 1;

0 commit comments

Comments
 (0)