Skip to content

Commit fdf8ddd

Browse files
clawsweeper[bot]BryanTegomohTakhoffman
authored
fix(agents): classify expired thinking signatures (#88340)
Summary: - The branch adds thinking-signature replay-invalid classification, retries matching terminal stream-error eve ... output, preserves static fallback model params, and updates related tests including a Copilot hook fixture. - PR surface: Source +57, Tests +177. Total +234 across 6 files. - Reproducibility: yes. for the classifier boundary: current main lacks a thinking-signature replay-invalid ma ... ort supplies the exact provider error payload. The time-dependent live expiry path was not reproduced here. Automerge notes: - PR branch already contained follow-up commit before automerge: fix(agents): classify expired thinking signatures - PR branch already contained follow-up commit before automerge: fix(agents): recover thinking signature stream errors - PR branch already contained follow-up commit before automerge: fix(agents): recover expired thinking signatures - PR branch already contained follow-up commit before automerge: fix(clawsweeper): address review for automerge-openclaw-openclaw-8807… Validation: - ClawSweeper review passed for head b65f2b8. - Required merge gates passed before the squash merge. Prepared head SHA: b65f2b8 Review: #88340 (comment) Co-authored-by: Bryan Tegomoh <bryan.tegomoh@gmail.com> Co-authored-by: clawsweeper <274271284+clawsweeper[bot]@users.noreply.github.com> Co-authored-by: clawsweeper[bot] <274271284+clawsweeper[bot]@users.noreply.github.com> Approved-by: takhoffman Co-authored-by: takhoffman <781889+takhoffman@users.noreply.github.com>
1 parent 3a88142 commit fdf8ddd

6 files changed

Lines changed: 247 additions & 13 deletions

File tree

extensions/copilot/src/hooks-bridge.test.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ describe("createHooksBridge", () => {
55
const hookBase = {
66
sessionId: "runtime-session",
77
timestamp: new Date(0),
8+
cwd: "/",
89
workingDirectory: "/",
910
};
1011

@@ -40,6 +41,7 @@ describe("createHooksBridge", () => {
4041
const hooks = createHooksBridge({ onPreToolUse })!;
4142
const input = {
4243
...hookBase,
44+
cwd: "/tmp",
4345
workingDirectory: "/tmp",
4446
toolName: "bash",
4547
toolArgs: { cmd: "ls" },

src/agents/embedded-agent-helpers.isbillingerrormessage.test.ts

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1563,6 +1563,25 @@ describe("classifyProviderRuntimeFailureKind", () => {
15631563
).toBe("replay_invalid");
15641564
});
15651565

1566+
it("classifies expired Anthropic thinking signatures as replay invalid", () => {
1567+
expect(
1568+
classifyProviderRuntimeFailureKind(
1569+
'{"type":"error","error":{"type":"invalid_request_error","message":"messages.1.content.440: Invalid `signature` in `thinking` block"}}',
1570+
),
1571+
).toBe("replay_invalid");
1572+
expect(
1573+
classifyProviderRuntimeFailureKind(
1574+
"ValidationException: invalid signature on thinking block",
1575+
),
1576+
).toBe("replay_invalid");
1577+
expect(
1578+
classifyProviderRuntimeFailureKind(
1579+
"ValidationException: signature present in thinking block",
1580+
),
1581+
).not.toBe("replay_invalid");
1582+
expect(classifyProviderRuntimeFailureKind("Invalid signature")).not.toBe("replay_invalid");
1583+
});
1584+
15661585
it("splits ambiguous provider runtime failures instead of collapsing to unknown", () => {
15671586
expect(classifyProviderRuntimeFailureKind({})).toBe("empty_response");
15681587
expect(classifyProviderRuntimeFailureKind("Unknown error (no error details in response)")).toBe(

src/agents/embedded-agent-helpers/errors.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -356,6 +356,8 @@ const INTERRUPTED_NETWORK_ERROR_RE =
356356
/\beconnrefused\b|\beconnreset\b|\beconnaborted\b|\benetreset\b|\behostunreach\b|\behostdown\b|\benetunreach\b|\bepipe\b|\bsocket hang up\b|\bconnection refused\b|\bconnection reset\b|\bconnection aborted\b|\bnetwork is unreachable\b|\bhost is unreachable\b|\bfetch failed\b|\bconnection error\b|\bnetwork request failed\b/i;
357357
const REPLAY_INVALID_RE =
358358
/\bprevious_response_id\b.*\b(?:invalid|unknown|not found|does not exist|expired|mismatch)\b|\btool_(?:use|call)\.(?:input|arguments)\b.*\b(?:missing|required)\b|\bincorrect role information\b|\broles must alternate\b|\binput item id does not belong to this connection\b/i;
359+
const THINKING_SIGNATURE_ERROR_RE =
360+
/\b(?:invalid|expired)\b.*\bsignature\b|\bsignature\b.*\b(?:invalid|expired)\b/i;
359361
const SANDBOX_BLOCKED_RE =
360362
/\bapproval is required\b|\bapproval timed out\b|\bapproval was denied\b|\bblocked by sandbox\b|\bsandbox\b.*\b(?:blocked|denied|forbidden|disabled|not allowed)\b|\bexec denied\s*\(/i;
361363
const NO_BODY_HTTP_WRAPPER_RE =
@@ -471,7 +473,11 @@ function isDnsTransportErrorMessage(raw: string): boolean {
471473
}
472474

473475
function isReplayInvalidErrorMessage(raw: string): boolean {
474-
return REPLAY_INVALID_RE.test(raw);
476+
return REPLAY_INVALID_RE.test(raw) || isThinkingSignatureReplayInvalidErrorMessage(raw);
477+
}
478+
479+
function isThinkingSignatureReplayInvalidErrorMessage(raw: string): boolean {
480+
return /\bthinking\b/i.test(raw) && THINKING_SIGNATURE_ERROR_RE.test(raw);
475481
}
476482

477483
function isSandboxBlockedErrorMessage(raw: string): boolean {

src/agents/embedded-agent-runner/model.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ type ProviderRuntimeHooks = {
7777
type StaticCatalogFallbackModel = Model & {
7878
compat?: ModelCompatConfig;
7979
contextTokens?: number;
80+
params?: Record<string, unknown>;
8081
mediaInput?: ModelMediaInputConfig;
8182
};
8283

@@ -1025,7 +1026,7 @@ function resolveConfiguredFallbackModel(params: {
10251026
provider,
10261027
modelId,
10271028
providerParams: providerConfig?.params,
1028-
configuredParams: configuredModel?.params,
1029+
configuredParams: metadataModel?.params,
10291030
});
10301031
const fallbackTransport = resolveProviderTransport({
10311032
provider,

src/agents/embedded-agent-runner/thinking.test.ts

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -482,6 +482,37 @@ describe("wrapAnthropicStreamWithRecovery", () => {
482482
const anthropicThinkingError = new Error(
483483
"thinking or redacted_thinking blocks in the latest assistant message cannot be modified",
484484
);
485+
const terminalThinkingSignatureError =
486+
"ValidationException: invalid signature on thinking block in message history";
487+
488+
function createTestAssistantMessage(
489+
overrides: Partial<AssistantMessage> & Pick<AssistantMessage, "content" | "stopReason">,
490+
): AssistantMessage {
491+
return castAgentMessage({
492+
role: "assistant",
493+
api: "anthropic-messages",
494+
provider: "anthropic",
495+
model: "claude-sonnet-4-6",
496+
usage: {
497+
input: 0,
498+
output: 0,
499+
cacheRead: 0,
500+
cacheWrite: 0,
501+
totalTokens: 0,
502+
cost: { input: 0, output: 0, cacheRead: 0, cacheWrite: 0, total: 0 },
503+
},
504+
timestamp: 0,
505+
...overrides,
506+
}) as AssistantMessage;
507+
}
508+
509+
function createTestStreamErrorMessage(errorMessage: string): AssistantMessage {
510+
return createTestAssistantMessage({
511+
content: [{ type: "text", text: "stream failed" }],
512+
stopReason: "error",
513+
errorMessage,
514+
});
515+
}
485516

486517
it("retries once with omitted-reasoning text when the request is rejected before streaming", async () => {
487518
let callCount = 0;
@@ -584,6 +615,131 @@ describe("wrapAnthropicStreamWithRecovery", () => {
584615
expect(callCount).toBe(2);
585616
});
586617

618+
it("retries pre-content terminal stream-error events with omitted-reasoning text", async () => {
619+
let callCount = 0;
620+
const contexts: Array<{ messages?: AgentMessage[] }> = [];
621+
const finalMessage = createTestAssistantMessage({
622+
content: [{ type: "text", text: "recovered" }],
623+
stopReason: "stop",
624+
});
625+
const wrapped = wrapAnthropicStreamWithRecovery(
626+
((_model, context) => {
627+
callCount += 1;
628+
const attempt = callCount;
629+
contexts.push(context as { messages?: AgentMessage[] });
630+
const stream = createAssistantMessageEventStream();
631+
queueMicrotask(() => {
632+
if (attempt === 1) {
633+
stream.push({
634+
type: "error",
635+
reason: "error",
636+
error: createTestStreamErrorMessage(terminalThinkingSignatureError),
637+
});
638+
} else {
639+
stream.push({ type: "done", reason: "stop", message: finalMessage });
640+
}
641+
stream.end();
642+
});
643+
return stream;
644+
}) as Parameters<typeof wrapAnthropicStreamWithRecovery>[0],
645+
{ id: "test-session" },
646+
);
647+
648+
const response = wrapped(
649+
{} as never,
650+
{
651+
messages: castAgentMessages([
652+
{
653+
role: "assistant",
654+
content: [{ type: "thinking", thinking: "secret", thinkingSignature: "sig" }],
655+
},
656+
]),
657+
} as never,
658+
{} as never,
659+
) as { result: () => Promise<unknown> } & AsyncIterable<unknown>;
660+
const events: unknown[] = [];
661+
for await (const event of response) {
662+
events.push(event);
663+
}
664+
665+
expect(events).toEqual([{ type: "done", reason: "stop", message: finalMessage }]);
666+
await expect(response.result()).resolves.toEqual(finalMessage);
667+
expect(callCount).toBe(2);
668+
const retryMessage = contexts[1]?.messages?.[0];
669+
if (!retryMessage || retryMessage.role !== "assistant") {
670+
throw new Error("Expected Anthropic recovery retry to start with an assistant message");
671+
}
672+
expect(retryMessage.content).toEqual([
673+
{ type: "text", text: OMITTED_ASSISTANT_REASONING_TEXT },
674+
]);
675+
});
676+
677+
it("does not retry non-thinking terminal stream-error events", async () => {
678+
let callCount = 0;
679+
const errorMessage = createTestStreamErrorMessage("rate limit exceeded");
680+
const wrapped = wrapAnthropicStreamWithRecovery(
681+
(() => {
682+
callCount += 1;
683+
const stream = createAssistantMessageEventStream();
684+
queueMicrotask(() => {
685+
stream.push({ type: "error", reason: "error", error: errorMessage });
686+
stream.end();
687+
});
688+
return stream;
689+
}) as Parameters<typeof wrapAnthropicStreamWithRecovery>[0],
690+
{ id: "test-session" },
691+
);
692+
693+
const response = wrapped({} as never, { messages: [] } as never, {} as never) as {
694+
result: () => Promise<unknown>;
695+
} & AsyncIterable<unknown>;
696+
const events: unknown[] = [];
697+
for await (const event of response) {
698+
events.push(event);
699+
}
700+
701+
expect(events).toEqual([{ type: "error", reason: "error", error: errorMessage }]);
702+
await expect(response.result()).resolves.toEqual(errorMessage);
703+
expect(callCount).toBe(1);
704+
});
705+
706+
it("does not retry terminal stream-error events after output was yielded", async () => {
707+
let callCount = 0;
708+
const partialMessage = createTestAssistantMessage({
709+
content: [{ type: "text", text: "" }],
710+
stopReason: "stop",
711+
});
712+
const errorMessage = createTestStreamErrorMessage(terminalThinkingSignatureError);
713+
const wrapped = wrapAnthropicStreamWithRecovery(
714+
(() => {
715+
callCount += 1;
716+
const stream = createAssistantMessageEventStream();
717+
queueMicrotask(() => {
718+
stream.push({ type: "start", partial: partialMessage });
719+
stream.push({ type: "error", reason: "error", error: errorMessage });
720+
stream.end();
721+
});
722+
return stream;
723+
}) as Parameters<typeof wrapAnthropicStreamWithRecovery>[0],
724+
{ id: "test-session" },
725+
);
726+
727+
const response = wrapped({} as never, { messages: [] } as never, {} as never) as {
728+
result: () => Promise<unknown>;
729+
} & AsyncIterable<unknown>;
730+
const events: unknown[] = [];
731+
for await (const event of response) {
732+
events.push(event);
733+
}
734+
735+
expect(events).toEqual([
736+
{ type: "start", partial: partialMessage },
737+
{ type: "error", reason: "error", error: errorMessage },
738+
]);
739+
await expect(response.result()).resolves.toEqual(errorMessage);
740+
expect(callCount).toBe(1);
741+
});
742+
587743
it("does not retry when the stream fails after yielding a chunk", async () => {
588744
let callCount = 0;
589745
const wrapped = wrapAnthropicStreamWithRecovery(

src/agents/embedded-agent-runner/thinking.ts

Lines changed: 61 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import { formatErrorMessage } from "../../infra/errors.js";
2+
import type { AssistantMessageEvent } from "../../llm/types.js";
23
import { createAssistantMessageEventStream } from "../../llm/utils/event-stream.js";
34
import type { AgentMessage, StreamFn } from "../runtime/index.js";
45
import { log } from "./logger.js";
@@ -427,7 +428,13 @@ function shouldRecoverAnthropicThinkingError(
427428
error: unknown,
428429
sessionMeta: RecoverySessionMeta,
429430
): boolean {
430-
const message = formatErrorMessage(error);
431+
return shouldRecoverAnthropicThinkingErrorMessage(formatErrorMessage(error), sessionMeta);
432+
}
433+
434+
function shouldRecoverAnthropicThinkingErrorMessage(
435+
message: string,
436+
sessionMeta: RecoverySessionMeta,
437+
): boolean {
431438
if (!THINKING_BLOCK_ERROR_PATTERN.test(message)) {
432439
return false;
433440
}
@@ -440,17 +447,66 @@ function shouldRecoverAnthropicThinkingError(
440447
return true;
441448
}
442449

450+
function isAssistantMessageErrorEvent(
451+
event: unknown,
452+
): event is Extract<AssistantMessageEvent, { type: "error" }> {
453+
return (
454+
Boolean(event) && typeof event === "object" && (event as { type?: unknown }).type === "error"
455+
);
456+
}
457+
458+
function getAssistantMessageErrorText(
459+
event: Extract<AssistantMessageEvent, { type: "error" }>,
460+
): string {
461+
const errorMessage = (event.error as { errorMessage?: unknown }).errorMessage;
462+
return typeof errorMessage === "string" ? errorMessage : "";
463+
}
464+
465+
async function retryStreamWithoutThinking(
466+
outer: ReturnType<typeof createAssistantMessageEventStream>,
467+
retry: () => ReturnType<StreamFn>,
468+
): Promise<AssistantMessage> {
469+
const retryStream = retry();
470+
const resolvedRetry = retryStream instanceof Promise ? await retryStream : retryStream;
471+
for await (const chunk of resolvedRetry as AsyncIterable<unknown>) {
472+
outer.push(chunk as Parameters<typeof outer.push>[0]);
473+
}
474+
const result = await (resolvedRetry as { result?: () => Promise<AssistantMessage> }).result?.();
475+
return result as AssistantMessage;
476+
}
477+
443478
async function pumpStreamWithRecovery(
444479
outer: ReturnType<typeof createAssistantMessageEventStream>,
445480
stream: ReturnType<StreamFn>,
446481
sessionMeta: RecoverySessionMeta,
447482
retry: () => ReturnType<StreamFn>,
448483
): Promise<AssistantMessage> {
449-
let yieldedChunk = false;
484+
let yieldedOutput = false;
450485
try {
451486
const resolved = stream instanceof Promise ? await stream : stream;
452487
for await (const chunk of resolved as AsyncIterable<unknown>) {
453-
yieldedChunk = true;
488+
if (isAssistantMessageErrorEvent(chunk)) {
489+
if (
490+
shouldRecoverAnthropicThinkingErrorMessage(
491+
getAssistantMessageErrorText(chunk),
492+
sessionMeta,
493+
)
494+
) {
495+
if (yieldedOutput) {
496+
log.warn(
497+
`[session-recovery] Anthropic thinking error occurred after streaming began; skipping retry to avoid duplicate chunks: sessionId=${sessionMeta.id}`,
498+
);
499+
} else {
500+
sessionMeta.recoveredAnthropicThinking = true;
501+
log.warn(
502+
`[session-recovery] Anthropic thinking stream error; retrying once without thinking blocks: sessionId=${sessionMeta.id}`,
503+
);
504+
return retryStreamWithoutThinking(outer, retry);
505+
}
506+
}
507+
} else {
508+
yieldedOutput = true;
509+
}
454510
outer.push(chunk as Parameters<typeof outer.push>[0]);
455511
}
456512
const result = await (resolved as { result?: () => Promise<AssistantMessage> }).result?.();
@@ -459,7 +515,7 @@ async function pumpStreamWithRecovery(
459515
if (!shouldRecoverAnthropicThinkingError(error, sessionMeta)) {
460516
throw error;
461517
}
462-
if (yieldedChunk) {
518+
if (yieldedOutput) {
463519
log.warn(
464520
`[session-recovery] Anthropic thinking error occurred after streaming began; skipping retry to avoid duplicate chunks: sessionId=${sessionMeta.id}`,
465521
);
@@ -469,13 +525,7 @@ async function pumpStreamWithRecovery(
469525
log.warn(
470526
`[session-recovery] Anthropic thinking error during stream; retrying once without thinking blocks: sessionId=${sessionMeta.id}`,
471527
);
472-
const retryStream = retry();
473-
const resolvedRetry = retryStream instanceof Promise ? await retryStream : retryStream;
474-
for await (const chunk of resolvedRetry as AsyncIterable<unknown>) {
475-
outer.push(chunk as Parameters<typeof outer.push>[0]);
476-
}
477-
const result = await (resolvedRetry as { result?: () => Promise<AssistantMessage> }).result?.();
478-
return result as AssistantMessage;
528+
return retryStreamWithoutThinking(outer, retry);
479529
}
480530
}
481531

0 commit comments

Comments
 (0)