
Add weight staleness control for fully async rollout #958

Merged
maocheng23 merged 8 commits into main from feat/staleness_control
Apr 9, 2026

Conversation

@maocheng23
Contributor

Summary

  • Surface per-trajectory weight versions from SGLang into Sample and rollout metrics, enabling tracking of which model version generated each trajectory
  • Add --max-weight-staleness argument to discard rollout groups whose oldest weight version is too far behind the current engine version, recycling them back to the data buffer
  • Includes cached /model_info query for engine weight version, group reset logic for retries, and staleness stats logging at rollout completion
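The group-level staleness check described above can be sketched as follows. This is a hedged, self-contained approximation: the minimal `Sample` stand-in and helper names are illustrative (the real `Sample` lives in miles/utils/types.py and carries many more fields).

```python
from dataclasses import dataclass, field

@dataclass
class Sample:
    """Illustrative stand-in for the real miles Sample."""
    weight_versions: list[int] = field(default_factory=list)

    @property
    def oldest_weight_version(self):
        """Minimum weight version across all generation turns, or None."""
        if not self.weight_versions:
            return None
        return min(int(v) for v in self.weight_versions)

def group_oldest_weight_version(group):
    """Group staleness is driven by the oldest version in any trajectory."""
    versions = [s.oldest_weight_version for s in group
                if s.oldest_weight_version is not None]
    return min(versions) if versions else None

def is_stale(group, engine_version, max_weight_staleness):
    """Recycle a group when its oldest version lags the engine too far."""
    if max_weight_staleness is None:
        return False  # filter disabled by default
    oldest = group_oldest_weight_version(group)
    if oldest is None:
        return False
    return engine_version - oldest > max_weight_staleness
```

A stale group is recycled back to the data buffer instead of being sent to training.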

E2E Validation

Both runs completed 30 training steps on Qwen3-4B (4x H200, dapo-math-17k):

  • Baseline (no staleness): wandb group qwen3-4B-baseline-no-staleness
  • With --max-weight-staleness 2: wandb group qwen3-4B-with-staleness-2
  • Wandb project: miles-staleness-test

Test plan

  • E2E fully async training without --max-weight-staleness (baseline)
  • E2E fully async training with --max-weight-staleness 2
  • Verified staleness filter activates (Staleness filter enabled: max_weight_staleness=2)
  • Both runs complete 30 steps with valid grad_norm and metrics

🤖 Generated with Claude Code

…trics

Expose the existing Sample.weight_versions (one entry per generation call /
turn) to the training pipeline and logging, enabling downstream consumers
to reason about on-policy freshness.

- Add Sample.oldest_weight_version property (min version across turns)
- Forward weight_versions in _convert_samples_to_train_data and DP split
- Log weight_version/{min,max,mean,std} and mixed_version_ratio in
  compute_metrics_from_samples
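As a rough sketch of the stats listed above (the function name here is hypothetical; the real logic lives in compute_metrics_from_samples and operates on full Sample objects rather than bare lists):

```python
import statistics

def compute_weight_version_metrics(weight_versions_per_sample):
    """Min/max/mean/std of each trajectory's oldest weight version, plus the
    fraction of trajectories whose turns span more than one version."""
    oldest = [min(vs) for vs in weight_versions_per_sample if vs]
    if not oldest:
        return {}
    mixed = sum(1 for vs in weight_versions_per_sample
                if vs and len(set(vs)) > 1)
    return {
        "weight_version/min": min(oldest),
        "weight_version/max": max(oldest),
        "weight_version/mean": statistics.mean(oldest),
        "weight_version/std": statistics.pstdev(oldest),
        "mixed_version_ratio": mixed / len(oldest),
    }
```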

Made-with: Cursor
In fully async mode, groups may complete long after their generation
started, using weights that are now several versions behind. This adds
a configurable staleness filter that recycles such groups back to the
data buffer instead of sending them to training.

- Add --max-weight-staleness CLI arg (None = disabled by default)
- group_oldest_weight_version helper for group-level staleness
- _CachedWeightVersion: throttled async query to /model_info endpoint
- Staleness filter in generate_rollout_async collection loop
- End-of-rollout staleness summary (recycled count, avg/max staleness)
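The throttled engine-version query can be sketched like this. `fetch` and `clock` are injected here for testability; the real `_CachedWeightVersion` issues an async HTTP GET to the engine's /model_info endpoint, so treat this as an illustrative approximation only.

```python
import time

class CachedWeightVersion:
    """Serve a cached engine weight version, re-fetching at most once per ttl."""

    def __init__(self, fetch, ttl=1.0, clock=time.monotonic):
        self._fetch = fetch
        self._ttl = ttl
        self._clock = clock
        self._value = None
        self._fetched_at = float("-inf")

    def get(self):
        now = self._clock()
        if now - self._fetched_at >= self._ttl:
            try:
                self._value = self._fetch()
                self._fetched_at = now
            except Exception:
                # keep serving the stale cached value on transient errors
                pass
        return self._value
```

Throttling keeps the staleness filter from hammering /model_info once per completed group.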

Made-with: Cursor
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

This pull request introduces a weight staleness filter for fully asynchronous rollouts to ensure training data remains synchronized with the current model version. Key additions include a throttled engine version query mechanism, a sample recycling process for stale or aborted groups, and enhanced metrics logging for weight version statistics. The review feedback highlights the need to reset additional sample state fields during recycling, recommends optimizing HTTP session management to avoid resource exhaustion, and suggests more robust parsing of weight version values to prevent potential runtime errors.

Comment on lines +23 to +39
def reset_group_for_retry(group: list[Sample]) -> list[Sample]:
"""Reset generated outputs so the original prompts can be sampled again."""
for sample in group:
sample.tokens = []
sample.multimodal_train_inputs = None
sample.response = ""
sample.response_length = 0
sample.reward = None
sample.loss_mask = None
sample.weight_versions = []
sample.rollout_log_probs = None
sample.rollout_routed_experts = None
sample.status = Sample.Status.ABORTED
sample.non_generation_time = 0.0
sample.spec_info = Sample.SpecInfo()
sample.prefix_cache_info = Sample.PrefixCacheInfo()
return group
Contributor


high

The reset_group_for_retry function should also reset the remove_sample and train_metadata fields. If remove_sample is not reset to False, a sample that was previously marked for removal by a filter will remain excluded from training even after a successful retry. Similarly, train_metadata should be cleared to avoid carrying over stale metadata from a previous failed or stale generation attempt.

Suggested change: add the following at the end of the per-sample reset, before return group:

sample.remove_sample = False
sample.train_metadata = None

Contributor


This suggestion sounds reasonable ⬆️

Comment on lines +56 to +57
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=2)) as resp:
Contributor


medium

Creating a new aiohttp.ClientSession for every request is an anti-pattern that can lead to socket exhaustion and unnecessary overhead. It is highly recommended to reuse a single session instance. Since this script already depends on miles, you should consider using the shared HTTP utilities from miles.utils.http_utils which are already used in other parts of the codebase (e.g., sglang_rollout.py) and handle session management efficiently. Additionally, ensure that the session lifecycle is managed using a with statement or try...finally block to prevent resource leaks.

References
  1. To prevent resource leaks (e.g., counters that are not decremented), use constructs like try...finally or a with statement to ensure cleanup logic is always executed, even in the case of exceptions or early returns.

Comment thread miles/utils/types.py
"""Minimum weight version across all turns (generation calls) for this trajectory."""
if not self.weight_versions:
return None
return min(int(v) for v in self.weight_versions)
Contributor


medium

The int(v) conversion is potentially fragile if the router returns an unexpected format or if weight_versions contains non-integer strings. Adding a check to ensure the value is a digit before conversion would make the property more robust and prevent the training process from crashing due to transient API issues.

Suggested change
return min(int(v) for v in self.weight_versions)
return min(int(v) for v in self.weight_versions if v is not None and str(v).isdigit())

Collaborator

@fzyzcjy fzyzcjy left a comment


general direction lgtm though not checked details

# don't count as processed for training
continue

# Staleness filter: discard groups whose oldest weight version is too far behind
Collaborator


qq: is it correct to remove the whole group?

Contributor Author


Here I would like to be more conservative; also, this filter is the same as the no_abort filter in the filter_hub, which drops the entire group as well.

And I think we can add a better filter later: e.g. if over 50% of the group is aborted we drop the group, otherwise we can use the partial group in GRPO, or copy some finished samples.
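The follow-up idea in this reply could look roughly like this; the function name and the 50% threshold are illustrative, not part of this PR:

```python
def drop_or_trim_group(group, aborted, max_aborted_frac=0.5):
    """Drop the whole group only when more than max_aborted_frac of its
    samples aborted; otherwise keep the finished samples as a partial group.
    `aborted` is a parallel list of booleans, one per sample."""
    n_aborted = sum(1 for a in aborted if a)
    if n_aborted > max_aborted_frac * len(group):
        return []  # too many aborts: recycle the whole group
    return [s for s, a in zip(group, aborted) if not a]
```

Using a partial group would require the advantage estimator (e.g. GRPO) to tolerate variable group sizes, which is why the PR starts with the conservative whole-group drop.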

Comment thread miles/utils/arguments.py
),
)
parser.add_argument(
"--max-weight-staleness",
Collaborator


qq: this is only for that specific example; shall we put it not in arguments.py but in the example's arguments (like what we do for agentic rollout etc.)?

Contributor Author


Here I am not fully sure, TBH. --max-weight-staleness is a general async-training concept (not example-specific) and could be reused by other async rollout implementations.

At the same time, do you think we should move the staleness filter to a global place, e.g. the filter_hub?


try:
# add back to buffer so it can be retried or handled by buffer policy
group = reset_group_for_retry(group)
data_buffer.add_samples([group])
Contributor


I'm curious what data structure data_buffer is. It's slightly weird that group is already a list of Samples, yet add_samples adds a list of groups to data_buffer. Might be better to add a type annotation for data_buffer in the function declaration?

Contributor Author


This is a bit of legacy naming. If you look at the entire flow, data_buffer is the DataSource structure; specifically, it is a RolloutDataSourceWithBuffer, whose code pointer is here: https://github.com/radixark/miles/blob/feat/staleness_control/miles/rollout/data_source.py#L159

Contributor


I see. Then would it be better to add the type annotation data_buffer: DataSource?

if any_aborted:
try:
# add back to buffer so it can be retried or handled by buffer policy
group = reset_group_for_retry(group)
Contributor


If I understand correctly, a group that is aborted for any reason is reset and sent for retry. I'm worried that this might cause an infinite retry loop: suppose the task of the group is corrupted (e.g. harbor cannot find the Docker image for the task); then the group will fall into this abort -> retry -> abort loop forever.

Contributor Author


Do you think we should add a retry limit for the sample? It would be a little complicated, though, as we don't have a counter on every prompt/buffer entry.
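One way to bound the retries discussed in this thread, as a hedged sketch (all names hypothetical; the real code would need a stable group id to key the counter on):

```python
def recycle_with_budget(group, retry_counts, group_id, max_retries=3):
    """Track how many times a group has been recycled and give up after
    max_retries instead of looping forever on a permanently broken task.
    retry_counts is a plain dict keyed by a stable group id."""
    retry_counts[group_id] = retry_counts.get(group_id, 0) + 1
    if retry_counts[group_id] > max_retries:
        return None  # drop permanently instead of re-queueing
    return group  # caller resets outputs and adds the group back to the buffer
```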

Comment thread miles/utils/types.py
@property
def oldest_weight_version(self) -> int | None:
"""Minimum weight version across all turns (generation calls) for this trajectory."""
if not self.weight_versions:
Contributor


Curious how the weight_versions of a sample is updated. Is it not in this PR?


Contributor

@Shi-Dong Shi-Dong left a comment


LGTM!

return min(versions) if versions else None


def reset_group_for_retry(group: list[Sample]) -> list[Sample]:
Collaborator


Nit: could we move all Sample self-modifying behavior into types.py, or clean it up in a simpler way? I am worried about maintenance.

Comment thread miles/rollout/generate_utils/openai_endpoint_utils.py
Collaborator

@yueming-yuan yueming-yuan left a comment


nit: agree with @guapisolo's comment, maybe move the group reset into the main logic? Other parts LGTM.

@maocheng23 maocheng23 merged commit 41615af into main Apr 9, 2026
18 checks passed
@maocheng23 maocheng23 deleted the feat/staleness_control branch April 9, 2026 05:29
GuanxingLu pushed a commit to GuanxingLu/miles that referenced this pull request Apr 21, 2026
DavidBellamy added a commit to LLM360/miles that referenced this pull request Apr 21, 2026
fzyzcjy added a commit that referenced this pull request May 5, 2026
Note: rollout_ft/0 was already merged into main as PR #897 and deleted
from origin, so the cascade starts from main → rollout_ft/1.

Conflict 1: miles/ray/rollout.py (modify/delete)
  - HEAD (rollout_ft/1) deleted the file as part of a mechanical split:
    commit d3fc26e "mechanically move" splits the original 1298-line
    miles/ray/rollout.py into the directory miles/ray/rollout/{addr_allocator,
    metrics,observability,rollout_manager,rollout_server,router_manager,
    server_group}.py.
  - origin/main modified the file (4 commits since merge-base):
    a772c33 zero_std all_zero_ratio/all_one_ratio metrics (#1034)
    41615af weight staleness control for fully async rollout (#958)
    c198efa consistent hashing routing policy (#891)
    eaa36a2 heartbeat and id to session server (#866)

  Resolution: removed miles/ray/rollout.py (preserve mechanical split)
  and re-applied main's 7 hunks to the new directory files:

  - miles/ray/rollout/router_manager.py:
      + import uuid
      + router_args.policy = args.sglang_router_policy (in start_router)
      + args.session_server_instance_id = uuid.uuid4().hex (in
        start_session_server)
  - miles/ray/rollout/train_data_conversion.py:
      + train_data["weight_versions"] population (after multimodal block)
      + "weight_versions" added to per-DP-split key whitelist
  - miles/ray/rollout/metrics.py:
      + oldest_weight_version statistics + mixed_version_ratio in
        _compute_metrics_from_samples
      + zero_std/all_zero_percentage and all_one_percentage in
        _compute_zero_std_metrics

  Verified: all dependent symbols (Sample.weight_versions,
  Sample.oldest_weight_version, args.sglang_router_policy,
  args.session_server_instance_id) are already present in the merged
  tree from cleanly-merged peer files (miles/utils/types.py,
  miles/backends/sglang_utils/arguments.py, etc.).

  Verified: merge_diff_check.py shows the only "lost" deviations are
  PR #897 skill files (already in main) — no real loss.