Skip to content

Commit 96dcbdd

Browse files
Galin IlievGalin Iliev
authored andcommitted
fix(cron): route global main cron wakes
1 parent 6f7bde4 commit 96dcbdd

2 files changed

Lines changed: 100 additions & 2 deletions

File tree

src/gateway/server-cron.test.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,6 +384,92 @@ describe("buildGatewayCronService", () => {
384384
}
385385
});
386386

387+
it("routes global-scope main cron jobs through the global queue for queued wakes", async () => {
388+
const cfg = {
389+
...createCronConfig("server-cron-global-queued"),
390+
session: { mainKey: "main", scope: "global" },
391+
} as OpenClawConfig;
392+
loadConfigMock.mockReturnValue(cfg);
393+
394+
const state = buildGatewayCronService({
395+
cfg,
396+
deps: {} as CliDeps,
397+
broadcast: () => {},
398+
});
399+
try {
400+
const job = await state.cron.add({
401+
name: "global-queued",
402+
enabled: true,
403+
schedule: { kind: "at", at: new Date(1).toISOString() },
404+
sessionTarget: "main",
405+
wakeMode: "next-heartbeat",
406+
payload: { kind: "systemEvent", text: "hello global" },
407+
});
408+
409+
await state.cron.run(job.id, "force");
410+
411+
expect(callArg(enqueueSystemEventMock, 0, 0, "system event text")).toBe("hello global");
412+
const eventOptions = requireRecord(
413+
callArg(enqueueSystemEventMock, 0, 1, "system event options"),
414+
"options",
415+
);
416+
expect(eventOptions.sessionKey).toBe("global");
417+
const heartbeatRequest = requireRecord(
418+
callArg(requestHeartbeatMock, 0, 0, "heartbeat request"),
419+
"request",
420+
);
421+
expect(heartbeatRequest.agentId).toBe("main");
422+
expect(heartbeatRequest.sessionKey).toBe("global");
423+
} finally {
424+
state.cron.stop();
425+
}
426+
});
427+
428+
it("routes global-scope immediate main cron jobs through the global heartbeat lane", async () => {
429+
const cfg = {
430+
...createCronConfig("server-cron-global-now"),
431+
session: { mainKey: "main", scope: "global" },
432+
} as OpenClawConfig;
433+
loadConfigMock.mockReturnValue(cfg);
434+
435+
const state = buildGatewayCronService({
436+
cfg,
437+
deps: {} as CliDeps,
438+
broadcast: () => {},
439+
});
440+
try {
441+
const job = await state.cron.add({
442+
name: "global-now",
443+
enabled: true,
444+
schedule: { kind: "at", at: new Date(1).toISOString() },
445+
sessionTarget: "main",
446+
wakeMode: "now",
447+
payload: { kind: "systemEvent", text: "hello now" },
448+
});
449+
450+
await state.cron.run(job.id, "force");
451+
452+
const eventOptions = requireRecord(
453+
callArg(enqueueSystemEventMock, 0, 1, "system event options"),
454+
"options",
455+
);
456+
expect(eventOptions.sessionKey).toBe("global");
457+
const heartbeatRun = requireRecord(
458+
callArg(runHeartbeatOnceMock, 0, 0, "heartbeat run options"),
459+
"heartbeat run options",
460+
);
461+
expect(heartbeatRun.agentId).toBe("main");
462+
expect(heartbeatRun.sessionKey).toBe("global");
463+
expect(heartbeatRun.heartbeat).toEqual({
464+
target: "last",
465+
to: undefined,
466+
accountId: undefined,
467+
});
468+
} finally {
469+
state.cron.stop();
470+
}
471+
});
472+
387473
it("forwards heartbeat overrides through the cron wake adapter", () => {
388474
const cfg = createCronConfig("server-cron-heartbeat-override");
389475
loadConfigMock.mockReturnValue(cfg);

src/gateway/server-cron.ts

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,11 @@ import type {
3434
PluginHookGatewayCronService,
3535
PluginHookGatewayContext,
3636
} from "../plugins/hook-types.js";
37-
import { normalizeAgentId, toAgentStoreSessionKey } from "../routing/session-key.js";
37+
import {
38+
normalizeAgentId,
39+
resolveEventSessionKey,
40+
toAgentStoreSessionKey,
41+
} from "../routing/session-key.js";
3842
import { defaultRuntime } from "../runtime.js";
3943
import { parseAgentSessionKey } from "../sessions/session-key-utils.js";
4044
import {
@@ -228,13 +232,21 @@ export function buildGatewayCronService(params: {
228232
requestedAgentId ?? derivedAgentId,
229233
);
230234
const agentId = resolvedAgentId || undefined;
231-
const sessionKey = agentId
235+
const resolvedSessionKey = agentId
232236
? resolveCronSessionKey({
233237
runtimeConfig,
234238
agentId,
235239
requestedSessionKey,
236240
})
237241
: undefined;
242+
const sessionKey =
243+
resolvedSessionKey && runtimeConfig.session?.scope === "global"
244+
? resolveEventSessionKey(
245+
resolvedSessionKey,
246+
runtimeConfig.session?.mainKey,
247+
runtimeConfig.session?.scope,
248+
)
249+
: resolvedSessionKey;
238250
return { runtimeConfig, agentId, sessionKey };
239251
};
240252

0 commit comments

Comments
 (0)