@@ -37,7 +37,7 @@ const DEFAULT_AGENT_JOB_CONCURRENCY: usize = 16;
3737const MAX_AGENT_JOB_CONCURRENCY : usize = 64 ;
3838const STATUS_POLL_INTERVAL : Duration = Duration :: from_millis ( 250 ) ;
3939const PROGRESS_EMIT_INTERVAL : Duration = Duration :: from_secs ( 1 ) ;
40- const RUNNING_ITEM_TIMEOUT : Duration = Duration :: from_secs ( 60 * 30 ) ;
40+ const DEFAULT_AGENT_JOB_ITEM_TIMEOUT : Duration = Duration :: from_secs ( 60 * 30 ) ;
4141
4242#[ derive( Debug , Deserialize ) ]
4343struct SpawnAgentsOnCsvArgs {
@@ -49,6 +49,7 @@ struct SpawnAgentsOnCsvArgs {
4949 output_schema : Option < Value > ,
5050 max_concurrency : Option < usize > ,
5151 max_workers : Option < usize > ,
52+ max_runtime_seconds : Option < u64 > ,
5253}
5354
5455#[ derive( Debug , Deserialize ) ]
@@ -310,13 +311,15 @@ mod spawn_agents_on_csv {
310311 let job_name = args
311312 . job_name
312313 . unwrap_or_else ( || format ! ( "agent-job-{job_suffix}" ) ) ;
314+ let max_runtime_seconds = normalize_max_runtime_seconds ( args. max_runtime_seconds ) ?;
313315 let _job = db
314316 . create_agent_job (
315317 & codex_state:: AgentJobCreateParams {
316318 id : job_id. clone ( ) ,
317319 name : job_name,
318320 instruction : args. instruction ,
319321 auto_export : true ,
322+ max_runtime_seconds,
320323 output_schema_json : args. output_schema ,
321324 input_headers : headers,
322325 input_csv_path : input_path. display ( ) . to_string ( ) ,
@@ -527,6 +530,18 @@ fn normalize_concurrency(requested: Option<usize>, max_threads: Option<usize>) -
527530 }
528531}
529532
533+ fn normalize_max_runtime_seconds ( requested : Option < u64 > ) -> Result < Option < u64 > , FunctionCallError > {
534+ let Some ( requested) = requested else {
535+ return Ok ( None ) ;
536+ } ;
537+ if requested == 0 {
538+ return Err ( FunctionCallError :: RespondToModel (
539+ "max_runtime_seconds must be >= 1" . to_string ( ) ,
540+ ) ) ;
541+ }
542+ Ok ( Some ( requested) )
543+ }
544+
530545async fn run_agent_job_loop (
531546 session : Arc < Session > ,
532547 turn : Arc < TurnContext > ,
@@ -536,18 +551,19 @@ async fn run_agent_job_loop(
536551) -> anyhow:: Result < ( ) > {
537552 let mut active_items: HashMap < ThreadId , ActiveJobItem > = HashMap :: new ( ) ;
538553 let mut progress_emitter = JobProgressEmitter :: new ( ) ;
554+ let job = db
555+ . get_agent_job ( job_id. as_str ( ) )
556+ . await ?
557+ . ok_or_else ( || anyhow:: anyhow!( "agent job {job_id} was not found" ) ) ?;
558+ let runtime_timeout = job_runtime_timeout ( & job) ;
539559 recover_running_items (
540560 session. clone ( ) ,
541561 db. clone ( ) ,
542562 job_id. as_str ( ) ,
543563 & mut active_items,
564+ runtime_timeout,
544565 )
545566 . await ?;
546-
547- let job = db
548- . get_agent_job ( job_id. as_str ( ) )
549- . await ?
550- . ok_or_else ( || anyhow:: anyhow!( "agent job {job_id} was not found" ) ) ?;
551567 let initial_progress = db. get_agent_job_progress ( job_id. as_str ( ) ) . await ?;
552568 progress_emitter
553569 . maybe_emit ( & session, & turn, job_id. as_str ( ) , & initial_progress, true )
@@ -632,6 +648,7 @@ async fn run_agent_job_loop(
632648 db. clone ( ) ,
633649 job_id. as_str ( ) ,
634650 & mut active_items,
651+ runtime_timeout,
635652 )
636653 . await ?
637654 {
@@ -708,14 +725,14 @@ async fn recover_running_items(
708725 db : Arc < codex_state:: StateRuntime > ,
709726 job_id : & str ,
710727 active_items : & mut HashMap < ThreadId , ActiveJobItem > ,
728+ runtime_timeout : Duration ,
711729) -> anyhow:: Result < ( ) > {
712730 let running_items = db
713731 . list_agent_job_items ( job_id, Some ( codex_state:: AgentJobItemStatus :: Running ) , None )
714732 . await ?;
715733 for item in running_items {
716- if is_item_stale ( & item) {
717- let error_message =
718- format ! ( "worker exceeded max runtime of {:?}" , RUNNING_ITEM_TIMEOUT ) ;
734+ if is_item_stale ( & item, runtime_timeout) {
735+ let error_message = format ! ( "worker exceeded max runtime of {runtime_timeout:?}" ) ;
719736 db. mark_agent_job_item_failed ( job_id, item. item_id . as_str ( ) , error_message. as_str ( ) )
720737 . await ?;
721738 continue ;
@@ -782,18 +799,19 @@ async fn reap_stale_active_items(
782799 db : Arc < codex_state:: StateRuntime > ,
783800 job_id : & str ,
784801 active_items : & mut HashMap < ThreadId , ActiveJobItem > ,
802+ runtime_timeout : Duration ,
785803) -> anyhow:: Result < bool > {
786804 let mut stale = Vec :: new ( ) ;
787805 for ( thread_id, item) in active_items. iter ( ) {
788- if item. started_at . elapsed ( ) >= RUNNING_ITEM_TIMEOUT {
806+ if item. started_at . elapsed ( ) >= runtime_timeout {
789807 stale. push ( ( * thread_id, item. item_id . clone ( ) ) ) ;
790808 }
791809 }
792810 if stale. is_empty ( ) {
793811 return Ok ( false ) ;
794812 }
795813 for ( thread_id, item_id) in stale {
796- let error_message = format ! ( "worker exceeded max runtime of {:?}" , RUNNING_ITEM_TIMEOUT ) ;
814+ let error_message = format ! ( "worker exceeded max runtime of {runtime_timeout :?}" ) ;
797815 db. mark_agent_job_item_failed ( job_id, item_id. as_str ( ) , error_message. as_str ( ) )
798816 . await ?;
799817 let _ = session
@@ -922,6 +940,12 @@ fn ensure_unique_headers(headers: &[String]) -> Result<(), FunctionCallError> {
922940 Ok ( ( ) )
923941}
924942
943+ fn job_runtime_timeout ( job : & codex_state:: AgentJob ) -> Duration {
944+ job. max_runtime_seconds
945+ . map ( Duration :: from_secs)
946+ . unwrap_or ( DEFAULT_AGENT_JOB_ITEM_TIMEOUT )
947+ }
948+
925949fn started_at_from_item ( item : & codex_state:: AgentJobItem ) -> Instant {
926950 let now = chrono:: Utc :: now ( ) ;
927951 let age = now. signed_duration_since ( item. updated_at ) ;
@@ -932,10 +956,10 @@ fn started_at_from_item(item: &codex_state::AgentJobItem) -> Instant {
932956 }
933957}
934958
935- fn is_item_stale ( item : & codex_state:: AgentJobItem ) -> bool {
959+ fn is_item_stale ( item : & codex_state:: AgentJobItem , runtime_timeout : Duration ) -> bool {
936960 let now = chrono:: Utc :: now ( ) ;
937961 if let Ok ( age) = now. signed_duration_since ( item. updated_at ) . to_std ( ) {
938- age >= RUNNING_ITEM_TIMEOUT
962+ age >= runtime_timeout
939963 } else {
940964 false
941965 }
0 commit comments