@@ -9,7 +9,9 @@ const mocks = vi.hoisted(() => ({
99 isEmbeddedPiRunActive : vi . fn ( ) ,
1010 isEmbeddedPiRunHandleActive : vi . fn ( ) ,
1111 getCommandLaneSnapshot : vi . fn ( ) ,
12+ enqueueCommandInLane : vi . fn ( async ( _lane : string , task : ( ) => Promise < unknown > ) => task ( ) ) ,
1213 resetCommandLane : vi . fn ( ) ,
14+ callGateway : vi . fn ( ) ,
1315 resolveActiveEmbeddedRunSessionId : vi . fn ( ) ,
1416 resolveActiveEmbeddedRunHandleSessionId : vi . fn ( ) ,
1517 resolveEmbeddedSessionLane : vi . fn ( ( key : string ) => `session:${ key } ` ) ,
@@ -52,10 +54,15 @@ vi.mock("../agents/pi-embedded-runner/lanes.js", () => ({
5254} ) ) ;
5355
// Replace the command-queue module with the shared hoisted spies so each test can
// script (and inspect) lane enqueueing, lane snapshots and lane resets.
vi.mock("../process/command-queue.js", () => {
  const { enqueueCommandInLane, getCommandLaneSnapshot, resetCommandLane } = mocks;
  return { enqueueCommandInLane, getCommandLaneSnapshot, resetCommandLane };
});
5861
// Route gateway calls to the hoisted `callGateway` spy so tests can assert on the
// exact request without a real gateway call being made.
vi.mock("../gateway/call.js", () => {
  const { callGateway } = mocks;
  return { callGateway };
});
65+
// Expose the hoisted `diag` mock as the module's `diagnosticLogger`, which lets the
// tests assert on the warnings recorded by `mocks.diag.warn`.
vi.mock("./diagnostic-runtime.js", () => {
  const diagnosticLogger = mocks.diag;
  return { diagnosticLogger };
});
@@ -72,6 +79,12 @@ function resetMocks() {
7279 mocks . isEmbeddedPiRunActive . mockReset ( ) ;
7380 mocks . isEmbeddedPiRunHandleActive . mockReset ( ) ;
7481 mocks . getCommandLaneSnapshot . mockReset ( ) ;
82+ mocks . enqueueCommandInLane . mockReset ( ) ;
83+ mocks . enqueueCommandInLane . mockImplementation (
84+ async ( _lane : string , task : ( ) => Promise < unknown > ) => task ( ) ,
85+ ) ;
86+ mocks . callGateway . mockReset ( ) ;
87+ mocks . callGateway . mockResolvedValue ( { ok : true } ) ;
7588 mocks . getCommandLaneSnapshot . mockReturnValue ( {
7689 lane : "session:agent:main:main" ,
7790 queuedCount : 1 ,
@@ -89,13 +102,6 @@ function resetMocks() {
89102 mocks . diag . warn . mockReset ( ) ;
90103}
91104
// Test helper: returns the first argument of every recorded `mocks.diag.warn` call,
// in call order, asserting along the way that each of those arguments is a string.
// NOTE(review): every line of this helper carries the diff's removal marker (`-`); the
// `expect(warnLogMessages()).toEqual([...])` call sites in this diff are removed as well,
// replaced by `toHaveBeenCalledWith(expect.stringContaining(...))` checks — confirm the
// looser matching is intentional.
92- function warnLogMessages ( ) : string [ ] {
93- return mocks . diag . warn . mock . calls . map ( ( [ message ] ) => {
// The runtime type assertion is what backs the `as string` cast on the next line.
94- expect ( typeof message ) . toBe ( "string" ) ;
95- return message as string ;
96- } ) ;
97- }
98-
99105describe ( "stuck session recovery" , ( ) => {
100106 beforeEach ( ( ) => {
101107 resetMocks ( ) ;
@@ -115,10 +121,10 @@ describe("stuck session recovery", () => {
115121 expect ( mocks . waitForEmbeddedPiRunEnd ) . not . toHaveBeenCalled ( ) ;
116122 expect ( mocks . forceClearEmbeddedPiRun ) . not . toHaveBeenCalled ( ) ;
117123 expect ( mocks . resetCommandLane ) . not . toHaveBeenCalled ( ) ;
118- expect ( warnLogMessages ( ) ) . toEqual ( [
119- "stuck session recovery skipped: sessionId=session-1 sessionKey=agent:main:main age=180s queueDepth=1 activeSessionId=session-1" ,
120- "stuck session recovery outcome: status=skipped action=observe_only sessionId=session-1 sessionKey=agent:main:main activeSessionId=session-1 activeWorkKind=embedded_run reason=active_embedded_run" ,
121- ] ) ;
124+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith (
125+ expect . stringContaining ( "reason=active_embedded_run" ) ,
126+ ) ;
127+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith ( expect . stringContaining ( "action=observe_only" ) ) ;
122128 } ) ;
123129
124130 it ( "aborts an active embedded run when active abort recovery is enabled" , async ( ) => {
@@ -179,10 +185,15 @@ describe("stuck session recovery", () => {
179185 fs . rmSync ( tempDir , { recursive : true , force : true } ) ;
180186 }
181187
182- expect ( warnLogMessages ( ) ) . toEqual ( [
183- 'stuck session recovery: sessionId=run-456 sessionKey=agent:clawblocker:cron:job-123:run:run-456 age=629s action=abort_embedded_run aborted=true drained=true released=0 stopped="Twitter Mention Moderation Agent" cronJobId=job-123 cronRunId=run-456 lastAssistant="There are 40 cached mentions."' ,
184- "stuck session recovery outcome: status=aborted action=abort_embedded_run sessionId=run-456 sessionKey=agent:clawblocker:cron:job-123:run:run-456 activeSessionId=run-456 activeWorkKind=embedded_run lane=session:agent:clawblocker:cron:job-123:run:run-456 aborted=true drained=true forceCleared=false released=0" ,
185- ] ) ;
188+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith (
189+ expect . stringContaining ( "action=abort_embedded_run" ) ,
190+ ) ;
191+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith (
192+ expect . stringContaining ( 'stopped="Twitter Mention Moderation Agent"' ) ,
193+ ) ;
194+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith (
195+ expect . stringContaining ( 'lastAssistant="There are 40 cached mentions."' ) ,
196+ ) ;
186197 } ) ;
187198
188199 it ( "force-clears and releases the session lane when abort cleanup does not drain" , async ( ) => {
@@ -257,9 +268,12 @@ describe("stuck session recovery", () => {
257268 expect ( mocks . abortEmbeddedPiRun ) . not . toHaveBeenCalled ( ) ;
258269 expect ( mocks . forceClearEmbeddedPiRun ) . not . toHaveBeenCalled ( ) ;
259270 expect ( mocks . resetCommandLane ) . not . toHaveBeenCalled ( ) ;
260- expect ( warnLogMessages ( ) ) . toEqual ( [
261- "stuck session recovery outcome: status=skipped action=keep_lane sessionId=queued-reply-session sessionKey=agent:main:main activeSessionId=queued-reply-session activeWorkKind=embedded_run reason=active_reply_work" ,
262- ] ) ;
271+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith (
272+ expect . stringContaining ( "reason=active_reply_work" ) ,
273+ ) ;
274+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith (
275+ expect . stringContaining ( "activeSessionId=queued-reply-session" ) ,
276+ ) ;
263277 } ) ;
264278
265279 it ( "does not release the session lane while unregistered lane work is active" , async ( ) => {
@@ -286,9 +300,10 @@ describe("stuck session recovery", () => {
286300 expect ( mocks . abortEmbeddedPiRun ) . not . toHaveBeenCalled ( ) ;
287301 expect ( mocks . forceClearEmbeddedPiRun ) . not . toHaveBeenCalled ( ) ;
288302 expect ( mocks . resetCommandLane ) . not . toHaveBeenCalled ( ) ;
289- expect ( warnLogMessages ( ) ) . toEqual ( [
290- "stuck session recovery outcome: status=skipped action=keep_lane sessionId=unregistered-work-session sessionKey=agent:main:main lane=session:agent:main:main reason=active_lane_task laneActive=1 laneQueued=1" ,
291- ] ) ;
303+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith (
304+ expect . stringContaining ( "reason=active_lane_task" ) ,
305+ ) ;
306+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith ( expect . stringContaining ( "laneActive=1" ) ) ;
292307 } ) ;
293308
294309 it ( "reports when recovery finds no active work to release" , async ( ) => {
@@ -304,9 +319,76 @@ describe("stuck session recovery", () => {
304319 } ) ;
305320
306321 expect ( mocks . resetCommandLane ) . toHaveBeenCalledWith ( "session:agent:main:main" ) ;
307- expect ( warnLogMessages ( ) ) . toEqual ( [
308- "stuck session recovery outcome: status=noop action=none sessionId=stale-session sessionKey=agent:main:main lane=session:agent:main:main reason=no_active_work" ,
309- ] ) ;
322+ expect ( mocks . diag . warn ) . toHaveBeenCalledWith ( expect . stringContaining ( "reason=no_active_work" ) ) ;
323+ } ) ;
324+
// Scenario: the persisted session entry still reports a "running" reply turn that is
// three minutes old, while every embedded-run lookup reports that nothing is active.
// Recovery is expected to release the session, send an "interrupted" notice through the
// gateway on the "message" lane, and mark the stored turn as failed.
it("closes a stale unfinished topic turn even when task runs are empty", async () => {
  const sessionKey = "agent:openclaw:telegram:group:-100:topic:2";
  const savedStateDir = process.env.OPENCLAW_STATE_DIR;
  const sandboxDir = fs.mkdtempSync(path.join(os.tmpdir(), "openclaw-stale-reply-turn-"));
  try {
    // Point the state dir at the sandbox before the paths module is loaded.
    process.env.OPENCLAW_STATE_DIR = sandboxDir;
    const { resolveStateDir } = await import("../config/paths.js");
    const resolvedStateDir = resolveStateDir();

    // The store is seeded under both the raw sandbox path and whatever
    // `resolveStateDir()` returns, so the fixture exists at either location.
    const sandboxStoreDir = path.join(sandboxDir, "agents", "openclaw", "sessions");
    const resolvedStoreDir = path.join(resolvedStateDir, "agents", "openclaw", "sessions");
    fs.mkdirSync(sandboxStoreDir, { recursive: true });
    fs.mkdirSync(resolvedStoreDir, { recursive: true });

    const staleTurnEntry = {
      sessionId: "session-1",
      replyTurnState: "running",
      replyTurnStartedAt: Date.now() - 180_000,
      replyTurnUpdatedAt: Date.now() - 180_000,
      deliveryContext: {
        channel: "telegram",
        to: "-100",
        threadId: "2",
      },
    };
    const serializedStore = JSON.stringify({ [sessionKey]: staleTurnEntry });
    const sandboxStoreFile = path.join(sandboxStoreDir, "sessions.json");
    fs.writeFileSync(sandboxStoreFile, serializedStore);
    fs.writeFileSync(path.join(resolvedStoreDir, "sessions.json"), serializedStore);

    // No embedded run is registered, neither by handle nor by session id.
    mocks.resolveActiveEmbeddedRunHandleSessionId.mockReturnValue(undefined);
    mocks.resolveActiveEmbeddedRunSessionId.mockReturnValue(undefined);
    mocks.isEmbeddedPiRunActive.mockReturnValue(false);
    mocks.resetCommandLane.mockReturnValue(0);

    const outcome = await recoverStuckDiagnosticSession({
      sessionId: "session-1",
      sessionKey,
      ageMs: 180_000,
    });

    // `reason` only exists on some outcome variants, hence the `in` narrowing.
    const outcomeReason = "reason" in outcome ? outcome.reason : undefined;
    expect(outcome.status).toBe("released");
    expect(outcomeReason).toBe("stale_reply_turn_closed");
    expect(mocks.enqueueCommandInLane).toHaveBeenCalledWith("message", expect.any(Function));

    const expectedDelivery = expect.objectContaining({
      to: "-100",
      threadId: "2",
      message: expect.stringContaining("interrupted"),
    });
    const expectedAction = expect.objectContaining({
      action: "send",
      channel: "telegram",
      idempotencyKey: `stale-reply-turn-recovery:${sessionKey}`,
      params: expectedDelivery,
    });
    expect(mocks.callGateway).toHaveBeenCalledWith(
      expect.objectContaining({
        method: "message.action",
        params: expectedAction,
      }),
    );

    // The persisted entry must have been flipped from "running" to "failed".
    const persistedStore = JSON.parse(fs.readFileSync(sandboxStoreFile, "utf8"));
    const persistedEntry = persistedStore[sessionKey];
    expect(persistedEntry.replyTurnState).toBe("failed");
    expect(persistedEntry.replyTurnLastError).toBe("recovered_after_restart");
  } finally {
    // Put the environment back exactly as it was, then drop the sandbox.
    if (savedStateDir !== undefined) {
      process.env.OPENCLAW_STATE_DIR = savedStateDir;
    } else {
      delete process.env.OPENCLAW_STATE_DIR;
    }
    fs.rmSync(sandboxDir, { recursive: true, force: true });
  }
});
311393
312394 it ( "releases a stale session-id lane when no session key is available" , async ( ) => {
0 commit comments