Agent jobs (spawn_agents_on_csv) + progress UI#10935
Conversation
|
All contributors have signed the CLA ✍️ ✅ |
jif-oai
left a comment
There was a problem hiding this comment.
A few global points:
- We need more integration tests
- We don't have a mechanism to recover a sub-agent that crashes. In those cases, the job will stay as "Running" forever I think
- Implementation is way cleaner than before. Thanks for this
- Is it on purpose that this is only supported by
codex execfor now? I think this might also be relevant for theapp-serverbut it can come in a follow-up
| csv_path: String, | ||
| instruction: String, | ||
| id_column: Option<String>, | ||
| job_name: Option<String>, |
There was a problem hiding this comment.
This does not seem to be used anywhere... so I'm not sure this is interesting
| } | ||
|
|
||
| fn normalize_concurrency(requested: Option<usize>, max_threads: Option<usize>) -> usize { | ||
| let requested = requested.unwrap_or(64).max(1); |
There was a problem hiding this comment.
- Can we make the 64 a
constsomewhere? - In a follow-up we should make this configurable
- 64 is gigantic. Any normal user would instantaneously get rate limited
| ToolSpec::Function(ResponsesApiTool { | ||
| name: "report_agent_job_result".to_string(), | ||
| description: | ||
| "Worker-only tool to report a result for an agent job item. Main agents should not call this." |
There was a problem hiding this comment.
This description is pretty strange from an agent point of view. If the main agent should not call this, we should just not give access to the tool to the main agent. Should be quite easy to do
| ToolSpec::Function(ResponsesApiTool { | ||
| name: "report_agent_job_result".to_string(), | ||
| description: | ||
| "Worker-only tool to report a result for an agent job item. Main agents should not call this." |
There was a problem hiding this comment.
What if the worker does not call this tool? We could just use a structured output if we want something more deterministic
codex-rs/core/src/tools/spec.rs
Outdated
| builder.register_handler("close_agent", collab_handler); | ||
| } | ||
|
|
||
| if config.collab_tools { |
There was a problem hiding this comment.
I just enabled collab globally so feel free to create a new feature flag if you want
| @@ -0,0 +1,2 @@ | |||
| ALTER TABLE agent_jobs | |||
There was a problem hiding this comment.
No need for a dedicated migration... this is not merged yet so just put everything in the same migration
Having tons of migrations just makes things harder to follow IMO
| .unwrap_or_else(|| format!("row-{row_index}")); | ||
| if !seen_ids.insert(item_id.clone()) { | ||
| item_id = format!("{item_id}-{row_index}"); | ||
| seen_ids.insert(item_id.clone()); |
There was a problem hiding this comment.
this can still violate uniqueness if you have duplicate item_id (unlikely but possible)
| let row_object = headers | ||
| .iter() | ||
| .zip(row.iter()) | ||
| .map(|(header, value)| (header.clone(), Value::String(value.clone()))) |
There was a problem hiding this comment.
are we 100% sure header is unique? I'm not sure the validation will never leak. A few tests would be nice here
codex-rs/state/src/runtime.rs
Outdated
| job_id = ? | ||
| AND item_id = ? | ||
| AND status = ? | ||
| AND (assigned_thread_id IS NULL OR assigned_thread_id = ?) |
There was a problem hiding this comment.
Why this? That sounds racy... what if report_agent_job_item_result accepts reports when assigned_thread_id is NULL? Any caller can claim an item before the worker thread is set. That can misattribute results in the Running -> set_thread window
|
Thanks for the thoughtful review, really appreciated. I went through each point and addressed them as follows:
Additional fixes since the review (from local failures while testing)
|
jif-oai
left a comment
There was a problem hiding this comment.
Thanks for the prompt work on the previous review. We are close and this is going in a way better direction IMO
codex-rs/core/src/config/mod.rs
Outdated
| pub(crate) const DEFAULT_AGENT_JOB_MAX_RUNTIME_SECONDS: Option<u64> = None; | ||
|
|
||
| pub const CONFIG_TOML_FILE: &str = "config.toml"; | ||
| const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME"; |
There was a problem hiding this comment.
I think this part should live in the codex-state crate but feel free to challenge. No very strong opinion tbh
| csv_path: String, | ||
| instruction: String, | ||
| id_column: Option<String>, | ||
| job_name: Option<String>, |
There was a problem hiding this comment.
My point here was not the whole struct but just job_name. I'm not sure this is relevant to have it if this is not surfaces. Would be cool later to be able to resume from it though
| input_csv_path.with_file_name(format!("{stem}.agent-job-{job_suffix}.csv")) | ||
| } | ||
|
|
||
| fn parse_csv(content: &str) -> Result<(Vec<String>, Vec<Vec<String>>), String> { |
There was a problem hiding this comment.
This is pretty cool but I would either:
- Check if there are any existing
cratesfor this if we don't want to re-invent the wheel - If not or nothing that suits our need, I would extract this in a small crate in
codex-utils-...as I'm quite sure others will need it one day (+ you can put this in a small self-contained PR)
| FunctionCallError::RespondToModel(format!("failed to create agent job: {err}")) | ||
| })?; | ||
|
|
||
| db.mark_agent_job_running(job_id.as_str()) |
There was a problem hiding this comment.
You first mark it as running but then later you have
let options = build_runner_options(&session, &turn, requested_concurrency).await?;
so this means that if build_runner_options fails, the job stays running forever
| pub row_json: Value, | ||
| } | ||
|
|
||
| #[derive(Debug)] |
There was a problem hiding this comment.
You can just derive FromRow as well (from SQLX) so that you don't need the try_from_row impl
| } | ||
| } | ||
|
|
||
| #[derive(Debug)] |
There was a problem hiding this comment.
Same comment for the FromRow (and same comment everywhere actually)
| pub(crate) instruction: String, | ||
| pub(crate) auto_export: i64, | ||
| pub(crate) max_runtime_seconds: Option<i64>, | ||
| pub(crate) output_schema_json: Option<String>, |
There was a problem hiding this comment.
This might be in a follow-up but this output_schema_json ticks me a bit. We should just transform it into a JSON-Schema (https://json-schema.org/) and used structured output. This will do a constraint sampling and ensure the schema is always respected. Ok for me not to do in PR but please add in a backlog somewhere (you can assign it to me if you don't want to do it)
| .await | ||
| { | ||
| Ok(thread_id) => thread_id, | ||
| Err(CodexErr::AgentLimitReached { .. }) => { |
There was a problem hiding this comment.
This means we gonna loop while waiting for agent spot to be available right? This can take a lot of time if the sub-agents are handled somewhere else. I don't have a way better solution though
| .list_agent_job_items(job_id, Some(codex_state::AgentJobItemStatus::Running), None) | ||
| .await?; | ||
| for item in running_items { | ||
| if is_item_stale(&item, runtime_timeout) { |
There was a problem hiding this comment.
Would be good to also try to kill the job just in case. To make sure we don't increase contention on max number of running agents
|
|
||
| if config.agent_jobs_tools { | ||
| let agent_jobs_handler = Arc::new(BatchJobHandler); | ||
| builder.push_spec(create_spawn_agents_on_csv_tool()); |
There was a problem hiding this comment.
OOC do you expect recursive jobs? Otherwise we could drop those for depth > 0 (opposite as spawn_agents_on_csv)
ceac6a0 to
8c706ec
Compare
jif-oai
left a comment
There was a problem hiding this comment.
Ok for me with the following comments:
- Enjoy the merge with main. Ping me if you want a sanity check after
- As follow-ups make sure to keep track of:
a. Use of a crate for CSV handling or extract in a dedicated crate
b. Discuss with the TUI and App team to see how can we render this feature
c. Add the documentation somewhere here https://github.com/openai/developers-website. You can ask @dkundel-openai for help
d. try to use structured output for the enforcement of the schema
e. find a solution to limit the looping ofAgentLimitReached
f. as this contain DB migration, make sure an alpha get cut and this alpha is used by VSCE and the app
8c706ec to
17e7f4d
Compare
When view_image returns an input_image, also enqueue it as a user message so nested tool calls (like js_repl) make the image available to the next model request. Log a warning if no active turn is present.
If cargo_bin(codex) fails, derive a nearby codex path from current_exe and use it for codex_linux_sandbox_exe. This keeps sandboxed test helpers working across build layouts.
17e7f4d to
db760f0
Compare
Rewrite the fallback for locating the codex binary to satisfy clippy::collapsible_if while preserving the existing behavior.
|
I have read the CLA Document and I hereby sign the CLA |
Thread tool call source through ToolInvocation so view_image only injects pending image input for js_repl calls. Update router/tests/handlers to carry the new field.
Replace eprint! with eprintln! for newline output and collapse the columns guard to satisfy clippy::print_with_newline and clippy::collapsible_if.
Wrap agent job progress stats in a struct and replace the newline-only eprint with eprintln to satisfy clippy::too_many_arguments and clippy::print_with_newline.
|
/merge |
Summary
Why
Enables map-reduce style workflows over arbitrarily large repos using the existing Codex orchestrator. This addresses review feedback about overly complex job controls and non-deterministic monitoring.
Demo (progress bar)
Review feedback addressed
Tests
cd codex-rs && cargo test -p codex-execcd codex-rs && cargo build -p codex-cli