🔌 Asynchronous GRPO #5293
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 495f9676da
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: d8c2908d81
@codex review
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: c22c8fa9dd
            f"[generate] all {self.max_inflight_tasks} slots busy, "
            f"pending_groups={len(pending_groups)}, waiting for completions..."
        )
        continue
Generate loop never exits on stop with stuck tasks
Low Severity
When stop_event is set but inflight tasks are stuck in _generate_one_turn's infinite retry loop (e.g., vLLM server permanently down), _generate_loop's outer while True never exits. It repeatedly calls asyncio.wait with a 0.1s timeout, getting empty done sets, and loops back without ever checking stop_event on that path. The finally block that cancels tasks is never reached, causing the worker thread to hang.
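One way to address this is to re-check the stop flag on every wake-up of the polling loop. The following is a minimal sketch, not the PR's code: `generate_loop`, `inflight`, and the `stuck` coroutine are hypothetical stand-ins, and `stop_event` is assumed to be a `threading.Event`.

```python
import asyncio
import threading


async def generate_loop(stop_event: threading.Event, inflight: set) -> str:
    """Poll in-flight tasks, re-checking stop_event on every wake-up."""
    try:
        while True:
            if stop_event.is_set():
                return "stopped"
            if not inflight:
                return "drained"
            done, _ = await asyncio.wait(
                inflight, timeout=0.1, return_when=asyncio.FIRST_COMPLETED
            )
            # An empty `done` set means the timeout fired; looping back
            # re-evaluates stop_event instead of waiting forever.
            inflight.difference_update(done)
    finally:
        # Cancel whatever is still running so the worker thread can exit.
        for task in inflight:
            task.cancel()
        await asyncio.gather(*inflight, return_exceptions=True)


async def demo() -> str:
    stop_event = threading.Event()

    async def stuck():
        # Stands in for a retry loop against a server that never comes back.
        while True:
            await asyncio.sleep(0.05)

    inflight = {asyncio.create_task(stuck())}
    asyncio.get_running_loop().call_later(0.2, stop_event.set)
    return await generate_loop(stop_event, inflight)
```

With this shape the `finally` block is always reached once `stop_event` is set, even if no task ever completes.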
    while True:
        t0 = time.time()
        qsize = self.queue.qsize()
        if qsize == 0:
This might be redundant with queue.get(timeout=self.timeout). On timeout, it raises queue.Empty, which we already catch correctly.
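To illustrate the point, a blocking `get` with a timeout already covers the empty-queue case without a separate `qsize()` poll. This is only a sketch using the standard library `queue` module; `next_sample` is a hypothetical helper, not a function from the PR.

```python
import queue


def next_sample(q: queue.Queue, timeout: float):
    """Return the next item, or None if nothing arrives within `timeout` seconds."""
    try:
        # Blocks until an item is available, so no qsize() check is needed first.
        return q.get(timeout=timeout)
    except queue.Empty:
        # Raised by Queue.get when the timeout expires with no item available.
        return None
```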
            "completions."
        },
    )
    max_tool_calling_iterations: int | None = field(
Beautiful, definitely need this!
Nit: maybe max_turns, to be explicit? In the future we may have chat-like rollouts that don't need to call tools.
    free_slots.add(slot)
    logger.debug(f"[slot] freed slot={slot} group={group_id} free_after={len(free_slots)}")
    if task.exception() is not None:
        raise task.exception()
I think we shouldn't raise in the main loop, as it can stop worker_thread silently. But there is a discussion to have here.
In the open PR #5299, I opted for dropping the whole group with a warning.
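The drop-with-a-warning alternative could look roughly like the sketch below. It is not the code from #5299: `collect_group` and the demo coroutines are hypothetical, and the tasks are assumed to be finished when the helper is called.

```python
import asyncio
import logging
from typing import Optional

logger = logging.getLogger(__name__)


def collect_group(group_id: int, tasks: list) -> Optional[list]:
    """Return the results of finished tasks, or None if any task in the group failed."""
    for task in tasks:
        if task.cancelled():
            logger.warning("group %s dropped: a task was cancelled", group_id)
            return None
        exc = task.exception()
        if exc is not None:
            # Log and drop instead of raising, so the worker loop keeps running.
            logger.warning("group %s dropped: %r", group_id, exc)
            return None
    return [task.result() for task in tasks]


async def demo():
    async def ok():
        return 1

    async def boom():
        raise RuntimeError("generation failed")

    good = [asyncio.create_task(ok()), asyncio.create_task(ok())]
    bad = [asyncio.create_task(ok()), asyncio.create_task(boom())]
    await asyncio.wait(good + bad)
    return collect_group(0, good), collect_group(1, bad)
```

The trade-off is that a failing group costs some samples but never takes the worker thread down.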
    await self._groups_to_score.put(group)
    while True:
        try:
            self._groups_to_score.put_nowait(group)
There is no need for a sync function here.
The while True loop is equivalent to an await point: if the queue is full, the scheduler suspends the current task and switches to another coroutine.
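For reference, here is a small self-contained sketch of that behavior with a bounded `asyncio.Queue`. The producer, consumer, and group names are illustrative only; the `None` sentinel mirrors the one used at shutdown in the PR.

```python
import asyncio


async def demo() -> list:
    groups_to_score = asyncio.Queue(maxsize=1)
    scored = []

    async def producer():
        for group in ("g0", "g1", "g2"):
            # Suspends here while the queue is full; other coroutines keep running.
            await groups_to_score.put(group)
        await groups_to_score.put(None)  # sentinel: no more groups

    async def consumer():
        # A plain awaited get() yields to the event loop until an item arrives.
        while (group := await groups_to_score.get()) is not None:
            scored.append(group)

    await asyncio.gather(producer(), consumer())
    return scored
```

No retry loop or sleep is needed: backpressure comes from the awaited `put` itself.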
    # Use put_nowait: if the queue is full at shutdown, skip the sentinel -
    # _score_loop will exit via stop_event check in its outer loop.
    try:
        self._groups_to_score.put_nowait(None)
Same as the previous self._groups_to_score.put_nowait(group) comment. The async queue mechanism with an await point is correct and more efficient.
    while not stop_event.is_set():
        t_wait = time.monotonic()
        try:
            group = await asyncio.wait_for(self._groups_to_score.get(), timeout=0.5)
I don't think we need asyncio.wait_for with a TimeoutError around .get(). We are running in a while True loop, so the plain await point is the correct implementation.
    rollout_queue=self.rollout_queue,
    model_version_fn=lambda: self.model_version,
    max_staleness=self.args.max_staleness,
    timeout=self.args.vllm_server_timeout,
Server timeout reused as queue timeout causes premature stop
Medium Severity
vllm_server_timeout is reused as the RolloutQueueDataset queue poll timeout. This conflates two separate concerns: how long to wait for the vLLM server to start and how long to wait for rollout samples during training. A user who sets a short vllm_server_timeout (because their server is already running) risks premature epoch termination whenever the queue is temporarily empty, such as during weight sync pauses.
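A possible shape for separating the two concerns is sketched below. Only `vllm_server_timeout` appears in the PR; the `TimeoutConfig` class, the `rollout_queue_timeout` field, and both default values are hypothetical placeholders.

```python
from dataclasses import dataclass, field


@dataclass
class TimeoutConfig:
    # Hypothetical container used only for illustration.
    vllm_server_timeout: float = field(
        default=240.0,  # placeholder default, not taken from the PR
        metadata={"help": "Seconds to wait for the vLLM server to come up."},
    )
    rollout_queue_timeout: float = field(
        default=600.0,  # placeholder default, not taken from the PR
        metadata={"help": "Seconds to wait for a rollout sample before ending the epoch."},
    )
```

With two fields, a short server timeout no longer shortens the wait for rollout samples during weight sync pauses.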
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 3 total unresolved issues (including 2 from previous reviews).
        }
        if metrics_list and metrics_list[0]
        else {}
    )
Metrics collator drops keys missing from first sample
Low Severity
DataCollatorForRollout.torch_call builds the batch metrics dict using only keys from metrics_list[0]. When a batch mixes samples from tool-calling groups (which include tools/call_frequency and tools/failure_frequency keys) and non-tool-calling groups (which lack those keys), any metric key absent from the first sample is silently dropped from the entire batch, leading to inaccurate logged metrics.
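One way to avoid the dropped keys is to take the union of keys across all samples. The sketch below assumes metrics are plain numbers that should be averaged over the samples that report them; the actual reduction in `DataCollatorForRollout` may differ, and `collate_metrics` is a hypothetical helper.

```python
def collate_metrics(metrics_list: list) -> dict:
    """Average each metric over the samples that actually report it."""
    keys = []
    for metrics in metrics_list:
        for key in metrics:
            if key not in keys:
                keys.append(key)  # union of keys, in first-seen order
    return {
        key: sum(m[key] for m in metrics_list if key in m)
        / sum(1 for m in metrics_list if key in m)
        for key in keys
    }
```

For a batch where only the second sample has tool metrics, `tools/call_frequency` is kept and averaged over that one sample instead of being discarded.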


Add AsyncGRPOTrainer
Note
This is a first MVP! It doesn't have to be perfect. The goal is to have an initial implementation to build on so we can make improvements in subsequent pull requests:
GRPOTrainer)
Adds an async variant of GRPO that decouples rollout generation from training. A background worker continuously streams completions from a vLLM server while the training loop consumes them, so generation and gradient updates overlap instead of alternating.
Architecture
- Every weight_sync_steps steps, updated weights are transferred to vLLM via NCCL.
- max_staleness discards samples generated by an outdated policy.
What's included
- AsyncGRPOConfig / AsyncGRPOTrainer under trl.experimental.async_grpo
- AsyncRolloutWorker with async generation, scoring, and NCCL weight transfer
- max_tool_calling_iterations guard
- vLLM support up to 0.17.1
Note
Medium Risk
Adds a new asynchronous training path with background rollout generation and NCCL weight transfer to a vLLM server, introducing concurrency and distributed synchronization behavior that may be fragile. Also expands vLLM dependency/version handling, which can affect environments that use trl[vllm].
Overview
Introduces trl.experimental.async_grpo with AsyncGRPOConfig, AsyncGRPOTrainer, and an AsyncRolloutWorker that streams rollouts from an external vLLM server while training consumes a queue, periodically syncing updated weights back to vLLM via NCCL and discarding stale samples.
Adds an example script and unit tests (with a stub rollout worker) plus documentation wired into the experimental docs to describe setup and constraints.
Updates vLLM integration to support up to 0.17.1, adds aiohttp to the vllm extra, extends is_vllm_available() with an optional min_version, and ignores uv.lock in .gitignore.
Written by Cursor Bugbot for commit 00c802b. This will update automatically on new commits.
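The staleness filter and periodic weight sync described above can be pictured with the following sketch. It is an illustration under stated assumptions, not the trainer's implementation: the `Rollout` class and helper names are hypothetical, staleness is assumed to be the gap between the current and generating policy versions, and the sync is assumed to fire on every multiple of weight_sync_steps.

```python
from dataclasses import dataclass


@dataclass
class Rollout:
    completion: str
    model_version: int  # policy version that produced this sample


def is_stale(sample: Rollout, current_version: int, max_staleness: int) -> bool:
    # Assumption: a sample is stale when the version gap exceeds max_staleness.
    return current_version - sample.model_version > max_staleness


def filter_fresh(samples: list, current_version: int, max_staleness: int) -> list:
    """Keep only samples generated by a recent enough policy."""
    return [s for s in samples if not is_stale(s, current_version, max_staleness)]


def should_sync_weights(step: int, weight_sync_steps: int) -> bool:
    # Assumption: steps are counted from 1 and sync happens on each multiple.
    return step > 0 and step % weight_sync_steps == 0
```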