11import path from "node:path" ;
2- import { abortAgentHarnessRun } from "openclaw/plugin-sdk/agent-harness-runtime" ;
32import { describe , expect , it , vi } from "vitest" ;
43import { CODEX_GPT5_BEHAVIOR_CONTRACT } from "../../prompt-overlay.js" ;
54import type { CodexServerNotification } from "./protocol.js" ;
@@ -18,48 +17,52 @@ import {
1817
1918setupRunAttemptTestHooks ( ) ;
2019
21- function createSteeringParams ( name : string ) {
20+ let steeringSessionIndex = 0 ;
21+
22+ function createSteeringParams ( ) {
23+ const sessionId = `steering-session-${ ++ steeringSessionIndex } ` ;
2224 const params = createParams (
23- path . join ( tempDir , `${ name } .jsonl` ) ,
24- path . join ( tempDir , `${ name } -workspace` ) ,
25+ path . join ( tempDir , `${ sessionId } .jsonl` ) ,
26+ path . join ( tempDir , `${ sessionId } -workspace` ) ,
2527 ) ;
26- params . sessionId = `session-${ name } ` ;
27- params . sessionKey = `agent:main:session-${ name } ` ;
28+ params . sessionId = sessionId ;
29+ params . sessionKey = `agent:main:${ sessionId } ` ;
30+ params . runId = `run-${ sessionId } ` ;
2831 return params ;
2932}
3033
31- async function queueActiveRunMessageEventually (
34+ async function waitAndQueueActiveRunMessage (
3235 sessionId : string ,
3336 text : string ,
3437 options ?: Parameters < typeof queueActiveRunMessageForTest > [ 2 ] ,
3538) {
36- await vi . waitFor (
37- ( ) => expect ( queueActiveRunMessageForTest ( sessionId , text , options ) ) . toBe ( true ) ,
38- fastWait ,
39- ) ;
39+ let queued = false ;
40+ await vi . waitFor ( ( ) => {
41+ if ( ! queued ) {
42+ queued = queueActiveRunMessageForTest ( sessionId , text , options ) ;
43+ }
44+ expect ( queued ) . toBe ( true ) ;
45+ } , fastWait ) ;
4046}
4147
4248describe ( "runCodexAppServerAttempt steering" , ( ) => {
43- it ( "forwards queued user input and aborts the active app-server turn" , async ( ) => {
44- const { requests, waitForMethod } = createStartedThreadHarness ( ) ;
45- const params = createSteeringParams ( "steering-forward" ) ;
49+ it ( "forwards queued user input to the active app-server turn" , async ( ) => {
50+ const { requests, waitForMethod, completeTurn } = createStartedThreadHarness ( ) ;
51+ const params = createSteeringParams ( ) ;
4652
47- const run = runCodexAppServerAttempt ( params , { pluginConfig : { appServer : { mode : "yolo" } } } ) ;
53+ const run = runCodexAppServerAttempt ( params , {
54+ pluginConfig : { appServer : { mode : "yolo" } } ,
55+ } ) ;
4856 await waitForMethod ( "turn/start" ) ;
4957
50- await queueActiveRunMessageEventually ( params . sessionId , "more context" , { debounceMs : 1 } ) ;
58+ await waitAndQueueActiveRunMessage ( params . sessionId , "more context" , { debounceMs : 0 } ) ;
5159 await vi . waitFor (
5260 ( ) => expect ( requests . map ( ( entry ) => entry . method ) ) . toContain ( "turn/steer" ) ,
5361 fastWait ,
5462 ) ;
55- expect ( abortAgentHarnessRun ( params . sessionId ) ) . toBe ( true ) ;
56- await vi . waitFor (
57- ( ) => expect ( requests . map ( ( entry ) => entry . method ) ) . toContain ( "turn/interrupt" ) ,
58- fastWait ,
59- ) ;
6063
61- const result = await run ;
62- expect ( result . aborted ) . toBe ( true ) ;
64+ await completeTurn ( { threadId : "thread-1" , turnId : "turn-1" } ) ;
65+ await run ;
6366 const threadStart = requests . find ( ( entry ) => entry . method === "thread/start" ) ;
6467 const threadStartParams = threadStart ?. params as
6568 | {
@@ -81,27 +84,21 @@ describe("runCodexAppServerAttempt steering", () => {
8184 expectedTurnId : "turn-1" ,
8285 input : [ { type : "text" , text : "more context" , text_elements : [ ] } ] ,
8386 } ) ;
84- const interrupt = requests . find ( ( entry ) => entry . method === "turn/interrupt" ) ;
85- expect ( interrupt ?. params ) . toEqual ( { threadId : "thread-1" , turnId : "turn-1" } ) ;
8687 } ) ;
8788
8889 it ( "accepts message-tool-only steering for active Codex app-server source replies" , async ( ) => {
8990 const { requests, waitForMethod, completeTurn } = createStartedThreadHarness ( ) ;
90- const params = createSteeringParams ( "steering-message-tool" ) ;
91+ const params = createSteeringParams ( ) ;
9192 params . sourceReplyDeliveryMode = "message_tool_only" ;
9293
9394 const run = runCodexAppServerAttempt ( params ) ;
9495 await waitForMethod ( "turn/start" ) ;
9596
96- await queueActiveRunMessageEventually (
97- params . sessionId ,
98- "subagent complete" ,
99- {
100- debounceMs : 1 ,
101- steeringMode : "all" ,
102- sourceReplyDeliveryMode : "message_tool_only" ,
103- } ,
104- ) ;
97+ await waitAndQueueActiveRunMessage ( params . sessionId , "subagent complete" , {
98+ debounceMs : 0 ,
99+ steeringMode : "all" ,
100+ sourceReplyDeliveryMode : "message_tool_only" ,
101+ } ) ;
105102
106103 await vi . waitFor (
107104 ( ) =>
@@ -115,53 +112,51 @@ describe("runCodexAppServerAttempt steering", () => {
115112 } ,
116113 } ,
117114 ] ) ,
118- fastWait ,
115+ { interval : 1 } ,
119116 ) ;
120117
121118 await completeTurn ( { threadId : "thread-1" , turnId : "turn-1" } ) ;
122119 await run ;
123120 } ) ;
124121
125- it ( "batches default queued steering before sending turn/steer " , async ( ) => {
122+ it ( "flushes batched default queued steering during normal turn cleanup " , async ( ) => {
126123 const { requests, waitForMethod, completeTurn } = createStartedThreadHarness ( ) ;
127- const params = createSteeringParams ( "steering-batch-default" ) ;
124+ const params = createSteeringParams ( ) ;
128125
129126 const run = runCodexAppServerAttempt ( params ) ;
130127 await waitForMethod ( "turn/start" ) ;
131128
132- await queueActiveRunMessageEventually ( params . sessionId , "first" , { debounceMs : 5 } ) ;
133- expect ( queueActiveRunMessageForTest ( params . sessionId , "second" , { debounceMs : 5 } ) ) . toBe ( true ) ;
134-
135- await vi . waitFor (
136- ( ) =>
137- expect ( requests . filter ( ( entry ) => entry . method === "turn/steer" ) ) . toEqual ( [
138- {
139- method : "turn/steer" ,
140- params : {
141- threadId : "thread-1" ,
142- expectedTurnId : "turn-1" ,
143- input : [
144- { type : "text" , text : "first" , text_elements : [ ] } ,
145- { type : "text" , text : "second" , text_elements : [ ] } ,
146- ] ,
147- } ,
148- } ,
149- ] ) ,
150- fastWait ,
129+ await waitAndQueueActiveRunMessage ( params . sessionId , "first" , { debounceMs : 30_000 } ) ;
130+ expect ( queueActiveRunMessageForTest ( params . sessionId , "second" , { debounceMs : 30_000 } ) ) . toBe (
131+ true ,
151132 ) ;
152133
153134 await completeTurn ( { threadId : "thread-1" , turnId : "turn-1" } ) ;
154135 await run ;
136+
137+ expect ( requests . filter ( ( entry ) => entry . method === "turn/steer" ) ) . toEqual ( [
138+ {
139+ method : "turn/steer" ,
140+ params : {
141+ threadId : "thread-1" ,
142+ expectedTurnId : "turn-1" ,
143+ input : [
144+ { type : "text" , text : "first" , text_elements : [ ] } ,
145+ { type : "text" , text : "second" , text_elements : [ ] } ,
146+ ] ,
147+ } ,
148+ } ,
149+ ] ) ;
155150 } ) ;
156151
157152 it ( "flushes pending default queued steering during normal turn cleanup" , async ( ) => {
158153 const { requests, waitForMethod, completeTurn } = createStartedThreadHarness ( ) ;
159- const params = createSteeringParams ( "steering-flush" ) ;
154+ const params = createSteeringParams ( ) ;
160155
161156 const run = runCodexAppServerAttempt ( params ) ;
162157 await waitForMethod ( "turn/start" ) ;
163158
164- await queueActiveRunMessageEventually ( params . sessionId , "late steer" , { debounceMs : 30_000 } ) ;
159+ await waitAndQueueActiveRunMessage ( params . sessionId , "late steer" , { debounceMs : 30_000 } ) ;
165160
166161 await completeTurn ( { threadId : "thread-1" , turnId : "turn-1" } ) ;
167162 await run ;
@@ -178,44 +173,40 @@ describe("runCodexAppServerAttempt steering", () => {
178173 ] ) ;
179174 } ) ;
180175
181- it ( "batches explicit all-mode steering before sending turn/steer " , async ( ) => {
176+ it ( "flushes batched explicit all-mode steering during normal turn cleanup " , async ( ) => {
182177 const { requests, waitForMethod, completeTurn } = createStartedThreadHarness ( ) ;
183- const params = createSteeringParams ( "steering-batch-all" ) ;
178+ const params = createSteeringParams ( ) ;
184179
185180 const run = runCodexAppServerAttempt ( params ) ;
186181 await waitForMethod ( "turn/start" ) ;
187182
188- await queueActiveRunMessageEventually ( params . sessionId , "first" , {
189- debounceMs : 5 ,
183+ await waitAndQueueActiveRunMessage ( params . sessionId , "first" , {
184+ debounceMs : 30_000 ,
190185 steeringMode : "all" ,
191186 } ) ;
192187 expect (
193188 queueActiveRunMessageForTest ( params . sessionId , "second" , {
194- debounceMs : 5 ,
189+ debounceMs : 30_000 ,
195190 steeringMode : "all" ,
196191 } ) ,
197192 ) . toBe ( true ) ;
198193
199- await vi . waitFor (
200- ( ) =>
201- expect ( requests . filter ( ( entry ) => entry . method === "turn/steer" ) ) . toEqual ( [
202- {
203- method : "turn/steer" ,
204- params : {
205- threadId : "thread-1" ,
206- expectedTurnId : "turn-1" ,
207- input : [
208- { type : "text" , text : "first" , text_elements : [ ] } ,
209- { type : "text" , text : "second" , text_elements : [ ] } ,
210- ] ,
211- } ,
212- } ,
213- ] ) ,
214- fastWait ,
215- ) ;
216-
217194 await completeTurn ( { threadId : "thread-1" , turnId : "turn-1" } ) ;
218195 await run ;
196+
197+ expect ( requests . filter ( ( entry ) => entry . method === "turn/steer" ) ) . toEqual ( [
198+ {
199+ method : "turn/steer" ,
200+ params : {
201+ threadId : "thread-1" ,
202+ expectedTurnId : "turn-1" ,
203+ input : [
204+ { type : "text" , text : "first" , text_elements : [ ] } ,
205+ { type : "text" , text : "second" , text_elements : [ ] } ,
206+ ] ,
207+ } ,
208+ } ,
209+ ] ) ;
219210 } ) ;
220211
221212 it ( "routes request_user_input prompts through the active run follow-up queue" , async ( ) => {
@@ -253,7 +244,7 @@ describe("runCodexAppServerAttempt steering", () => {
253244 } ) as never ,
254245 ) ;
255246
256- const params = createSteeringParams ( "steering-request-input" ) ;
247+ const params = createSteeringParams ( ) ;
257248 params . onBlockReply = vi . fn ( ) ;
258249 const run = runCodexAppServerAttempt ( params ) ;
259250 await vi . waitFor (
@@ -286,7 +277,7 @@ describe("runCodexAppServerAttempt steering", () => {
286277 } ) ;
287278
288279 await vi . waitFor ( ( ) => expect ( params . onBlockReply ) . toHaveBeenCalledTimes ( 1 ) , fastWait ) ;
289- await queueActiveRunMessageEventually ( params . sessionId , "2" ) ;
280+ await waitAndQueueActiveRunMessage ( params . sessionId , "2" ) ;
290281 await expect ( response ) . resolves . toEqual ( {
291282 answers : { mode : { answers : [ "Deep" ] } } ,
292283 } ) ;
0 commit comments