Skip to content

Commit 8eef34d

Browse files
Make agent job runtime timeout configurable
1 parent 96a645e commit 8eef34d

5 files changed

Lines changed: 60 additions & 13 deletions

File tree

codex-rs/core/src/tools/handlers/agent_jobs.rs

Lines changed: 37 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ const DEFAULT_AGENT_JOB_CONCURRENCY: usize = 16;
3737
const MAX_AGENT_JOB_CONCURRENCY: usize = 64;
3838
const STATUS_POLL_INTERVAL: Duration = Duration::from_millis(250);
3939
const PROGRESS_EMIT_INTERVAL: Duration = Duration::from_secs(1);
40-
const RUNNING_ITEM_TIMEOUT: Duration = Duration::from_secs(60 * 30);
40+
const DEFAULT_AGENT_JOB_ITEM_TIMEOUT: Duration = Duration::from_secs(60 * 30);
4141

4242
#[derive(Debug, Deserialize)]
4343
struct SpawnAgentsOnCsvArgs {
@@ -49,6 +49,7 @@ struct SpawnAgentsOnCsvArgs {
4949
output_schema: Option<Value>,
5050
max_concurrency: Option<usize>,
5151
max_workers: Option<usize>,
52+
max_runtime_seconds: Option<u64>,
5253
}
5354

5455
#[derive(Debug, Deserialize)]
@@ -310,13 +311,15 @@ mod spawn_agents_on_csv {
310311
let job_name = args
311312
.job_name
312313
.unwrap_or_else(|| format!("agent-job-{job_suffix}"));
314+
let max_runtime_seconds = normalize_max_runtime_seconds(args.max_runtime_seconds)?;
313315
let _job = db
314316
.create_agent_job(
315317
&codex_state::AgentJobCreateParams {
316318
id: job_id.clone(),
317319
name: job_name,
318320
instruction: args.instruction,
319321
auto_export: true,
322+
max_runtime_seconds,
320323
output_schema_json: args.output_schema,
321324
input_headers: headers,
322325
input_csv_path: input_path.display().to_string(),
@@ -527,6 +530,18 @@ fn normalize_concurrency(requested: Option<usize>, max_threads: Option<usize>) -
527530
}
528531
}
529532

533+
fn normalize_max_runtime_seconds(requested: Option<u64>) -> Result<Option<u64>, FunctionCallError> {
534+
let Some(requested) = requested else {
535+
return Ok(None);
536+
};
537+
if requested == 0 {
538+
return Err(FunctionCallError::RespondToModel(
539+
"max_runtime_seconds must be >= 1".to_string(),
540+
));
541+
}
542+
Ok(Some(requested))
543+
}
544+
530545
async fn run_agent_job_loop(
531546
session: Arc<Session>,
532547
turn: Arc<TurnContext>,
@@ -536,18 +551,19 @@ async fn run_agent_job_loop(
536551
) -> anyhow::Result<()> {
537552
let mut active_items: HashMap<ThreadId, ActiveJobItem> = HashMap::new();
538553
let mut progress_emitter = JobProgressEmitter::new();
554+
let job = db
555+
.get_agent_job(job_id.as_str())
556+
.await?
557+
.ok_or_else(|| anyhow::anyhow!("agent job {job_id} was not found"))?;
558+
let runtime_timeout = job_runtime_timeout(&job);
539559
recover_running_items(
540560
session.clone(),
541561
db.clone(),
542562
job_id.as_str(),
543563
&mut active_items,
564+
runtime_timeout,
544565
)
545566
.await?;
546-
547-
let job = db
548-
.get_agent_job(job_id.as_str())
549-
.await?
550-
.ok_or_else(|| anyhow::anyhow!("agent job {job_id} was not found"))?;
551567
let initial_progress = db.get_agent_job_progress(job_id.as_str()).await?;
552568
progress_emitter
553569
.maybe_emit(&session, &turn, job_id.as_str(), &initial_progress, true)
@@ -632,6 +648,7 @@ async fn run_agent_job_loop(
632648
db.clone(),
633649
job_id.as_str(),
634650
&mut active_items,
651+
runtime_timeout,
635652
)
636653
.await?
637654
{
@@ -708,14 +725,14 @@ async fn recover_running_items(
708725
db: Arc<codex_state::StateRuntime>,
709726
job_id: &str,
710727
active_items: &mut HashMap<ThreadId, ActiveJobItem>,
728+
runtime_timeout: Duration,
711729
) -> anyhow::Result<()> {
712730
let running_items = db
713731
.list_agent_job_items(job_id, Some(codex_state::AgentJobItemStatus::Running), None)
714732
.await?;
715733
for item in running_items {
716-
if is_item_stale(&item) {
717-
let error_message =
718-
format!("worker exceeded max runtime of {:?}", RUNNING_ITEM_TIMEOUT);
734+
if is_item_stale(&item, runtime_timeout) {
735+
let error_message = format!("worker exceeded max runtime of {runtime_timeout:?}");
719736
db.mark_agent_job_item_failed(job_id, item.item_id.as_str(), error_message.as_str())
720737
.await?;
721738
continue;
@@ -782,18 +799,19 @@ async fn reap_stale_active_items(
782799
db: Arc<codex_state::StateRuntime>,
783800
job_id: &str,
784801
active_items: &mut HashMap<ThreadId, ActiveJobItem>,
802+
runtime_timeout: Duration,
785803
) -> anyhow::Result<bool> {
786804
let mut stale = Vec::new();
787805
for (thread_id, item) in active_items.iter() {
788-
if item.started_at.elapsed() >= RUNNING_ITEM_TIMEOUT {
806+
if item.started_at.elapsed() >= runtime_timeout {
789807
stale.push((*thread_id, item.item_id.clone()));
790808
}
791809
}
792810
if stale.is_empty() {
793811
return Ok(false);
794812
}
795813
for (thread_id, item_id) in stale {
796-
let error_message = format!("worker exceeded max runtime of {:?}", RUNNING_ITEM_TIMEOUT);
814+
let error_message = format!("worker exceeded max runtime of {runtime_timeout:?}");
797815
db.mark_agent_job_item_failed(job_id, item_id.as_str(), error_message.as_str())
798816
.await?;
799817
let _ = session
@@ -922,6 +940,12 @@ fn ensure_unique_headers(headers: &[String]) -> Result<(), FunctionCallError> {
922940
Ok(())
923941
}
924942

943+
fn job_runtime_timeout(job: &codex_state::AgentJob) -> Duration {
944+
job.max_runtime_seconds
945+
.map(Duration::from_secs)
946+
.unwrap_or(DEFAULT_AGENT_JOB_ITEM_TIMEOUT)
947+
}
948+
925949
fn started_at_from_item(item: &codex_state::AgentJobItem) -> Instant {
926950
let now = chrono::Utc::now();
927951
let age = now.signed_duration_since(item.updated_at);
@@ -932,10 +956,10 @@ fn started_at_from_item(item: &codex_state::AgentJobItem) -> Instant {
932956
}
933957
}
934958

935-
fn is_item_stale(item: &codex_state::AgentJobItem) -> bool {
959+
fn is_item_stale(item: &codex_state::AgentJobItem, runtime_timeout: Duration) -> bool {
936960
let now = chrono::Utc::now();
937961
if let Ok(age) = now.signed_duration_since(item.updated_at).to_std() {
938-
age >= RUNNING_ITEM_TIMEOUT
962+
age >= runtime_timeout
939963
} else {
940964
false
941965
}

codex-rs/core/src/tools/spec.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -549,6 +549,15 @@ fn create_spawn_agents_on_csv_tool() -> ToolSpec {
549549
),
550550
},
551551
);
552+
properties.insert(
553+
"max_runtime_seconds".to_string(),
554+
JsonSchema::Number {
555+
description: Some(
556+
"Maximum runtime per worker before it is failed. Defaults to 1800 seconds."
557+
.to_string(),
558+
),
559+
},
560+
);
552561
properties.insert(
553562
"output_schema".to_string(),
554563
JsonSchema::Object {

codex-rs/state/migrations/0009_agent_jobs.sql

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ CREATE TABLE agent_jobs (
88
input_csv_path TEXT NOT NULL,
99
output_csv_path TEXT NOT NULL,
1010
auto_export INTEGER NOT NULL DEFAULT 1,
11+
max_runtime_seconds INTEGER,
1112
created_at INTEGER NOT NULL,
1213
updated_at INTEGER NOT NULL,
1314
started_at INTEGER,

codex-rs/state/src/model/agent_job.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@ pub struct AgentJob {
8080
pub status: AgentJobStatus,
8181
pub instruction: String,
8282
pub auto_export: bool,
83+
pub max_runtime_seconds: Option<u64>,
8384
pub output_schema_json: Option<Value>,
8485
pub input_headers: Vec<String>,
8586
pub input_csv_path: String,
@@ -124,6 +125,7 @@ pub struct AgentJobCreateParams {
124125
pub name: String,
125126
pub instruction: String,
126127
pub auto_export: bool,
128+
pub max_runtime_seconds: Option<u64>,
127129
pub output_schema_json: Option<Value>,
128130
pub input_headers: Vec<String>,
129131
pub input_csv_path: String,
@@ -145,6 +147,7 @@ pub(crate) struct AgentJobRow {
145147
pub(crate) status: String,
146148
pub(crate) instruction: String,
147149
pub(crate) auto_export: i64,
150+
pub(crate) max_runtime_seconds: Option<i64>,
148151
pub(crate) output_schema_json: Option<String>,
149152
pub(crate) input_headers_json: String,
150153
pub(crate) input_csv_path: String,
@@ -164,6 +167,7 @@ impl AgentJobRow {
164167
status: row.try_get("status")?,
165168
instruction: row.try_get("instruction")?,
166169
auto_export: row.try_get("auto_export")?,
170+
max_runtime_seconds: row.try_get("max_runtime_seconds")?,
167171
output_schema_json: row.try_get("output_schema_json")?,
168172
input_headers_json: row.try_get("input_headers_json")?,
169173
input_csv_path: row.try_get("input_csv_path")?,
@@ -187,12 +191,18 @@ impl TryFrom<AgentJobRow> for AgentJob {
187191
.map(serde_json::from_str)
188192
.transpose()?;
189193
let input_headers = serde_json::from_str(value.input_headers_json.as_str())?;
194+
let max_runtime_seconds = value
195+
.max_runtime_seconds
196+
.map(u64::try_from)
197+
.transpose()
198+
.map_err(|_| anyhow::anyhow!("invalid max_runtime_seconds value"))?;
190199
Ok(Self {
191200
id: value.id,
192201
name: value.name,
193202
status: AgentJobStatus::parse(value.status.as_str())?,
194203
instruction: value.instruction,
195204
auto_export: value.auto_export != 0,
205+
max_runtime_seconds,
196206
output_schema_json,
197207
input_headers,
198208
input_csv_path: value.input_csv_path,

codex-rs/state/src/runtime.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -747,6 +747,7 @@ INSERT INTO agent_jobs (
747747
status,
748748
instruction,
749749
auto_export,
750+
max_runtime_seconds,
750751
output_schema_json,
751752
input_headers_json,
752753
input_csv_path,
@@ -764,6 +765,7 @@ INSERT INTO agent_jobs (
764765
.bind(AgentJobStatus::Pending.as_str())
765766
.bind(params.instruction.as_str())
766767
.bind(i64::from(params.auto_export))
768+
.bind(params.max_runtime_seconds.map(i64::try_from).transpose()?)
767769
.bind(output_schema_json)
768770
.bind(input_headers_json)
769771
.bind(params.input_csv_path.as_str())
@@ -824,6 +826,7 @@ SELECT
824826
status,
825827
instruction,
826828
auto_export,
829+
max_runtime_seconds,
827830
output_schema_json,
828831
input_headers_json,
829832
input_csv_path,

0 commit comments

Comments
 (0)