Skip to content

Commit 1af6855

Browse files
committed
refactor(agents): thread post-compaction guard observer
1 parent e0fafdc commit 1af6855

7 files changed

Lines changed: 71 additions & 101 deletions

File tree

src/agents/pi-embedded-runner/post-compaction-loop-guard.ts

Lines changed: 0 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -29,21 +29,13 @@ export type PostCompactionLoopGuard = {
2929
snapshot: () => { armed: boolean; remainingAttempts: number };
3030
};
3131

32-
export type PostCompactionGuardScope = {
33-
sessionKey?: string;
34-
sessionId?: string;
35-
runId?: string;
36-
};
37-
3832
type GuardState = {
3933
enabled: boolean;
4034
windowSize: number;
4135
remainingAttempts: number;
4236
history: PostCompactionGuardObservation[];
4337
};
4438

45-
const activeGuards = new Map<string, PostCompactionLoopGuard>();
46-
4739
function asPositiveInt(value: number | undefined, fallback: number): number {
4840
if (typeof value !== "number" || !Number.isInteger(value) || value <= 0) {
4941
return fallback;
@@ -113,56 +105,6 @@ export function createPostCompactionLoopGuard(
113105
return { armPostCompaction, observe, snapshot };
114106
}
115107

116-
function normalizeScopePart(value: string | undefined): string | undefined {
117-
const trimmed = value?.trim();
118-
return trimmed ? trimmed : undefined;
119-
}
120-
121-
function scopeKeys(scope: PostCompactionGuardScope): string[] {
122-
const runId = normalizeScopePart(scope.runId);
123-
const keys: string[] = [];
124-
for (const [kind, id] of [
125-
["sessionKey", normalizeScopePart(scope.sessionKey)],
126-
["sessionId", normalizeScopePart(scope.sessionId)],
127-
] as const) {
128-
if (!id) {
129-
continue;
130-
}
131-
keys.push(runId ? `${kind}:${id}:run:${runId}` : `${kind}:${id}`);
132-
}
133-
return keys;
134-
}
135-
136-
export function registerPostCompactionLoopGuard(
137-
scope: PostCompactionGuardScope,
138-
guard: PostCompactionLoopGuard,
139-
): () => void {
140-
const keys = scopeKeys(scope);
141-
for (const key of keys) {
142-
activeGuards.set(key, guard);
143-
}
144-
return () => {
145-
for (const key of keys) {
146-
if (activeGuards.get(key) === guard) {
147-
activeGuards.delete(key);
148-
}
149-
}
150-
};
151-
}
152-
153-
export function observePostCompactionLoopGuard(
154-
scope: PostCompactionGuardScope,
155-
call: PostCompactionGuardObservation,
156-
): PostCompactionGuardVerdict | undefined {
157-
for (const key of scopeKeys(scope)) {
158-
const guard = activeGuards.get(key);
159-
if (guard) {
160-
return guard.observe(call);
161-
}
162-
}
163-
return undefined;
164-
}
165-
166108
export class PostCompactionLoopPersistedError extends Error {
167109
readonly detector: "compaction_loop_persisted";
168110
readonly count: number;

src/agents/pi-embedded-runner/run.compaction-loop-guard.test.ts

Lines changed: 28 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,10 @@ import type {
44
getDiagnosticSessionState as GetDiagnosticSessionStateType,
55
SessionState,
66
} from "../../logging/diagnostic-session-state.js";
7-
import type { wrapToolWithBeforeToolCallHook as WrapToolWithBeforeToolCallHookType } from "../pi-tools.before-tool-call.js";
7+
import type {
8+
ToolOutcomeObserver,
9+
wrapToolWithBeforeToolCallHook as WrapToolWithBeforeToolCallHookType,
10+
} from "../pi-tools.before-tool-call.js";
811
import type {
912
recordToolCall as RecordToolCallType,
1013
recordToolCallOutcome as RecordToolCallOutcomeType,
@@ -72,6 +75,7 @@ async function executeWrappedToolOutcome(
7275
toolName: string,
7376
toolParams: unknown,
7477
result: unknown,
78+
onToolOutcome?: ToolOutcomeObserver,
7579
runId = baseParams.runId,
7680
): Promise<unknown> {
7781
const tool = wrapToolWithBeforeToolCallHook(
@@ -84,6 +88,7 @@ async function executeWrappedToolOutcome(
8488
sessionKey: baseParams.sessionKey,
8589
sessionId: baseParams.sessionId,
8690
runId,
91+
onToolOutcome,
8792
},
8893
);
8994
liveToolCallSeq += 1;
@@ -159,12 +164,15 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
159164
// Attempt 2: post-compaction. The live wrapped-tool path records each
160165
// outcome while the prompt is still running; the third identical result
161166
// aborts before the attempt can return.
162-
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
167+
mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => {
168+
const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver })
169+
.onToolOutcome;
163170
for (let i = 0; i < 3; i += 1) {
164171
await executeWrappedToolOutcome(
165172
"gateway",
166173
{ action: "lookup", path: "x" },
167174
"identical-result",
175+
onToolOutcome,
168176
);
169177
}
170178
attemptReturned = true;
@@ -200,9 +208,16 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
200208
// Attempt 2 (post-compaction): identical args, but DIFFERENT result hash
201209
// each time. This fills the window without triggering the persisted-loop
202210
// abort because the tool is making progress.
203-
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
211+
mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => {
212+
const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver })
213+
.onToolOutcome;
204214
for (let i = 0; i < 3; i += 1) {
205-
await executeWrappedToolOutcome("gateway", { action: "lookup", path: "x" }, `result-${i}`);
215+
await executeWrappedToolOutcome(
216+
"gateway",
217+
{ action: "lookup", path: "x" },
218+
`result-${i}`,
219+
onToolOutcome,
220+
);
206221
}
207222
return makeAttemptResult({
208223
promptError: null,
@@ -235,9 +250,11 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
235250
// Attempt 2 (post-compaction): two distinct records → window full,
236251
// guard disarms with no abort. We then append more identical records
237252
// afterwards in this test to confirm they are not observed by the guard.
238-
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
239-
await executeWrappedToolOutcome("read", { path: "/a" }, "ra");
240-
await executeWrappedToolOutcome("write", { path: "/b" }, "rb");
253+
mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => {
254+
const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver })
255+
.onToolOutcome;
256+
await executeWrappedToolOutcome("read", { path: "/a" }, "ra", onToolOutcome);
257+
await executeWrappedToolOutcome("write", { path: "/b" }, "rb", onToolOutcome);
241258
return makeAttemptResult({
242259
promptError: null,
243260
toolMetas: [{ toolName: "read" }, { toolName: "write" }],
@@ -293,12 +310,15 @@ describe("post-compaction loop guard wired into runEmbeddedPiAgent", () => {
293310
// Attempt 2 (post-compaction): three identical live tool outcomes while
294311
// history is already at the cap. The guard aborts on the third result
295312
// before the mocked attempt can return.
296-
mockedRunEmbeddedAttempt.mockImplementationOnce(async () => {
313+
mockedRunEmbeddedAttempt.mockImplementationOnce(async (attemptParams: unknown) => {
314+
const onToolOutcome = (attemptParams as { onToolOutcome?: ToolOutcomeObserver })
315+
.onToolOutcome;
297316
for (let i = 0; i < 3; i += 1) {
298317
await executeWrappedToolOutcome(
299318
"gateway",
300319
{ action: "lookup", path: "x" },
301320
"identical-result",
321+
onToolOutcome,
302322
);
303323
}
304324
// History is still capped at HISTORY_TRIM_CAP after the trim.

src/agents/pi-embedded-runner/run.ts

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,8 @@ import { log } from "./logger.js";
9494
import { resolveModelAsync } from "./model.js";
9595
import {
9696
createPostCompactionLoopGuard,
97-
registerPostCompactionLoopGuard,
97+
PostCompactionLoopPersistedError,
98+
type PostCompactionGuardObservation,
9899
} from "./post-compaction-loop-guard.js";
99100
import { createEmbeddedRunReplayState, observeReplayMetadata } from "./replay-state.js";
100101
import { handleAssistantFailover } from "./run/assistant-failover.js";
@@ -792,14 +793,14 @@ export async function runEmbeddedPiAgent(
792793
const postCompactionGuard = createPostCompactionLoopGuard(
793794
params.config?.tools?.loopDetection?.postCompactionGuard,
794795
);
795-
const unregisterPostCompactionGuard = registerPostCompactionLoopGuard(
796-
{
797-
sessionKey: params.sessionKey,
798-
sessionId: params.sessionId,
799-
runId: params.runId,
800-
},
801-
postCompactionGuard,
802-
);
796+
const observePostCompactionToolOutcome = (
797+
observation: PostCompactionGuardObservation,
798+
): void => {
799+
const verdict = postCompactionGuard.observe(observation);
800+
if (verdict.shouldAbort) {
801+
throw PostCompactionLoopPersistedError.fromVerdict(verdict);
802+
}
803+
};
803804
let lastRetryFailoverReason: FailoverReason | null = null;
804805
let planningOnlyRetryInstruction: string | null = null;
805806
let reasoningOnlyRetryInstruction: string | null = null;
@@ -1160,6 +1161,7 @@ export async function runEmbeddedPiAgent(
11601161
agentId: workspaceResolution.agentId,
11611162
legacyBeforeAgentStartResult,
11621163
thinkLevel,
1164+
onToolOutcome: observePostCompactionToolOutcome,
11631165
fastMode: params.fastMode,
11641166
verboseLevel: params.verboseLevel,
11651167
reasoningLevel: params.reasoningLevel,
@@ -2786,7 +2788,6 @@ export async function runEmbeddedPiAgent(
27862788
};
27872789
}
27882790
} finally {
2789-
unregisterPostCompactionGuard();
27902791
forgetPromptBuildDrainCacheForRun(params.runId);
27912792
stopRuntimeAuthRefreshTimer();
27922793
await runAgentCleanupStep({

src/agents/pi-embedded-runner/run/attempt.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -928,6 +928,7 @@ export async function runEmbeddedAttempt(
928928
forceHeartbeatTool: params.forceHeartbeatTool,
929929
authProfileStore: params.authProfileStore,
930930
recordToolPrepStage: (name) => corePluginToolStages.mark(name),
931+
onToolOutcome: params.onToolOutcome,
931932
onYield: (message) => {
932933
yieldDetected = true;
933934
yieldMessage = message;
@@ -1650,6 +1651,7 @@ export async function runEmbeddedAttempt(
16501651
sessionId: params.sessionId,
16511652
runId: params.runId,
16521653
loopDetection: clientToolLoopDetection,
1654+
onToolOutcome: params.onToolOutcome,
16531655
},
16541656
)
16551657
: [];

src/agents/pi-embedded-runner/run/types.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import type { DiagnosticTraceContext } from "../../../infra/diagnostic-trace-con
99
import type { PluginHookBeforeAgentStartResult } from "../../../plugins/hook-before-agent-start.types.js";
1010
import type { AuthProfileStore } from "../../auth-profiles/types.js";
1111
import type { MessagingToolSend } from "../../pi-embedded-messaging.types.js";
12+
import type { ToolOutcomeObserver } from "../../pi-tools.before-tool-call.js";
1213
import type { AgentRuntimePlan } from "../../runtime-plan/types.js";
1314
import type { ToolErrorSummary } from "../../tool-error-summary.js";
1415
import type { NormalizedUsage } from "../../usage.js";
@@ -40,6 +41,8 @@ export type EmbeddedRunAttemptParams = EmbeddedRunAttemptBase & {
4041
agentHarnessId?: string;
4142
/** OpenClaw-owned runtime policy prepared by the orchestrator for this attempt. */
4243
runtimePlan?: AgentRuntimePlan;
44+
/** Live observer called after wrapped tool outcomes are recorded. */
45+
onToolOutcome?: ToolOutcomeObserver;
4346
model: Model<Api>;
4447
authStorage: AuthStorage;
4548
/** Auth profile store already resolved during startup for this attempt. */

src/agents/pi-tools.before-tool-call.ts

Lines changed: 20 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -26,14 +26,18 @@ import {
2626
import { createLazyRuntimeSurface } from "../shared/lazy-runtime.js";
2727
import { isPlainObject } from "../utils.js";
2828
import { copyChannelAgentToolMeta } from "./channel-tools.js";
29-
import {
30-
observePostCompactionLoopGuard,
31-
PostCompactionLoopPersistedError,
32-
} from "./pi-embedded-runner/post-compaction-loop-guard.js";
3329
import { normalizeToolName } from "./tool-policy.js";
3430
import type { AnyAgentTool } from "./tools/common.js";
3531
import { callGatewayTool } from "./tools/gateway.js";
3632

33+
export type ToolOutcomeObservation = {
34+
toolName: string;
35+
argsHash: string;
36+
resultHash: string;
37+
};
38+
39+
export type ToolOutcomeObserver = (observation: ToolOutcomeObservation) => void;
40+
3741
export type HookContext = {
3842
agentId?: string;
3943
config?: OpenClawConfig;
@@ -43,6 +47,7 @@ export type HookContext = {
4347
runId?: string;
4448
trace?: DiagnosticTraceContext;
4549
loopDetection?: ToolLoopDetectionConfig;
50+
onToolOutcome?: ToolOutcomeObserver;
4651
};
4752

4853
type HookBlockedKind = "veto" | "failure";
@@ -376,9 +381,10 @@ async function recordLoopOutcome(args: {
376381
result?: unknown;
377382
error?: unknown;
378383
}): Promise<void> {
379-
if (!args.ctx?.sessionKey) {
384+
if (!args.ctx?.sessionKey && !args.ctx?.sessionId) {
380385
return;
381386
}
387+
let recordedOutcome: ToolOutcomeObservation | undefined;
382388
try {
383389
const { getDiagnosticSessionState, recordToolCallOutcome } = await loadBeforeToolCallRuntime();
384390
const sessionState = getDiagnosticSessionState({
@@ -394,29 +400,19 @@ async function recordLoopOutcome(args: {
394400
config: args.ctx.loopDetection,
395401
...(args.ctx.runId && { runId: args.ctx.runId }),
396402
});
397-
if (record?.resultHash) {
398-
const verdict = observePostCompactionLoopGuard(
399-
{
400-
sessionKey: args.ctx.sessionKey,
401-
sessionId: args.ctx.sessionId,
402-
runId: args.ctx.runId,
403-
},
404-
{
405-
toolName: record.toolName,
406-
argsHash: record.argsHash,
407-
resultHash: record.resultHash,
408-
},
409-
);
410-
if (verdict?.shouldAbort) {
411-
throw PostCompactionLoopPersistedError.fromVerdict(verdict);
412-
}
403+
if (record?.resultHash && args.ctx.onToolOutcome) {
404+
recordedOutcome = {
405+
toolName: record.toolName,
406+
argsHash: record.argsHash,
407+
resultHash: record.resultHash,
408+
};
413409
}
414410
} catch (err) {
415-
if (err instanceof PostCompactionLoopPersistedError) {
416-
throw err;
417-
}
418411
log.warn(`tool loop outcome tracking failed: tool=${args.toolName} error=${String(err)}`);
419412
}
413+
if (recordedOutcome) {
414+
args.ctx.onToolOutcome?.(recordedOutcome);
415+
}
420416
}
421417

422418
export async function runBeforeToolCallHook(args: {

src/agents/pi-tools.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ import type { ModelAuthMode } from "./model-auth.js";
2727
import { resolveOpenClawPluginToolsForOptions } from "./openclaw-plugin-tools.js";
2828
import { createOpenClawTools } from "./openclaw-tools.js";
2929
import { wrapToolWithAbortSignal } from "./pi-tools.abort.js";
30-
import { wrapToolWithBeforeToolCallHook } from "./pi-tools.before-tool-call.js";
30+
import {
31+
type ToolOutcomeObserver,
32+
wrapToolWithBeforeToolCallHook,
33+
} from "./pi-tools.before-tool-call.js";
3134
import { applyDeferredFollowupToolDescriptions } from "./pi-tools.deferred-followup.js";
3235
import { filterToolsByMessageProvider } from "./pi-tools.message-provider-policy.js";
3336
import {
@@ -378,6 +381,8 @@ export function createOpenClawCodingTools(options?: {
378381
onYield?: (message: string) => Promise<void> | void;
379382
/** Optional instrumentation callback for tool preparation stage timing. */
380383
recordToolPrepStage?: (name: string) => void;
384+
/** Live observer called after wrapped tool outcomes are recorded. */
385+
onToolOutcome?: ToolOutcomeObserver;
381386
}): AnyAgentTool[] {
382387
const execToolName = "exec";
383388
const sandbox = options?.sandbox?.enabled ? options.sandbox : undefined;
@@ -838,6 +843,7 @@ export function createOpenClawCodingTools(options?: {
838843
runId: options?.runId,
839844
...(options?.trace ? { trace: options.trace } : {}),
840845
loopDetection: resolveToolLoopDetectionConfig({ cfg: options?.config, agentId }),
846+
onToolOutcome: options?.onToolOutcome,
841847
}),
842848
);
843849
options?.recordToolPrepStage?.("tool-hooks");

0 commit comments

Comments
 (0)