Skip to content

Commit e2cc8a7

Browse files
committed
fix: tighten system event dedupe identity
1 parent ca10200 commit e2cc8a7

3 files changed

Lines changed: 143 additions & 5 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ Docs: https://docs.openclaw.ai
2121

2222
### Fixes
2323

24+
- System events: dedupe keyed events across the queue while preserving unkeyed, delivery-route, and trust-boundary event identity. (#73040) Thanks @statxc.
2425
- Agents/UI: compact exec and tool progress rows by hiding redundant shell tool names, replacing known workspace paths with short context markers, and preserving Discord trace scrubbing for compact command lines.
2526
- ACPX: run and await the embedded ACP backend startup probe by default so the gateway `ready` signal no longer fires before the acpx runtime has either become usable or reported a probe failure; set `OPENCLAW_ACPX_RUNTIME_STARTUP_PROBE=0` to restore lazy startup. Fixes #79596. Thanks @bzelones.
2627
- OpenAI-compatible models: strip prior assistant reasoning fields from replayed Chat Completions history by default, preventing oMLX/vLLM Qwen follow-up turns from rejecting or stalling on stale `reasoning` payloads. Fixes #46637. Thanks @zipzagster and @lexhoefsloot.

src/infra/system-events.test.ts

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,18 @@ describe("system events (session routing)", () => {
309309
]);
310310
});
311311

312+
it("allows non-consecutive unkeyed duplicate events", () => {
313+
const key = "agent:main:test-unkeyed-noncons-dupe";
314+
const first = enqueueSystemEvent("Node connected", { sessionKey: key });
315+
const interleaved = enqueueSystemEvent("Heartbeat tick", { sessionKey: key });
316+
const retry = enqueueSystemEvent("Node connected", { sessionKey: key });
317+
318+
expect(first).toBe(true);
319+
expect(interleaved).toBe(true);
320+
expect(retry).toBe(true);
321+
expect(peekSystemEvents(key)).toEqual(["Node connected", "Heartbeat tick", "Node connected"]);
322+
});
323+
312324
it("allows the same text under a different context key", () => {
313325
const key = "agent:main:test-context-disambiguates";
314326
const reactionA = enqueueSystemEvent("Discord reaction added: ✅", {
@@ -325,6 +337,42 @@ describe("system events (session routing)", () => {
325337
expect(peekSystemEventEntries(key)).toHaveLength(2);
326338
});
327339

340+
it("allows the same text and context under a different delivery route", () => {
341+
const key = "agent:main:test-context-route-disambiguates";
342+
const first = enqueueSystemEvent("Build completed", {
343+
sessionKey: key,
344+
contextKey: "build:123",
345+
deliveryContext: { channel: "telegram", to: "100" },
346+
});
347+
const second = enqueueSystemEvent("Build completed", {
348+
sessionKey: key,
349+
contextKey: "build:123",
350+
deliveryContext: { channel: "telegram", to: "200" },
351+
});
352+
353+
expect(first).toBe(true);
354+
expect(second).toBe(true);
355+
expect(peekSystemEventEntries(key)).toHaveLength(2);
356+
});
357+
358+
it("allows the same text and context under different trust metadata", () => {
359+
const key = "agent:main:test-context-trust-disambiguates";
360+
const trusted = enqueueSystemEvent("Hook finished", {
361+
sessionKey: key,
362+
contextKey: "hook:done",
363+
trusted: true,
364+
});
365+
const untrusted = enqueueSystemEvent("Hook finished", {
366+
sessionKey: key,
367+
contextKey: "hook:done",
368+
trusted: false,
369+
});
370+
371+
expect(trusted).toBe(true);
372+
expect(untrusted).toBe(true);
373+
expect(peekSystemEventEntries(key)).toHaveLength(2);
374+
});
375+
328376
it("preserves lastContextKey when a duplicate is skipped", () => {
329377
const key = "agent:main:test-context-preserved";
330378
enqueueSystemEvent("Node connected", { sessionKey: key, contextKey: "build:123" });
@@ -345,6 +393,60 @@ describe("system events (session routing)", () => {
345393

346394
expect(isSystemEventContextChanged(key, "build:123")).toBe(false);
347395
});
396+
397+
it("preserves lastContextKey from the newest contextful event after partial consume", () => {
398+
const key = "agent:main:test-context-preserved-after-consume";
399+
enqueueSystemEvent("startup", { sessionKey: key });
400+
enqueueSystemEvent("contextful", { sessionKey: key, contextKey: "build:123" });
401+
enqueueSystemEvent("unkeyed followup", { sessionKey: key });
402+
const inspected = peekSystemEventEntries(key).slice(0, 1);
403+
404+
expect(consumeSystemEventEntries(key, inspected).map((entry) => entry.text)).toEqual([
405+
"startup",
406+
]);
407+
expect(isSystemEventContextChanged(key, "build:123")).toBe(false);
408+
});
409+
410+
it("allows a keyed duplicate after the original is evicted", () => {
411+
const key = "agent:main:test-keyed-duplicate-after-eviction";
412+
enqueueSystemEvent("Build completed", { sessionKey: key, contextKey: "build:123" });
413+
for (let index = 0; index < 20; index += 1) {
414+
enqueueSystemEvent(`event ${index}`, { sessionKey: key, contextKey: `event:${index}` });
415+
}
416+
417+
expect(
418+
enqueueSystemEvent("Build completed", { sessionKey: key, contextKey: "build:123" }),
419+
).toBe(true);
420+
});
421+
422+
it("allows a keyed duplicate after the original is consumed from the prefix", () => {
423+
const key = "agent:main:test-keyed-duplicate-after-prefix-consume";
424+
enqueueSystemEvent("Build completed", { sessionKey: key, contextKey: "build:123" });
425+
const inspected = peekSystemEventEntries(key);
426+
427+
expect(consumeSystemEventEntries(key, inspected).map((entry) => entry.text)).toEqual([
428+
"Build completed",
429+
]);
430+
expect(
431+
enqueueSystemEvent("Build completed", { sessionKey: key, contextKey: "build:123" }),
432+
).toBe(true);
433+
});
434+
435+
it("allows a keyed duplicate after the original is selectively consumed", () => {
436+
const key = "agent:main:test-keyed-duplicate-after-selected-consume";
437+
enqueueSystemEvent("Build completed", { sessionKey: key, contextKey: "build:123" });
438+
enqueueSystemEvent("Other event", { sessionKey: key, contextKey: "build:other" });
439+
const selected = peekSystemEventEntries(key).filter(
440+
(entry) => entry.text === "Build completed",
441+
);
442+
443+
expect(consumeSelectedSystemEventEntries(key, selected).map((entry) => entry.text)).toEqual([
444+
"Build completed",
445+
]);
446+
expect(
447+
enqueueSystemEvent("Build completed", { sessionKey: key, contextKey: "build:123" }),
448+
).toBe(true);
449+
});
348450
});
349451

350452
describe("isCronSystemEvent", () => {

src/infra/system-events.ts

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -90,9 +90,17 @@ function findDuplicateInQueue(
9090
queue: readonly SystemEvent[],
9191
text: string,
9292
contextKey: string | null,
93+
deliveryContext: DeliveryContext | undefined,
94+
trusted: boolean,
9395
): SystemEvent | undefined {
96+
if (contextKey === null) {
97+
const last = queue[queue.length - 1];
98+
return last && isDuplicateSystemEvent(last, { text, contextKey, deliveryContext, trusted })
99+
? last
100+
: undefined;
101+
}
94102
for (const event of queue) {
95-
if (event.text === text && (event.contextKey ?? null) === contextKey) {
103+
if (isDuplicateSystemEvent(event, { text, contextKey, deliveryContext, trusted })) {
96104
return event;
97105
}
98106
}
@@ -114,7 +122,16 @@ export function enqueueSystemEvent(text: string, options: SystemEventOptions) {
114122
}
115123
const normalizedContextKey = normalizeContextKey(options?.contextKey);
116124
const normalizedDeliveryContext = normalizeDeliveryContext(options?.deliveryContext);
117-
if (findDuplicateInQueue(entry.queue, cleaned, normalizedContextKey)) {
125+
const trusted = options.trusted !== false;
126+
if (
127+
findDuplicateInQueue(
128+
entry.queue,
129+
cleaned,
130+
normalizedContextKey,
131+
normalizedDeliveryContext,
132+
trusted,
133+
)
134+
) {
118135
return false;
119136
}
120137
applyContextKeyPolicy(entry, normalizedContextKey);
@@ -123,7 +140,7 @@ export function enqueueSystemEvent(text: string, options: SystemEventOptions) {
123140
ts: Date.now(),
124141
contextKey: normalizedContextKey,
125142
deliveryContext: normalizedDeliveryContext,
126-
trusted: options.trusted !== false,
143+
trusted,
127144
});
128145
if (entry.queue.length > MAX_EVENTS) {
129146
entry.queue.shift();
@@ -154,6 +171,18 @@ function areDeliveryContextsEqual(left?: DeliveryContext, right?: DeliveryContex
154171
return channelRouteDedupeKey(left) === channelRouteDedupeKey(right);
155172
}
156173

174+
function isDuplicateSystemEvent(
175+
existing: SystemEvent,
176+
incoming: Pick<SystemEvent, "text" | "contextKey" | "deliveryContext" | "trusted">,
177+
): boolean {
178+
return (
179+
existing.text === incoming.text &&
180+
(existing.contextKey ?? null) === (incoming.contextKey ?? null) &&
181+
(existing.trusted ?? true) === (incoming.trusted ?? true) &&
182+
areDeliveryContextsEqual(existing.deliveryContext, incoming.deliveryContext)
183+
);
184+
}
185+
157186
function areSystemEventsEqual(left: SystemEvent, right: SystemEvent): boolean {
158187
return (
159188
left.text === right.text &&
@@ -170,8 +199,14 @@ function resetQueueState(key: string, entry: SessionQueue) {
170199
queues.delete(key);
171200
return;
172201
}
173-
const newest = entry.queue[entry.queue.length - 1];
174-
entry.lastContextKey = newest.contextKey ?? null;
202+
for (let index = entry.queue.length - 1; index >= 0; index -= 1) {
203+
const contextKey = entry.queue[index].contextKey ?? null;
204+
if (contextKey !== null) {
205+
entry.lastContextKey = contextKey;
206+
return;
207+
}
208+
}
209+
entry.lastContextKey = null;
175210
}
176211

177212
export function consumeSystemEventEntries(

0 commit comments

Comments
 (0)