Add weight staleness control for fully async rollout#958
Conversation
…trics
Expose the existing Sample.weight_versions (one entry per generation call /
turn) to the training pipeline and logging, enabling downstream consumers
to reason about on-policy freshness.
- Add Sample.oldest_weight_version property (min version across turns)
- Forward weight_versions in _convert_samples_to_train_data and DP split
- Log weight_version/{min,max,mean,std} and mixed_version_ratio in
compute_metrics_from_samples
Made-with: Cursor
In fully async mode, groups may complete long after their generation started, using weights that are now several versions behind. This adds a configurable staleness filter that recycles such groups back to the data buffer instead of sending them to training. - Add --max-weight-staleness CLI arg (None = disabled by default) - group_oldest_weight_version helper for group-level staleness - _CachedWeightVersion: throttled async query to /model_info endpoint - Staleness filter in generate_rollout_async collection loop - End-of-rollout staleness summary (recycled count, avg/max staleness) Made-with: Cursor
There was a problem hiding this comment.
Code Review
This pull request introduces a weight staleness filter for fully asynchronous rollouts to ensure training data remains synchronized with the current model version. Key additions include a throttled engine version query mechanism, a sample recycling process for stale or aborted groups, and enhanced metrics logging for weight version statistics. The review feedback highlights the need to reset additional sample state fields during recycling, recommends optimizing HTTP session management to avoid resource exhaustion, and suggests more robust parsing of weight version values to prevent potential runtime errors.
| def reset_group_for_retry(group: list[Sample]) -> list[Sample]: | ||
| """Reset generated outputs so the original prompts can be sampled again.""" | ||
| for sample in group: | ||
| sample.tokens = [] | ||
| sample.multimodal_train_inputs = None | ||
| sample.response = "" | ||
| sample.response_length = 0 | ||
| sample.reward = None | ||
| sample.loss_mask = None | ||
| sample.weight_versions = [] | ||
| sample.rollout_log_probs = None | ||
| sample.rollout_routed_experts = None | ||
| sample.status = Sample.Status.ABORTED | ||
| sample.non_generation_time = 0.0 | ||
| sample.spec_info = Sample.SpecInfo() | ||
| sample.prefix_cache_info = Sample.PrefixCacheInfo() | ||
| return group |
There was a problem hiding this comment.
The reset_group_for_retry function should also reset the remove_sample and train_metadata fields. If remove_sample is not reset to False, a sample that was previously marked for removal by a filter will remain excluded from training even after a successful retry. Similarly, train_metadata should be cleared to avoid carrying over stale metadata from a previous failed or stale generation attempt.
| def reset_group_for_retry(group: list[Sample]) -> list[Sample]: | |
| """Reset generated outputs so the original prompts can be sampled again.""" | |
| for sample in group: | |
| sample.tokens = [] | |
| sample.multimodal_train_inputs = None | |
| sample.response = "" | |
| sample.response_length = 0 | |
| sample.reward = None | |
| sample.loss_mask = None | |
| sample.weight_versions = [] | |
| sample.rollout_log_probs = None | |
| sample.rollout_routed_experts = None | |
| sample.status = Sample.Status.ABORTED | |
| sample.non_generation_time = 0.0 | |
| sample.spec_info = Sample.SpecInfo() | |
| sample.prefix_cache_info = Sample.PrefixCacheInfo() | |
| return group | |
| def reset_group_for_retry(group: list[Sample]) -> list[Sample]: | |
| """Reset generated outputs so the original prompts can be sampled again.""" | |
| for sample in group: | |
| sample.tokens = [] | |
| sample.multimodal_train_inputs = None | |
| sample.response = "" | |
| sample.response_length = 0 | |
| sample.reward = None | |
| sample.loss_mask = None | |
| sample.weight_versions = [] | |
| sample.rollout_log_probs = None | |
| sample.rollout_routed_experts = None | |
| sample.status = Sample.Status.ABORTED | |
| sample.non_generation_time = 0.0 | |
| sample.spec_info = Sample.SpecInfo() | |
| sample.prefix_cache_info = Sample.PrefixCacheInfo() | |
| sample.remove_sample = False | |
| sample.train_metadata = None | |
| return group |
There was a problem hiding this comment.
This suggestion sounds reasonable ⬆️
| async with aiohttp.ClientSession() as session: | ||
| async with session.get(url, timeout=aiohttp.ClientTimeout(total=2)) as resp: |
There was a problem hiding this comment.
Creating a new aiohttp.ClientSession for every request is an anti-pattern that can lead to socket exhaustion and unnecessary overhead. It is highly recommended to reuse a single session instance. Since this script already depends on miles, you should consider using the shared HTTP utilities from miles.utils.http_utils which are already used in other parts of the codebase (e.g., sglang_rollout.py) and handle session management efficiently. Additionally, ensure that the session lifecycle is managed using a with statement or try...finally block to prevent resource leaks.
References
- To prevent resource leaks (e.g., counters that are not decremented), use constructs like
try...finallyor awithstatement to ensure cleanup logic is always executed, even in the case of exceptions or early returns.
| """Minimum weight version across all turns (generation calls) for this trajectory.""" | ||
| if not self.weight_versions: | ||
| return None | ||
| return min(int(v) for v in self.weight_versions) |
There was a problem hiding this comment.
The int(v) conversion is potentially fragile if the router returns an unexpected format or if weight_versions contains non-integer strings. Adding a check to ensure the value is a digit before conversion would make the property more robust and prevent the training process from crashing due to transient API issues.
| return min(int(v) for v in self.weight_versions) | |
| return min(int(v) for v in self.weight_versions if v is not None and str(v).isdigit()) |
fzyzcjy
left a comment
There was a problem hiding this comment.
general direction lgtm though not checked details
| # don't count as processed for training | ||
| continue | ||
|
|
||
| # Staleness filter: discard groups whose oldest weight version is too far behind |
There was a problem hiding this comment.
qq is it correct tto remove whole group
There was a problem hiding this comment.
Here I would like to be more conversative, also this filter is the same with no_abort filter in the filter_hub, which would drop the entire group as well.
And I think we can add a better filter later, that e.g. if over 50% is aborted, we can drop the group, o.w. we can use partial group in GRPO, or copy some finsihed samples.
| ), | ||
| ) | ||
| parser.add_argument( | ||
| "--max-weight-staleness", |
There was a problem hiding this comment.
qq this is only for that specific example, shall we not put it in arguments.py, but put in example' arguments (like what we do for agentic rollout etc)
There was a problem hiding this comment.
Here I am not fully sure TBH. The --max-weight-staleness is a general async-training concept (not example-specific) and could be reused by other async rollout implementations.
And at the same time, do you think we should move the Staleness filter to a global side? e.g. filter_hub
| def reset_group_for_retry(group: list[Sample]) -> list[Sample]: | ||
| """Reset generated outputs so the original prompts can be sampled again.""" | ||
| for sample in group: | ||
| sample.tokens = [] | ||
| sample.multimodal_train_inputs = None | ||
| sample.response = "" | ||
| sample.response_length = 0 | ||
| sample.reward = None | ||
| sample.loss_mask = None | ||
| sample.weight_versions = [] | ||
| sample.rollout_log_probs = None | ||
| sample.rollout_routed_experts = None | ||
| sample.status = Sample.Status.ABORTED | ||
| sample.non_generation_time = 0.0 | ||
| sample.spec_info = Sample.SpecInfo() | ||
| sample.prefix_cache_info = Sample.PrefixCacheInfo() | ||
| return group |
There was a problem hiding this comment.
This suggestion sounds reasonable ⬆️
| try: | ||
| # add back to buffer so it can be retried or handled by buffer policy | ||
| group = reset_group_for_retry(group) | ||
| data_buffer.add_samples([group]) |
There was a problem hiding this comment.
I'm curious what data structure data_buffer is. It's slightly weird that group is already a list of Samples, yet add_samples adds a list of groups to data_buffer. Might be better to add a type annotation for data_buffer in the function declaration?
There was a problem hiding this comment.
This is a little legacy naming:
If you look into the entire flow, data_buffer is the DataSource structure, and its a RolloutDataSourceWithBuffer, whose code pointer is here: https://github.com/radixark/miles/blob/feat/staleness_control/miles/rollout/data_source.py#L159
There was a problem hiding this comment.
I see. Then would it be better to add the type annotation data_buffer: DataSource?
| if any_aborted: | ||
| try: | ||
| # add back to buffer so it can be retried or handled by buffer policy | ||
| group = reset_group_for_retry(group) |
There was a problem hiding this comment.
Here if I understand correctly, if a group is aborted for any reason, the group is reset and sent for retry. I'm worried that this might give rise to a deadloop: suppose that the task of the group is corrupted (e.g. harbor cannot find the docker image of the task), then the group will fall into this abort -> retry -> abort loop forever.
There was a problem hiding this comment.
Do you think we should add a retry for the sample? But it would be a little complicated (as we don't have counter on every prompt/buffer)
| @property | ||
| def oldest_weight_version(self) -> int | None: | ||
| """Minimum weight version across all turns (generation calls) for this trajectory.""" | ||
| if not self.weight_versions: |
There was a problem hiding this comment.
Curious how the weight_versions of a sample is updated. Is it not in this PR?
There was a problem hiding this comment.
| return min(versions) if versions else None | ||
|
|
||
|
|
||
| def reset_group_for_retry(group: list[Sample]) -> list[Sample]: |
There was a problem hiding this comment.
Nit: could we put all sample self-modify behavior to types.py? Or clear it by an easier way. I am worried about maintance.
yueming-yuan
left a comment
There was a problem hiding this comment.
nit: agree with @guapisolo's comment, may move group reset to main logic? other parts lgtm
…region clusters (#10) * Revert "[BUGFIX] [P2PRDMA] Add rollout post-processing after P2PRDMA weight updates" (radixark#882) * [Fix] fix ci (radixark#894) * Avoid threading for ray getting object (radixark#886) * Add explicit errors for unsupported Megatron profiles (radixark#887) * Add nvfp4 quantizer files (radixark#907) * Bump flash-linear-attention version to 0.4.2 (radixark#892) * [BUGFIX] Invoke "post_process_quantization" by default after weight updating (radixark#890) Co-authored-by: Yueming Yuan <yym022502@gmail.com> * Add heartbeat and id to session server (radixark#866) * fix: adding thin glm5 image to docker build + latest tag sync (radixark#871) * Add consistent hashing routing policy for rollout (radixark#891) Co-authored-by: Yueming Yuan <yueming@Mac.attlocal.net> * [example] add retool v2 example with multi-turn framework interfaces (radixark#654) Co-authored-by: GuanxingLu <gxlu02@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Expose rollout-batch-size, n-samples-per-prompt, global-batch-size as CLI args in swe-agent-v2 (radixark#954) Co-authored-by: Shi Dong <shi.dong@radixark.ai> * chore: remove obsolete swe-agent server.py and run-qwen3.sh (radixark#952) Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * Add weight staleness control for fully async rollout (radixark#958) * Fix/pause generation mode (radixark#924) Co-authored-by: Yueming Yuan <yym022502@gmail.com> * [v0.5.10][1] Bump sglang to v0.5.10 (radixark#898) * [v0.5.10][2] Fix apply_chat_template behavior for transformers >=5.0 (radixark#926) Co-authored-by: guapisolo <guapisolo@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * [v0.5.10][3] Fix processor return_tensors duplicate kwarg for transformers >=5.0 (radixark#927) Co-authored-by: guapisolo <guapisolo@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * [v0.5.10][4] Fix _no_split_modules set not subscriptable in transformers >=5.0 (radixark#931) * [v0.5.10][5] Disable piecewise cuda graph to avoid NVLS oom (radixark#935) * [v0.5.10][6][FSDP] fix outdated weight update logic in FSDP (radixark#948) Co-authored-by: guapisolo <guapisolo@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: maocheng23 <35615230+maocheng23@users.noreply.github.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> * [v0.5.10][7][FSDP] move FSDP to experimental and disable by default (radixark#961) * Add skiplist and more robust calculation on val (radixark#965) * [fix] tiny fix debug rollout only in weight version check (radixark#967) * feat: real cp support with relayout fix for qwen3.5 train/rollout mismatch (radixark#885) * [AMD] Upgrade to sglv0.5.10 (radixark#973) * switch model to actor (radixark#756) * [fix] support general logic to bypass fp32 downcast and fix qwen35 A_log dtype (radixark#975) Co-authored-by: yueming-yuan <yym022502@gmail.com> * fix: populate prefix_cache_info in OpenAI/session rollout path (radixark#960) * Remove prepare_harbor_tasks.py; use harbor-private adapters (radixark#982) * [fix] Skip flush_cache in in_place mode and add fully async example (radixark#974) Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * GLM47 full cmd for async and sync reasoning (radixark#986) Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> * fix: handle non-tool appended messages in TITO incremental tokenization (radixark#949) Co-authored-by: Yanbin Jiang <jybsuper@gmail.com> * [docker] Add sgl-model-gateway install and download .tar.gz assets (radixark#895) * [ci] fix hf rate limit error by caching tokenizer loading (radixark#1014) Co-authored-by: maocheng23 <35615230+maocheng23@users.noreply.github.com> * Use load_generate_function in legacy sglang_rollout path (radixark#1016) * Update CODEOWNERS to add new reviewers (radixark#1021) * Support moe lora for gpt-oss (radixark#798) Co-authored-by: Ethan (Yusheng) Su <yushengsu.thu@gmail.com> * [fix] restore expert_bias to fp32 before bridge weight export (radixark#811) * [chore] drop legacy transformers upgrade pin for glm47-flash and qwen35 (radixark#1018) * [fix] Enforce param dtype before wrap ddp (radixark#992) Co-authored-by: Zhichen Zeng <zczeng@uw.edu> * [upgrade] update Megatron-Bridge source and LoRA CI to megatron e2e tests and (radixark#1023) * [CI] Drop --use-miles-router from R3 tests and add r3 comparasion test between sgl & miles router (radixark#1015) * wandb: raise init_timeout, add retry wrapper, fix shared-mode init for cross-region clusters In online + shared mode, both `init_wandb_primary` and `init_wandb_secondary` make HTTPS round-trips to wandb cloud (login + run create/attach). On high-latency cross-region clusters (e.g. Abu Dhabi MBZUAI ↔ wandb-cloud US-West) with concurrent actor bursts, a single round-trip can exceed the wandb SDK's 90s default `init_timeout` — tearing down the whole run with a silent handshake abort. Observed on RL360 job 1564420, which forced `WANDB_MODE=offline` as a global default ever since (see https://github.com/LLM360/RL360/issues/87). The issue's original diagnosis assumed a local primary↔secondary socket handshake race. That's not how shared mode works — per wandb's own feature PR (wandb/wandb#6882), each writer spawns an independent wandb-core that talks to the cloud directly; aggregation is server-side by run_id. No local socket exists. The failure mode is pure network/latency, not a local readiness race. Changes ------- - Bump `init_timeout` to 300s for primary and secondary Settings. Configurable via `WANDB_INIT_TIMEOUT_SECS` env var for tuning. - Wrap both init paths in a bounded exponential-backoff retry (`_wandb_init_with_retry`) that re-attempts on wandb.errors.CommError and wandb.errors.UsageError. 3 attempts with 5→10→20s backoff by default, tunable via `WANDB_INIT_RETRY_ATTEMPTS` / `WANDB_INIT_RETRY_BACKOFF_SECS`. - Add `x_label` tagging per wandb distributed-training docs: primary gets `rank_<rank>_primary`, secondaries get `rank_<rank>_secondary`. Enables per-rank console-log filtering in the wandb UI. - Drop `reinit=True` from secondary init_kwargs. Shared mode natively supports concurrent writers on a single run; `reinit=True` triggered stale-state warnings on secondary actors without functional benefit. Followups this change enables ----------------------------- - `WANDB_MODE=offline` can be removed from scale.yaml's extra_env default once a pilot run confirms online mode boots cleanly. - The tmux-based `~/bin/wandb-sync-rl360.sh` workaround on David's M2 account becomes obsolete (no more offline-only default). - Near-realtime wandb dashboards replace the ~2-minute-lag offline sync; per-rank system metrics via x_label filtering. --------- Co-authored-by: JD <jaedon.guo@gmail.com> Co-authored-by: Ethan (Yusheng) Su <yushengsu.thu@gmail.com> Co-authored-by: fzyzcjy <5236035+fzyzcjy@users.noreply.github.com> Co-authored-by: Ziang Li <ziangli@umich.edu> Co-authored-by: Zhichen Zeng <zczeng@uw.edu> Co-authored-by: JensenFire <xinji1@microsoft.com> Co-authored-by: Yueming Yuan <yym022502@gmail.com> Co-authored-by: maocheng23 <35615230+maocheng23@users.noreply.github.com> Co-authored-by: Douglas Yang <douglasyang88@gmail.com> Co-authored-by: Yueming Yuan <yueming@Mac.attlocal.net> Co-authored-by: Huapeng Zhou <73010314+PopSoda2002@users.noreply.github.com> Co-authored-by: GuanxingLu <gxlu02@gmail.com> Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com> Co-authored-by: Shi-Dong <Shi-Dong@users.noreply.github.com> Co-authored-by: Shi Dong <shi.dong@radixark.ai> Co-authored-by: Jiajun Li <48857426+guapisolo@users.noreply.github.com> Co-authored-by: guapisolo <guapisolo@gmail.com> Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> Co-authored-by: Yuzhen Zhou <82826991+zyzshishui@users.noreply.github.com> Co-authored-by: Yanbin Jiang <jybsuper@gmail.com> Co-authored-by: Ying Sheng <sqy1415@gmail.com> Co-authored-by: Yisheng Gong <yishenggong9437@gmail.com>
Note: rollout_ft/0 was already merged into main as PR #897 and deleted from origin, so the cascade starts from main → rollout_ft/1. Conflict 1: miles/ray/rollout.py (modify/delete) - HEAD (rollout_ft/1) deleted the file as part of a mechanical split: commit d3fc26e "mechanically move" splits the original 1298-line miles/ray/rollout.py into the directory miles/ray/rollout/{addr_allocator, metrics,observability,rollout_manager,rollout_server,router_manager, server_group}.py. - origin/main modified the file (4 commits since merge-base): a772c33 zero_std all_zero_ratio/all_one_ratio metrics (#1034) 41615af weight staleness control for fully async rollout (#958) c198efa consistent hashing routing policy (#891) eaa36a2 heartbeat and id to session server (#866) Resolution: removed miles/ray/rollout.py (preserve mechanical split) and re-applied main's 7 hunks to the new directory files: - miles/ray/rollout/router_manager.py: + import uuid + router_args.policy = args.sglang_router_policy (in start_router) + args.session_server_instance_id = uuid.uuid4().hex (in start_session_server) - miles/ray/rollout/train_data_conversion.py: + train_data["weight_versions"] population (after multimodal block) + "weight_versions" added to per-DP-split key whitelist - miles/ray/rollout/metrics.py: + oldest_weight_version statistics + mixed_version_ratio in _compute_metrics_from_samples + zero_std/all_zero_percentage and all_one_percentage in _compute_zero_std_metrics Verified: all dependent symbols (Sample.weight_versions, Sample.oldest_weight_version, args.sglang_router_policy, args.session_server_instance_id) are already present in the merged tree from cleanly-merged peer files (miles/utils/types.py, miles/backends/sglang_utils/arguments.py, etc.). Verified: merge_diff_check.py shows the only "lost" deviations are PR #897 skill files (already in main) — no real loss.
Summary
Sampleand rollout metrics, enabling tracking of which model version generated each trajectory--max-weight-stalenessargument to discard rollout groups whose oldest weight version is too far behind the current engine version, recycling them back to the data buffer/model_infoquery for engine weight version, group reset logic for retries, and staleness stats logging at rollout completionE2E Validation
Both runs completed 30 training steps on Qwen3-4B (4x H200, dapo-math-17k):
qwen3-4B-baseline-no-staleness--max-weight-staleness 2: wandb groupqwen3-4B-with-staleness-2miles-staleness-testTest plan
--max-weight-staleness(baseline)--max-weight-staleness 2Staleness filter enabled: max_weight_staleness=2)🤖 Generated with Claude Code