Skip to content

Commit b74aa47

Browse files
committed
fix(heartbeat): honor ack policy for pending replay
1 parent cd212fe commit b74aa47

3 files changed

Lines changed: 136 additions & 5 deletions

File tree

src/auto-reply/reply/get-reply.fast-path.test.ts

Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,83 @@ describe("getReplyFromConfig fast test bootstrap", () => {
130130
expect(vi.mocked(runPreparedReplyMock)).toHaveBeenCalledOnce();
131131
});
132132

133+
it("clears stale ack-only heartbeat pending delivery before replay", async () => {
134+
const home = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-heartbeat-pending-clear-"));
135+
const storePath = path.join(home, "sessions.json");
136+
const sessionKey = "agent:main:telegram:123";
137+
await fs.writeFile(
138+
storePath,
139+
JSON.stringify({
140+
[sessionKey]: {
141+
sessionId: "pending-ack",
142+
updatedAt: Date.now(),
143+
pendingFinalDelivery: true,
144+
pendingFinalDeliveryText: "HEARTBEAT_OK",
145+
pendingFinalDeliveryCreatedAt: 1,
146+
pendingFinalDeliveryAttemptCount: 4,
147+
pendingFinalDeliveryLastError: null,
148+
},
149+
}),
150+
"utf8",
151+
);
152+
const cfg = withFastReplyConfig({
153+
agents: {
154+
defaults: {
155+
model: "openai/gpt-5.5",
156+
workspace: home,
157+
heartbeat: { ackMaxChars: 300 },
158+
},
159+
},
160+
session: { store: storePath },
161+
} as OpenClawConfig);
162+
163+
await expect(
164+
getReplyFromConfig(buildGetReplyCtx(), { isHeartbeat: true }, cfg),
165+
).resolves.toEqual({ text: "ok" });
166+
167+
const stored = JSON.parse(await fs.readFile(storePath, "utf8"))[sessionKey];
168+
expect(stored.pendingFinalDelivery).toBeUndefined();
169+
expect(stored.pendingFinalDeliveryText).toBeUndefined();
170+
expect(stored.pendingFinalDeliveryAttemptCount).toBeUndefined();
171+
});
172+
173+
it("uses ackMaxChars when replaying stale heartbeat pending delivery", async () => {
174+
const home = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-heartbeat-pending-replay-"));
175+
const storePath = path.join(home, "sessions.json");
176+
const sessionKey = "agent:main:telegram:123";
177+
await fs.writeFile(
178+
storePath,
179+
JSON.stringify({
180+
[sessionKey]: {
181+
sessionId: "pending-ack-with-remainder",
182+
updatedAt: Date.now(),
183+
pendingFinalDelivery: true,
184+
pendingFinalDeliveryText: "HEARTBEAT_OK short",
185+
},
186+
}),
187+
"utf8",
188+
);
189+
const cfg = withFastReplyConfig({
190+
agents: {
191+
defaults: {
192+
model: "openai/gpt-5.5",
193+
workspace: home,
194+
heartbeat: { ackMaxChars: 0 },
195+
},
196+
},
197+
session: { store: storePath },
198+
} as OpenClawConfig);
199+
200+
await expect(
201+
getReplyFromConfig(buildGetReplyCtx(), { isHeartbeat: true }, cfg),
202+
).resolves.toEqual({ text: "short" });
203+
204+
const stored = JSON.parse(await fs.readFile(storePath, "utf8"))[sessionKey];
205+
expect(stored.pendingFinalDelivery).toBe(true);
206+
expect(stored.pendingFinalDeliveryText).toBe("short");
207+
expect(stored.pendingFinalDeliveryAttemptCount).toBe(1);
208+
});
209+
133210
it("handles native /status before workspace bootstrap", async () => {
134211
const home = await fs.mkdtemp(path.join(os.tmpdir(), "openclaw-native-status-fast-"));
135212
const targetSessionKey = "agent:main:telegram:123";

src/auto-reply/reply/get-reply.ts

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import fs from "node:fs/promises";
22
import {
3+
resolveAgentConfig,
34
resolveAgentDir,
45
resolveAgentWorkspaceDir,
56
resolveSessionAgentId,
@@ -19,7 +20,7 @@ import { createLazyImportLoader } from "../../shared/lazy-promise.js";
1920
import { normalizeOptionalString } from "../../shared/string-coerce.js";
2021
import { normalizeStringEntries } from "../../shared/string-normalization.js";
2122
import type { GetReplyOptions } from "../get-reply-options.types.js";
22-
import { stripHeartbeatToken } from "../heartbeat.js";
23+
import { DEFAULT_HEARTBEAT_ACK_MAX_CHARS, stripHeartbeatToken } from "../heartbeat.js";
2324
import type { ReplyPayload } from "../reply-payload.js";
2425
import type { MsgContext } from "../templating.js";
2526
import { normalizeVerboseLevel } from "../thinking.js";
@@ -52,8 +53,25 @@ import { createTypingController } from "./typing.js";
5253

5354
type ResetCommandAction = "new" | "reset";
5455

55-
function isHeartbeatPendingFinalDeliveryEffectivelyEmpty(text: string): boolean {
56-
return stripHeartbeatToken(text, { mode: "heartbeat" }).shouldSkip;
56+
function classifyHeartbeatPendingFinalDelivery(text: string, ackMaxChars: number) {
57+
const stripped = stripHeartbeatToken(text, {
58+
mode: "heartbeat",
59+
maxAckChars: ackMaxChars,
60+
});
61+
return {
62+
shouldClear: stripped.shouldSkip,
63+
replayText: stripped.didStrip && stripped.text ? stripped.text : text,
64+
};
65+
}
66+
67+
function resolveHeartbeatAckMaxChars(cfg: OpenClawConfig, agentId: string): number {
68+
const agentHeartbeat = resolveAgentConfig(cfg, agentId)?.heartbeat;
69+
return Math.max(
70+
0,
71+
agentHeartbeat?.ackMaxChars ??
72+
cfg.agents?.defaults?.heartbeat?.ackMaxChars ??
73+
DEFAULT_HEARTBEAT_ACK_MAX_CHARS,
74+
);
5775
}
5876

5977
const sessionResetModelRuntimeLoader = createLazyImportLoader(
@@ -376,7 +394,11 @@ export async function getReplyFromConfig(
376394
// If it's a user message, we deliver the lost reply first, then continue.
377395
// For now, let's just return the lost reply if it's a heartbeat.
378396
if (opts?.isHeartbeat) {
379-
if (isHeartbeatPendingFinalDeliveryEffectivelyEmpty(text)) {
397+
const heartbeatPending = classifyHeartbeatPendingFinalDelivery(
398+
text,
399+
resolveHeartbeatAckMaxChars(cfg, agentId),
400+
);
401+
if (heartbeatPending.shouldClear) {
380402
sessionEntry.pendingFinalDelivery = undefined;
381403
sessionEntry.pendingFinalDeliveryText = undefined;
382404
sessionEntry.pendingFinalDeliveryCreatedAt = undefined;
@@ -409,6 +431,7 @@ export async function getReplyFromConfig(
409431
sessionEntry.pendingFinalDeliveryLastAttemptAt = updatedAt;
410432
sessionEntry.pendingFinalDeliveryAttemptCount = attemptCount;
411433
sessionEntry.pendingFinalDeliveryLastError = null;
434+
sessionEntry.pendingFinalDeliveryText = heartbeatPending.replayText;
412435
sessionEntry.updatedAt = updatedAt;
413436
if (sessionKey && sessionStore) {
414437
sessionStore[sessionKey] = sessionEntry;
@@ -419,14 +442,15 @@ export async function getReplyFromConfig(
419442
storePath,
420443
sessionKey,
421444
update: async () => ({
445+
pendingFinalDeliveryText: heartbeatPending.replayText,
422446
pendingFinalDeliveryLastAttemptAt: updatedAt,
423447
pendingFinalDeliveryAttemptCount: attemptCount,
424448
pendingFinalDeliveryLastError: null,
425449
updatedAt,
426450
}),
427451
});
428452
}
429-
return { text };
453+
return { text: heartbeatPending.replayText };
430454
}
431455
}
432456
}

src/infra/heartbeat-runner.skips-busy-session-lane.test.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -204,6 +204,7 @@ describe("heartbeat runner skips when target session lane is busy", () => {
204204
it("does not defer on a recent heartbeat ack pending final delivery", async () => {
205205
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
206206
const cfg = createHeartbeatTelegramConfig();
207+
cfg.session = { store: storePath };
207208
await seedMainSessionStore(storePath, cfg, {
208209
lastChannel: "telegram",
209210
lastProvider: "heartbeat",
@@ -228,6 +229,35 @@ describe("heartbeat runner skips when target session lane is busy", () => {
228229
});
229230
});
230231

232+
it("keeps deferring recent pending delivery when ackMaxChars makes the remainder real content", async () => {
233+
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
234+
const cfg = createHeartbeatTelegramConfig();
235+
cfg.session = { store: storePath };
236+
cfg.agents!.defaults!.heartbeat = { every: "30m", ackMaxChars: 0 };
237+
await seedMainSessionStore(storePath, cfg, {
238+
lastChannel: "telegram",
239+
lastProvider: "heartbeat",
240+
lastTo: "heartbeat",
241+
updatedAt: Date.now(),
242+
pendingFinalDelivery: true,
243+
pendingFinalDeliveryText: "HEARTBEAT_OK short",
244+
});
245+
replySpy.mockResolvedValue({ text: "HEARTBEAT_OK" });
246+
247+
const result = await runHeartbeatOnce({
248+
cfg,
249+
deps: {
250+
getQueueSize: vi.fn((_lane?: string) => 0),
251+
nowMs: () => Date.now(),
252+
getReplyFromConfig: replySpy,
253+
} as HeartbeatDeps,
254+
});
255+
256+
expect(result).toEqual({ status: "skipped", reason: HEARTBEAT_SKIP_REQUESTS_IN_FLIGHT });
257+
expect(replySpy).not.toHaveBeenCalled();
258+
});
259+
});
260+
231261
it("proceeds normally when session lane is idle", async () => {
232262
await withTempHeartbeatSandbox(async ({ storePath, replySpy }) => {
233263
const cfg = createHeartbeatTelegramConfig();

0 commit comments

Comments
 (0)