Agent jobs (spawn_agents_on_csv) + progress UI#10935

Merged
daveaitel-openai merged 30 commits into main from feat/swarmmode-squash
Feb 24, 2026

Conversation

@daveaitel-openai
Contributor

Summary

  • Add agent job support: spawn a batch of sub-agents from CSV, auto-run, auto-export, and store results in SQLite.
  • Simplify workflow: remove run/resume/get-status/export tools; spawn is deterministic and completes in one call.
  • Improve exec UX: stable, single-line progress bar with ETA; suppress sub-agent chatter in exec.

Why

Enables map-reduce style workflows over arbitrarily large repos using the existing Codex orchestrator. This addresses review feedback about overly complex job controls and non-deterministic monitoring.

Demo (progress bar)

./codex-rs/target/debug/codex exec \
  --enable collab \
  --enable sqlite \
  --full-auto \
  --progress-cursor \
  -c agents.max_threads=16 \
  -C /Users/daveaitel/code/codex \
  - <<'PROMPT'
Create /tmp/agent_job_progress_demo.csv with columns: path,area and 30 rows:
path = item-01..item-30, area = test.

Then call spawn_agents_on_csv with:
- csv_path: /tmp/agent_job_progress_demo.csv
- instruction: "Run `python - <<'PY'` to sleep a random 0.3–1.2s, then output JSON with keys: path, score (int). Set score = 1."
- output_csv_path: /tmp/agent_job_progress_demo_out.csv
PROMPT

Review feedback addressed

  • Auto-start jobs on spawn; removed run/resume/status/export tools.
  • Auto-export on success.
  • More descriptive tool spec + clearer prompts.
  • Avoid deadlocks on spawn failure; pending/running handled safely.
  • Progress bar no longer scrolls; stable single-line redraw.
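
The stable single-line redraw can be sketched roughly like this (a hypothetical illustration, not the PR's actual code; function names are assumptions):

```rust
// Hypothetical sketch of stable single-line progress: render the whole line
// as a string, then redraw in place with '\r' instead of printing fresh
// lines that would scroll the terminal.
fn render_progress(done: usize, total: usize, eta_secs: u64) -> String {
    let total = total.max(1); // avoid division by zero on an empty job
    let pct = done * 100 / total;
    format!("[{pct:3}%] {done}/{total} items, ETA {eta_secs}s")
}

fn draw_progress(done: usize, total: usize, eta_secs: u64) {
    // '\r' returns the cursor to column 0 so the next draw overwrites this one.
    eprint!("\r{}", render_progress(done, total, eta_secs));
}
```

Keeping rendering separate from I/O also makes the line format easy to test.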

Tests

  • cd codex-rs && cargo test -p codex-exec
  • cd codex-rs && cargo build -p codex-cli

@github-actions
Contributor

github-actions bot commented Feb 6, 2026

All contributors have signed the CLA ✍️ ✅
Posted by the CLA Assistant Lite bot.

Collaborator

@jif-oai jif-oai left a comment


A few global points:

  • We need more integration tests
  • We don't have a mechanism to recover a sub-agent that crashes. In those cases, the job will stay as "Running" forever I think
  • Implementation is way cleaner than before. Thanks for this
  • Is it on purpose that this is only supported by codex exec for now? I think this might also be relevant for the app-server but it can come in a follow-up

csv_path: String,
instruction: String,
id_column: Option<String>,
job_name: Option<String>,
Collaborator


This does not seem to be used anywhere... so I'm not sure this is interesting

}

fn normalize_concurrency(requested: Option<usize>, max_threads: Option<usize>) -> usize {
let requested = requested.unwrap_or(64).max(1);
Collaborator


  • Can we make the 64 a const somewhere?
  • In a follow-up we should make this configurable
  • 64 is gigantic. Any normal user would instantaneously get rate limited
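
The suggested change might look something like this (a sketch with assumed names; the PR's later commits lowered the default to 16 and made it configurable):

```rust
// Sketch: hoist the magic number into a named const and cap the default low
// enough that an ordinary user doesn't get rate limited out of the box.
const DEFAULT_AGENT_JOB_CONCURRENCY: usize = 16;

fn normalize_concurrency(requested: Option<usize>, max_threads: Option<usize>) -> usize {
    let requested = requested.unwrap_or(DEFAULT_AGENT_JOB_CONCURRENCY).max(1);
    match max_threads {
        // Clamp to the configured thread ceiling, never below 1.
        Some(max) => requested.min(max.max(1)),
        None => requested,
    }
}
```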

ToolSpec::Function(ResponsesApiTool {
name: "report_agent_job_result".to_string(),
description:
"Worker-only tool to report a result for an agent job item. Main agents should not call this."
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description is pretty strange from an agent point of view. If the main agent should not call this, we should just not give access to the tool to the main agent. Should be quite easy to do
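
The fix the reviewer is describing amounts to gating registration on the session role rather than the tool description; a minimal sketch (types and names are illustrative, not the PR's code):

```rust
// Sketch: worker-only tools are simply never registered for the main agent,
// instead of relying on the description to discourage calls.
enum SessionRole {
    Main,
    Worker,
}

fn visible_tools(role: &SessionRole) -> Vec<&'static str> {
    let mut tools = vec!["spawn_agents_on_csv"];
    if matches!(role, SessionRole::Worker) {
        // The main agent never sees this tool at all.
        tools.push("report_agent_job_result");
    }
    tools
}
```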

ToolSpec::Function(ResponsesApiTool {
name: "report_agent_job_result".to_string(),
description:
"Worker-only tool to report a result for an agent job item. Main agents should not call this."
Collaborator


What if the worker does not call this tool? We could just use a structured output if we want something more deterministic

builder.register_handler("close_agent", collab_handler);
}

if config.collab_tools {
Collaborator


I just enabled collab globally so feel free to create a new feature flag if you want

@@ -0,0 +1,2 @@
ALTER TABLE agent_jobs
Collaborator


No need for a dedicated migration... this is not merged yet so just put everything in the same migration
Having tons of migrations just makes things harder to follow IMO

.unwrap_or_else(|| format!("row-{row_index}"));
if !seen_ids.insert(item_id.clone()) {
item_id = format!("{item_id}-{row_index}");
seen_ids.insert(item_id.clone());
Collaborator


this can still violate uniqueness if you have duplicate item_id (unlikely but possible)
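
A collision-proof variant keeps retrying until the candidate is genuinely new, rather than assuming one suffix is enough; a sketch under assumed names:

```rust
use std::collections::HashSet;

// Sketch: loop until insertion actually succeeds, so even "{id}-{row_index}"
// colliding with an existing id cannot violate uniqueness.
fn unique_item_id(seen: &mut HashSet<String>, base: &str, row_index: usize) -> String {
    let mut candidate = if base.is_empty() {
        format!("row-{row_index}")
    } else {
        base.to_string()
    };
    let mut suffix = row_index;
    while !seen.insert(candidate.clone()) {
        // Keep extending until the id is unused.
        candidate = format!("{candidate}-{suffix}");
        suffix += 1;
    }
    candidate
}
```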

let row_object = headers
.iter()
.zip(row.iter())
.map(|(header, value)| (header.clone(), Value::String(value.clone())))
Collaborator


are we 100% sure header is unique? I'm not sure the validation will never leak. A few tests would be nice here
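
The validation being asked for is an up-front duplicate-header check, so the later header-to-value zip can't silently drop a column; a minimal sketch (names assumed):

```rust
use std::collections::HashSet;

// Sketch: reject the CSV before building row objects if two columns share a
// header, since a map keyed by header would keep only one of them.
fn validate_headers(headers: &[String]) -> Result<(), String> {
    let mut seen = HashSet::new();
    for header in headers {
        if !seen.insert(header.as_str()) {
            return Err(format!("duplicate CSV header: {header}"));
        }
    }
    Ok(())
}
```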

job_id = ?
AND item_id = ?
AND status = ?
AND (assigned_thread_id IS NULL OR assigned_thread_id = ?)
Collaborator


Why this? It sounds racy: if report_agent_job_item_result accepts reports while assigned_thread_id is still NULL, any caller can claim an item before the worker thread is set. That can misattribute results in the Running -> set_thread window.

@etraut-openai etraut-openai added the oai PRs contributed by OpenAI employees label Feb 9, 2026
@daveaitel-openai
Contributor Author

Thanks for the thoughtful review, really appreciated. I went through each point and addressed them as follows:

  1. “We need more integration tests.”
    Agreed. I added integration coverage for the agent job flow (spawn, report, export).
    Commit: 904d050
  2. “Job can get stuck running forever if a worker fails.”
    Fixed. We now mark spawn failures as failed immediately (so they don’t linger in Running), and we added a stale‑running reaper so jobs can’t hang indefinitely. The final job result includes a failure summary so it’s obvious which rows failed and why.
    Commits: 79e19fe, b457217
    Follow‑up improvement: the timeout is now configurable per job (max_runtime_seconds) and via config (agents.job_max_runtime_seconds).
    Commits: 8eef34d, 8177ffa
  3. “Exec‑only for now? app‑server later?”
    That’s intentional for this PR. I left app‑server support as a follow‑up to keep scope contained.
  4. “Spawn args struct seems unused.”
    It is used. SpawnAgentsOnCsvArgs is the deserialized input for spawn_agents_on_csv in codex-rs/core/src/tools/handlers/agent_jobs.rs. The handler reads csv_path, instruction, id_column, job_name, output_schema, max_runtime_seconds, etc. These fields directly drive CSV parsing, job metadata, output schema storage, and runtime limits. No code change needed here.
  5. “Make 64 a const; make it configurable; 64 is too high.”
    Done. Extracted a constant, lowered default to 16, and made it configurable.
    Commits: 79e19fe, 96a645e
  6. “report_agent_job_result shouldn’t be visible to main agent.”
    Done. The tool is now worker‑only (gated by session source), so it’s not exposed to the main agent.
    Commit: 9442a57
  7. “What if the worker doesn’t call this tool? Maybe structured output?”
    We decided not to add a fallback. Instead, we document that a missing report is treated as a failure.
    Commit: 7bac3af
  8. “Collab tools enabled globally.”
    No change needed. Existing gating handles the worker‑only tool exposure.
  9. “Rename AgentJobsHandler → BatchJob.”
    Done. Renamed to BatchJobHandler.
    Commit: 8c58fb3
  10. “Progress emitter init time is wrong.”
    Fixed emit timing so “last emit” and “start” aren’t tied together.
    Commit: 79e19fe
  11. “spawn_agents_on_csv needs docs.”
    Added doc comment and improved tool description/semantics.
    Commit: 8c58fb3
  12. “Missing state DB should be fatal.”
    Done. It’s now a fatal error rather than RespondToModel.
    Commit: 984a369
  13. “Migrations: fold 0010 into 0009.”
    Done. Auto‑export folded into the base migration.
    Commit: 174861c
  14. “Duplicate item_id / header uniqueness.”
    Added unique header validation + robust dedupe for item IDs.
    Commit: 79e19fe
  15. “Thread assignment race in report.”
    Fixed the SQL race around assigned_thread_id.
    Commit: 4d2f379
  16. “Tool descriptions unclear.”
    Updated descriptions and clarified reporting semantics.
    Commits: fae51af, 7bac3af

Additional fixes since the review (from local failures while testing)

  • SQLite state DB default for subagents
    Added agents.sqlite_home + CODEX_SQLITE_HOME and default to a temp dir when in workspace‑write mode (otherwise CODEX_HOME). State DB now uses this, so subagents don’t fail with “sqlite state db unavailable” in full‑auto runs.
    Commit: ceac6a0
  • Fix agent_jobs insert error
    The agent_jobs INSERT now has the correct number of values (was 14 for 15 columns).
    Commit: ceac6a0
  • spawn_agents_on_csv export guard
    Ensures output CSV is exported even if the tool doesn’t return an output path.
    Commit: ceac6a0

Collaborator

@jif-oai jif-oai left a comment


Thanks for the prompt work on the previous review. We are close and this is going in a way better direction IMO

pub(crate) const DEFAULT_AGENT_JOB_MAX_RUNTIME_SECONDS: Option<u64> = None;

pub const CONFIG_TOML_FILE: &str = "config.toml";
const SQLITE_HOME_ENV: &str = "CODEX_SQLITE_HOME";
Collaborator


I think this part should live in the codex-state crate but feel free to challenge. No very strong opinion tbh

csv_path: String,
instruction: String,
id_column: Option<String>,
job_name: Option<String>,
Collaborator


My point here was not the whole struct but just job_name. I'm not sure it's relevant to keep if it isn't surfaced anywhere. Would be cool later to be able to resume from it, though.

input_csv_path.with_file_name(format!("{stem}.agent-job-{job_suffix}.csv"))
}

fn parse_csv(content: &str) -> Result<(Vec<String>, Vec<Vec<String>>), String> {
Collaborator


This is pretty cool but I would either:

  • Check if there are any existing crates for this if we don't want to re-invent the wheel
  • If not or nothing that suits our need, I would extract this in a small crate in codex-utils-... as I'm quite sure others will need it one day (+ you can put this in a small self-contained PR)
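
The reason CSV deserves a crate is that even a single record needs quote handling; a stdlib-only sketch of one record's worth of RFC 4180-style parsing (an illustration, not the PR's parse_csv):

```rust
// Sketch: split one CSV record into fields, honoring quoted fields that
// contain commas and "" as an escaped quote. A real crate (e.g. `csv`)
// also handles embedded newlines, BOMs, and streaming.
fn parse_record(line: &str) -> Vec<String> {
    let mut fields = Vec::new();
    let mut field = String::new();
    let mut in_quotes = false;
    let mut chars = line.chars().peekable();
    while let Some(c) = chars.next() {
        match c {
            '"' if in_quotes && chars.peek() == Some(&'"') => {
                chars.next(); // "" inside quotes is an escaped quote
                field.push('"');
            }
            '"' => in_quotes = !in_quotes,
            ',' if !in_quotes => fields.push(std::mem::take(&mut field)),
            _ => field.push(c),
        }
    }
    fields.push(field);
    fields
}
```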

FunctionCallError::RespondToModel(format!("failed to create agent job: {err}"))
})?;

db.mark_agent_job_running(job_id.as_str())
Collaborator


You first mark it as running but then later you have

let options = build_runner_options(&session, &turn, requested_concurrency).await?;

so this means that if build_runner_options fails, the job stays running forever

pub row_json: Value,
}

#[derive(Debug)]
Collaborator


You can just derive FromRow as well (from SQLX) so that you don't need the try_from_row impl

}
}

#[derive(Debug)]
Collaborator


Same comment for the FromRow (and same comment everywhere actually)

pub(crate) instruction: String,
pub(crate) auto_export: i64,
pub(crate) max_runtime_seconds: Option<i64>,
pub(crate) output_schema_json: Option<String>,
Collaborator


This might be a follow-up, but this output_schema_json bugs me a bit. We should just turn it into a JSON Schema (https://json-schema.org/) and use structured output. That will do constrained sampling and ensure the schema is always respected. Ok for me not to do it in this PR, but please add it to a backlog somewhere (you can assign it to me if you don't want to do it).
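
For the demo's {path, score} rows, the structured-output constraint the reviewer is suggesting could look like this hypothetical JSON Schema (an illustration only):

```json
{
  "type": "object",
  "properties": {
    "path": { "type": "string" },
    "score": { "type": "integer" }
  },
  "required": ["path", "score"],
  "additionalProperties": false
}
```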

.await
{
Ok(thread_id) => thread_id,
Err(CodexErr::AgentLimitReached { .. }) => {
Collaborator


This means we're going to loop while waiting for an agent spot to become available, right? That can take a lot of time if the sub-agents are handled somewhere else. I don't have a much better solution though.

.list_agent_job_items(job_id, Some(codex_state::AgentJobItemStatus::Running), None)
.await?;
for item in running_items {
if is_item_stale(&item, runtime_timeout) {
Collaborator


Would be good to also try to kill the job just in case, to make sure we don't increase contention on the max number of running agents.
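
The staleness test itself is simple; a sketch with assumed field names (timestamps in seconds, budget optional):

```rust
// Sketch: an item still marked Running past the job's runtime budget is
// considered stale and should be reaped (and its worker killed, per the
// comment above, so it stops holding an agent slot).
fn is_item_stale(started_at_secs: u64, now_secs: u64, timeout_secs: Option<u64>) -> bool {
    match timeout_secs {
        Some(timeout) => now_secs.saturating_sub(started_at_secs) > timeout,
        None => false, // no runtime budget configured: never reaped
    }
}
```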


if config.agent_jobs_tools {
let agent_jobs_handler = Arc::new(BatchJobHandler);
builder.push_spec(create_spawn_agents_on_csv_tool());
Collaborator


OOC do you expect recursive jobs? Otherwise we could drop these for depth > 0 (the opposite of spawn_agents_on_csv)

Collaborator

@jif-oai jif-oai left a comment


Ok for me with the following comments:

  1. Enjoy the merge with main. Ping me if you want a sanity check after
  2. As follow-ups make sure to keep track of:
    a. Use of a crate for CSV handling or extract in a dedicated crate
    b. Discuss with the TUI and App team to see how can we render this feature
    c. Add the documentation somewhere here https://github.com/openai/developers-website. You can ask @dkundel-openai for help
    d. try to use structured output for the enforcement of the schema
    e. find a solution to limit the looping of AgentLimitReached
    f. as this contains a DB migration, make sure an alpha gets cut and that alpha is used by VSCE and the app

When view_image returns an input_image, also enqueue it as a user message so nested tool calls (like js_repl) make the image available to the next model request. Log a warning if no active turn is present.
If cargo_bin(codex) fails, derive a nearby codex path from current_exe and use it for codex_linux_sandbox_exe. This keeps sandboxed test helpers working across build layouts.
@daveaitel-openai daveaitel-openai enabled auto-merge (squash) February 24, 2026 19:27
Rewrite the fallback for locating the codex binary to satisfy clippy::collapsible_if while preserving the existing behavior.
@daveaitel-openai
Contributor Author

I have read the CLA Document and I hereby sign the CLA

github-actions bot added a commit that referenced this pull request Feb 24, 2026
Thread tool call source through ToolInvocation so view_image only injects pending image input for js_repl calls. Update router/tests/handlers to carry the new field.
Replace eprint! with eprintln! for newline output and collapse the columns guard to satisfy clippy::print_with_newline and clippy::collapsible_if.
Wrap agent job progress stats in a struct and replace the newline-only eprint with eprintln to satisfy clippy::too_many_arguments and clippy::print_with_newline.
@daveaitel-openai
Contributor Author

/merge

@daveaitel-openai daveaitel-openai merged commit dcab401 into main Feb 24, 2026
57 of 61 checks passed
@daveaitel-openai daveaitel-openai deleted the feat/swarmmode-squash branch February 24, 2026 21:00
@github-actions github-actions bot locked and limited conversation to collaborators Feb 24, 2026

Labels

oai PRs contributed by OpenAI employees


3 participants