Skip to content

Commit 59917ef

Browse files
Merge 68542eb into e964987
2 parents e964987 + 68542eb commit 59917ef

5 files changed

Lines changed: 98 additions & 19 deletions

File tree

extensions/telegram/src/polling-session.ts

Lines changed: 34 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -223,6 +223,11 @@ export class TelegramPollingSession {
223223
#spooledUpdateHandlerTimeoutMs: number;
224224
#spooledUpdateHandlerAbortGraceMs: number;
225225
#deliveryDrainInFlight = false;
226+
// Callback supplied by the active isolated-ingress cycle to request the cycle
227+
// abort itself when the bot has visibly lost its grammy initialized state
228+
// mid-cycle. When set, calling it triggers a clean exit so the outer
229+
// `runUntilAbort` loop creates a fresh bot and re-runs `bot.init()`.
230+
#requestCycleRestartOnBotReinitNeeded: ((reason: string) => void) | null = null;
226231

227232
constructor(private readonly opts: TelegramPollingSessionOpts) {
228233
this.#transportState = new TelegramPollingTransportState({
@@ -465,9 +470,20 @@ export class TelegramPollingSession {
465470
);
466471
return;
467472
}
473+
const errMessage = formatErrorMessage(params.err);
468474
this.opts.log(
469-
`[telegram][diag] spooled update ${params.update.updateId} failed; keeping for retry: ${formatErrorMessage(params.err)}`,
475+
`[telegram][diag] spooled update ${params.update.updateId} failed; keeping for retry: ${errMessage}`,
470476
);
477+
// If the grammy bot has lost its initialized state mid-cycle, every
478+
// subsequent update handler fails with the same message in a tight retry
479+
// loop. Ask the active cycle to abort itself so the outer runUntilAbort
480+
// loop can create a fresh TelegramBot instance and re-run bot.init().
481+
if (typeof errMessage === "string" && errMessage.includes("Bot not initialized")) {
482+
const requestRestart = this.#requestCycleRestartOnBotReinitNeeded;
483+
if (requestRestart) {
484+
requestRestart(`spooled update ${params.update.updateId} hit Bot-not-initialized`);
485+
}
486+
}
471487
}
472488

473489
async #waitForSpooledUpdateHandlers(): Promise<void> {
@@ -724,6 +740,20 @@ export class TelegramPollingSession {
724740
void worker.stop();
725741
};
726742
this.opts.abortSignal?.addEventListener("abort", stopOnAbort, { once: true });
743+
// Install a one-shot callback that the spool failure path can use to ask
744+
// this cycle to restart when it sees grammy's "Bot not initialized" error.
745+
// Setting restartRequested and stopping the worker tears the cycle down via
746+
// the existing try/finally cleanup below.
747+
this.#requestCycleRestartOnBotReinitNeeded = (reason: string) => {
748+
if (restartRequested) {
749+
return;
750+
}
751+
restartRequested = true;
752+
this.opts.log(
753+
`[telegram][diag] requesting isolated polling cycle restart to re-run bot.init(): ${reason}`,
754+
);
755+
void worker.stop();
756+
};
727757
const drainIntervalMs = Math.max(100, Math.floor(ingress.drainIntervalMs ?? 500));
728758
let drainActive = false;
729759
const stopBot = () => {
@@ -820,6 +850,9 @@ export class TelegramPollingSession {
820850
clearInterval(drainTimer);
821851
unsubscribe();
822852
this.opts.abortSignal?.removeEventListener("abort", stopOnAbort);
853+
// Clear the restart-request callback so a future cycle is not
854+
// accidentally talking to this cycle's local state.
855+
this.#requestCycleRestartOnBotReinitNeeded = null;
823856
await worker.stop();
824857
if (!restartRequested) {
825858
await drainOnce();

src/agents/auth-profiles/store.ts

Lines changed: 25 additions & 6 deletions
Original file line number | Diff line number | Diff line change
@@ -590,7 +590,12 @@ export function loadAuthProfileStoreForSecretsRuntime(agentDir?: string): AuthPr
590590
return loadAuthProfileStoreForRuntime(agentDir, {
591591
readOnly: true,
592592
allowKeychainPrompt: false,
593-
resolveLegacyOAuthSidecars: false,
593+
// Include legacy OAuth sidecar material when the runtime is resolving
594+
// secrets for an agent turn. Without this, embedded agent runs cannot reach
595+
// the access token for openai-codex profiles whose `oauthRef.source` is
596+
// "openclaw-credentials", and resolveApiKeyForProfile() falls through to
597+
// "No API key found".
598+
resolveLegacyOAuthSidecars: true,
594599
});
595600
}
596601

@@ -604,7 +609,10 @@ export function loadAuthProfileStoreWithoutExternalProfiles(
604609
const options: LoadAuthProfileStoreOptions = {
605610
readOnly: true,
606611
allowKeychainPrompt: loadOptions?.allowKeychainPrompt ?? false,
607-
resolveLegacyOAuthSidecars: loadOptions?.resolveLegacyOAuthSidecars ?? false,
612+
// Default sidecar resolution to true so callers that do not explicitly
613+
// override still pick up legacy OAuth credential material for isolated and
614+
// sub-agent auth resolution paths.
615+
resolveLegacyOAuthSidecars: loadOptions?.resolveLegacyOAuthSidecars ?? true,
608616
};
609617
const store = loadAuthProfileStoreForAgent(agentDir, options);
610618
const authPath = resolveAuthStorePath(agentDir);
@@ -639,20 +647,31 @@ export function ensureAuthProfileStore(
639647

640648
export function ensureAuthProfileStoreWithoutExternalProfiles(
641649
agentDir?: string,
642-
options?: { allowKeychainPrompt?: boolean },
650+
options?: { allowKeychainPrompt?: boolean; resolveLegacyOAuthSidecars?: boolean },
643651
): AuthProfileStore {
644-
const runtimeStore = resolveRuntimeAuthProfileStore(agentDir, options);
652+
// Forward `resolveLegacyOAuthSidecars` through this entry point so embedded
653+
// runner sub-agents and isolated session lanes can read legacy sidecar
654+
// credential material. Default true to match
655+
// `loadAuthProfileStoreWithoutExternalProfiles`.
656+
const resolveLegacyOAuthSidecars = options?.resolveLegacyOAuthSidecars ?? true;
657+
const effectiveOptions: LoadAuthProfileStoreOptions = {
658+
...(options ?? {}),
659+
resolveLegacyOAuthSidecars,
660+
};
661+
const runtimeStore = resolveRuntimeAuthProfileStore(agentDir, effectiveOptions);
645662
if (runtimeStore) {
646663
return runtimeStore;
647664
}
648-
const store = loadAuthProfileStoreForAgent(agentDir, options);
665+
const store = loadAuthProfileStoreForAgent(agentDir, effectiveOptions);
649666
const authPath = resolveAuthStorePath(agentDir);
650667
const mainAuthPath = resolveAuthStorePath();
651668
if (!agentDir || authPath === mainAuthPath) {
652669
return store;
653670
}
654671

655-
const mainStore = loadAuthProfileStoreForAgent(undefined, options);
672+
// Use the same options for the main fallback load; sub-agents that merge in
673+
// the main store need the same legacy sidecar material.
674+
const mainStore = loadAuthProfileStoreForAgent(undefined, effectiveOptions);
656675
return mergeAuthProfileStores(mainStore, store);
657676
}
658677

src/logging/diagnostic-session-attention.test.ts

Lines changed: 4 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -68,7 +68,10 @@ describe("classifySessionAttention", () => {
6868
reason: "queued_behind_terminal_active_work",
6969
classification: "stalled_agent_run",
7070
activeWorkKind: "embedded_run",
71-
recoveryEligible: false,
71+
// Terminal progress signal + queued items = lane is effectively
72+
// done with the active turn; recovery coordinator should release
73+
// the lane so the queue can drain.
74+
recoveryEligible: true,
7275
},
7376
},
7477
{

src/logging/diagnostic-session-attention.ts

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -14,7 +14,7 @@ export type SessionAttentionClassification =
1414
reason: string;
1515
classification: "blocked_tool_call" | "stalled_agent_run";
1616
activeWorkKind?: DiagnosticSessionActiveWorkKind;
17-
recoveryEligible: false;
17+
recoveryEligible: boolean;
1818
}
1919
| {
2020
eventType: "session.stuck";
@@ -48,12 +48,17 @@ export function classifySessionAttention(params: {
4848
params.activity.activeWorkKind === "embedded_run" &&
4949
isTerminalDiagnosticProgressReason(params.activity.lastProgressReason)
5050
) {
51+
// The active embedded_run has emitted a terminal progress signal
52+
// (e.g., `rawResponseItem/completed`, `embedded_run:ended`) yet the
53+
// lane is still holding queued items. The work signal indicates the
54+
// active turn is effectively done, so allow the recovery coordinator
55+
// to release the lane and let the queue drain.
5156
return {
5257
eventType: "session.stalled",
5358
reason: "queued_behind_terminal_active_work",
5459
classification: "stalled_agent_run",
5560
activeWorkKind: params.activity.activeWorkKind,
56-
recoveryEligible: false,
61+
recoveryEligible: true,
5762
};
5863
}
5964
if ((params.activity.lastProgressAgeMs ?? 0) > params.staleMs) {

src/logging/diagnostic.ts

Lines changed: 28 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,5 @@
11
import { monitorEventLoopDelay, performance } from "node:perf_hooks";
2+
import { resolveEmbeddedSessionLane } from "../agents/pi-embedded-runner/lanes.js";
23
import { getRuntimeConfig } from "../config/config.js";
34
import { resolveAllAgentSessionStoreTargetsSync } from "../config/sessions/targets.js";
45
import type { OpenClawConfig } from "../config/types.openclaw.js";
@@ -9,6 +10,7 @@ import {
910
type DiagnosticPhaseSnapshot,
1011
type DiagnosticLivenessWarningReason,
1112
} from "../infra/diagnostic-events.js";
13+
import { resetCommandLane } from "../process/command-queue.js";
1214
import { emitDiagnosticMemorySample, resetDiagnosticMemoryForTest } from "./diagnostic-memory.js";
1315
import {
1416
getCurrentDiagnosticPhase,
@@ -737,6 +739,22 @@ export function logSessionStateChange(
737739
if (params.state === "idle") {
738740
state.queueDepth = Math.max(0, state.queueDepth - 1);
739741
state.activeQueuedTurn = false;
742+
// Belt-and-suspenders: if the lane returns to idle but still has queued
743+
// items, force a pump. Normally `drainLane` re-fires recursively after
744+
// each task completes, but in production we have observed lanes that go
745+
// `idle` with `queueDepth > 0` and never dequeue (e.g., after an embedded
746+
// run ends with terminal progress). Calling `resetCommandLane` bumps the
747+
// lane generation, clears any stale `activeTaskIds`, and re-invokes
748+
// `drainLane` — a no-op when the lane queue is already empty.
749+
if (state.queueDepth > 0 && state.sessionKey) {
750+
try {
751+
resetCommandLane(resolveEmbeddedSessionLane(state.sessionKey));
752+
} catch (err) {
753+
diag.warn(
754+
`lane-pump-on-idle failed: sessionKey=${state.sessionKey} error="${String(err)}"`,
755+
);
756+
}
757+
}
740758
}
741759
if (!isProbeSession && diag.isEnabled("debug")) {
742760
diag.debug(
@@ -1116,6 +1134,14 @@ export function startDiagnosticHeartbeat(
11161134
thresholdMs: stuckSessionWarnMs,
11171135
abortThresholdMs: stuckSessionAbortMs,
11181136
});
1137+
const activeAbortRecoveryEligible =
1138+
classification !== undefined &&
1139+
isActiveAbortRecoveryEligible({
1140+
classification,
1141+
activity,
1142+
ageMs: attentionAgeMs,
1143+
stuckSessionAbortMs,
1144+
});
11191145
if (classification?.recoveryEligible) {
11201146
requestStuckSessionRecovery({
11211147
recover: opts?.recoverStuckSession ?? recoverStuckSession,
@@ -1125,19 +1151,12 @@ export function startDiagnosticHeartbeat(
11251151
sessionKey: state.sessionKey,
11261152
ageMs: attentionAgeMs,
11271153
queueDepth: state.queueDepth,
1154+
...(activeAbortRecoveryEligible ? { allowActiveAbort: true } : {}),
11281155
expectedState: state.state,
11291156
stateGeneration: state.generation,
11301157
},
11311158
});
1132-
} else if (
1133-
classification &&
1134-
isActiveAbortRecoveryEligible({
1135-
classification,
1136-
activity,
1137-
ageMs: attentionAgeMs,
1138-
stuckSessionAbortMs,
1139-
})
1140-
) {
1159+
} else if (classification && activeAbortRecoveryEligible) {
11411160
requestStuckSessionRecovery({
11421161
recover: opts?.recoverStuckSession ?? recoverStuckSession,
11431162
classification,

0 commit comments

Comments
 (0)