Bridging perf from NeMo2 to Mbridge for certain configs#2199
Conversation
Signed-off-by: Gautham Kollu <gkollu@nvidia.com>
Signed-off-by: Gautham Kollu <gkollu@nvidia.com>
📝 WalkthroughWalkthroughThe changes introduce P2P communication support for training, add multi-model chunk data iterator handling, refactor logging and loss scaling logic, simplify data loader initialization, and add early-exit guards to fault-tolerance and profiling modules. A new numeric checks configuration option is also added. Changes
Sequence Diagram(s)sequenceDiagram
participant Train as Training Loop
participant P2P as P2PCommunicator
participant FB as forward_backward_func
participant MI as Model Iterators
participant Logger as Logger
Train->>P2P: Create P2PCommunicator(pp_group, config)
Train->>Train: Prepare forward_backward_data_iterator
alt Multi-model setup
Train->>MI: make_data_iterator_list(iterator)
MI->>Train: List of iterators (one per chunk)
end
Train->>FB: Call with p2p_communicator & data iterators
FB->>P2P: Use P2P for peer-to-peer communication
FB->>Train: Return loss & metrics
alt Logging enabled
Train->>Logger: Extract learning_rate from optimizer
Train->>Logger: Log loss_scale, learning_rate, params_norm
Logger->>Train: Logging complete
end
Estimated code review effort🎯 4 (Complex) | ⏱️ ~50 minutes 🚥 Pre-merge checks | ✅ 2 | ❌ 2❌ Failed checks (1 warning, 1 inconclusive)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing touches
🧪 Generate unit tests (beta)
Important Action Needed: IP Allowlist UpdateIf your organization protects your Git platform with IP whitelisting, please add the new CodeRabbit IP address to your allowlist:
Reviews will stop working after February 8, 2026 if the new IP is not added to your allowlist. Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
|
/ok to test d694715 |
There was a problem hiding this comment.
Actionable comments posted: 5
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
src/megatron/bridge/data/loaders.py (1)
360-382:⚠️ Potential issue | 🟡 MinorUpdate
setup_data_iteratorsdocstring to match single-pass behavior.The function no longer builds per-virtual-stage iterators, but the docstring still says it does. Please align the docstring (and return description) with the new behavior to avoid confusion.
📝 Suggested docstring update
- Calls `build_train_valid_test_data_iterators` potentially multiple times - if virtual pipeline parallelism is used, creating separate iterators for each - virtual stage. + Builds train/valid/test iterators in a single call and returns them directly. + Virtual pipeline parallelism is handled downstream by iterator wrapping. @@ - Each element can be a single iterator or a list of iterators if virtual - pipeline parallelism is enabled. + Each element is a single iterator; downstream code may wrap it for virtual + pipeline parallelism when needed.src/megatron/bridge/training/train.py (1)
713-721:⚠️ Potential issue | 🟠 MajorSkip
make_data_iterator_listwhen the iterator is already a list.External dataloaders may already provide a list of iterators for VPP. Wrapping a list will cause
make_data_iterator_listto callnext()on a list and fail.🐛 Suggested fix
- if len(model) > 1: + if len(model) > 1 and not isinstance(forward_backward_data_iterator, list): # As MLM, expects a list of iterators for virtual pipeline parallelism. One iterator per model chunk. forward_backward_data_iterator = make_data_iterator_list( model=model, data_iterator=forward_backward_data_iterator, )
🤖 Fix all issues with AI agents
In `@src/megatron/bridge/training/eval.py`:
- Around line 137-150: The guard around make_data_iterator_list should avoid
re-wrapping pre-built lists of iterators; change the branch that currently does
make_data_iterator_list(model=model, data_iterator=eval_data_iterator) to only
call make_data_iterator_list when eval_data_iterator is not already a sequence
of iterators (e.g., check isinstance(eval_data_iterator, (list, tuple)) or
similar) so that prepare_finetuning_batch’s returned eval_data_iterator is left
untouched when it is a list/tuple; reference prepare_finetuning_batch,
eval_data_iterator, make_data_iterator_list, and model when applying this
conditional check.
In `@src/megatron/bridge/training/train.py`:
- Line 221: Remove the commented-out debug toggles to eliminate dead code:
delete the lines containing the commented variable name
should_toggle_forward_pre_hook (and any duplicate/commented occurrences around
the other reported location) unless you intentionally want to keep them; if so,
replace the bare commented lines with a short rationale comment explaining
when/how to enable this debug toggle and why it is preserved (e.g., "kept for
temporary debugging of forward pre-hook behavior; remove before merge"). Ensure
you update any nearby comments in the same function or module (train.py) to
reflect the change.
- Around line 1166-1167: The early return guarded by "if not
state.cfg.checkpoint.save: return False" prevents the exit-signal, duration,
interval, and straggler checks from running; instead of returning immediately
when state.cfg.checkpoint.save is false, skip only the checkpoint save action
and still run the existing exit-condition checks (exit-signal, duration,
interval, straggler) before returning; locate the block that references
state.cfg.checkpoint.save in train.py and refactor so checkpoint saving is
conditional but the exit-checking logic (the functions/conditions that evaluate
exit signals, duration, interval and stragglers) always executes regardless of
checkpoint.save.
- Around line 770-778: The check for update_successful must always be
synchronized across model-parallel ranks: move the call to
logical_and_across_model_parallel_group(update_successful,
mp_group=pg_collection.mp) out of the train_config.numeric_checks conditional so
update_successful is reduced unconditionally; leave the grad_norm and
num_zeros_in_grad reductions (reduce_max_stat_across_model_parallel_group)
inside the if train_config.numeric_checks block so those stats are only gathered
when numeric checks are enabled.
- Around line 470-506: The current outer check uses
config.logger.tensorboard_dir to gate all logging, which unintentionally
disables WandB/MLflow/console logging; remove or loosen that gate so
training_log(...) is invoked regardless of tensorboard_dir (or replace the
condition with a check that any sink is enabled, e.g.,
config.logger.use_tensorboard or config.logger.use_wandb or
config.logger.use_mlflow), leaving the inner logic that computes loss_scale,
params_norm, learning_rate, etc., intact; keep the call to training_log(...) so
it can decide which sinks actually log.
| eval_data_iterator, seq_length = prepare_finetuning_batch( | ||
| data_iterator=data_iterator, | ||
| num_microbatches=eval_num_microbatches, | ||
| default_seq_length=state.cfg.model.seq_length, | ||
| seq_key="tokens", | ||
| ) | ||
|
|
||
| if len(model) > 1: | ||
| # Convert to list of iterators for virtual pipeline parallelism | ||
| # With virtual PP, each model chunk needs independent access to the same microbatch | ||
| eval_data_iterator = make_data_iterator_list( | ||
| model=model, | ||
| data_iterator=eval_microbatch_iterator, | ||
| data_iterator=eval_data_iterator, | ||
| ) |
There was a problem hiding this comment.
Avoid wrapping pre-built iterator lists in make_data_iterator_list.
When data_iterator is already a list (external dataloaders or pre-wrapped VPP), make_data_iterator_list will treat the list as an iterator and next() will fail. Guard the wrapping to avoid a TypeError.
🐛 Suggested fix
- if len(model) > 1:
+ if len(model) > 1 and not isinstance(eval_data_iterator, list):
# Convert to list of iterators for virtual pipeline parallelism
# With virtual PP, each model chunk needs independent access to the same microbatch
eval_data_iterator = make_data_iterator_list(
model=model,
data_iterator=eval_data_iterator,
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| eval_data_iterator, seq_length = prepare_finetuning_batch( | |
| data_iterator=data_iterator, | |
| num_microbatches=eval_num_microbatches, | |
| default_seq_length=state.cfg.model.seq_length, | |
| seq_key="tokens", | |
| ) | |
| if len(model) > 1: | |
| # Convert to list of iterators for virtual pipeline parallelism | |
| # With virtual PP, each model chunk needs independent access to the same microbatch | |
| eval_data_iterator = make_data_iterator_list( | |
| model=model, | |
| data_iterator=eval_microbatch_iterator, | |
| data_iterator=eval_data_iterator, | |
| ) | |
| eval_data_iterator, seq_length = prepare_finetuning_batch( | |
| data_iterator=data_iterator, | |
| num_microbatches=eval_num_microbatches, | |
| default_seq_length=state.cfg.model.seq_length, | |
| seq_key="tokens", | |
| ) | |
| if len(model) > 1 and not isinstance(eval_data_iterator, list): | |
| # Convert to list of iterators for virtual pipeline parallelism | |
| # With virtual PP, each model chunk needs independent access to the same microbatch | |
| eval_data_iterator = make_data_iterator_list( | |
| model=model, | |
| data_iterator=eval_data_iterator, | |
| ) |
🤖 Prompt for AI Agents
In `@src/megatron/bridge/training/eval.py` around lines 137 - 150, The guard
around make_data_iterator_list should avoid re-wrapping pre-built lists of
iterators; change the branch that currently does
make_data_iterator_list(model=model, data_iterator=eval_data_iterator) to only
call make_data_iterator_list when eval_data_iterator is not already a sequence
of iterators (e.g., check isinstance(eval_data_iterator, (list, tuple)) or
similar) so that prepare_finetuning_batch’s returned eval_data_iterator is left
untouched when it is a list/tuple; reference prepare_finetuning_batch,
eval_data_iterator, make_data_iterator_list, and model when applying this
conditional check.
| config.optimizer.use_distributed_optimizer, | ||
| config.ddp.overlap_param_gather, | ||
| ) | ||
| #should_toggle_forward_pre_hook = False |
There was a problem hiding this comment.
Remove commented-out debug toggles.
These commented lines add noise and violate the no-dead-code rule. Either remove them or add a rationale explaining why they must remain commented out.
🧹 Suggested cleanup
- `#should_toggle_forward_pre_hook` = False
@@
- `#if` config.train.rampup_batch_size is not None:As per coding guidelines: If code is commented out, include a comment describing its usage and why it is commented out; otherwise remove it as debug code before merging.
Also applies to: 309-309
🤖 Prompt for AI Agents
In `@src/megatron/bridge/training/train.py` at line 221, Remove the commented-out
debug toggles to eliminate dead code: delete the lines containing the commented
variable name should_toggle_forward_pre_hook (and any duplicate/commented
occurrences around the other reported location) unless you intentionally want to
keep them; if so, replace the bare commented lines with a short rationale
comment explaining when/how to enable this debug toggle and why it is preserved
(e.g., "kept for temporary debugging of forward pre-hook behavior; remove before
merge"). Ensure you update any nearby comments in the same function or module
(train.py) to reflect the change.
| if config.logger.tensorboard_dir is not None: # Skip logging as tensorboard logging is disabled. | ||
| if hasattr(optimizer, "is_stub_optimizer") and not optimizer.is_stub_optimizer: | ||
| loss_scale = optimizer.get_loss_scale().item() | ||
| else: | ||
| learning_rate = param_group["lr"] | ||
| report_memory_flag = training_log( | ||
| loss_dict, | ||
| total_loss_dict, | ||
| learning_rate, | ||
| decoupled_learning_rate, | ||
| loss_scale, | ||
| report_memory_flag, | ||
| skipped_iter, | ||
| grad_norm, | ||
| params_norm, | ||
| num_zeros_in_grad, | ||
| config, | ||
| global_state, | ||
| history_wct, | ||
| model, | ||
| log_max_attention_logit, | ||
| ) | ||
|
|
||
| loss_scale = 1.0 | ||
| params_norm = None | ||
|
|
||
| if config.logger.log_params_norm: | ||
| params_norm = calc_params_l2_norm(model, model_config, use_megatron_fsdp=config.dist.use_megatron_fsdp) | ||
|
|
||
| learning_rate = None | ||
| decoupled_learning_rate = None | ||
| for param_group in optimizer.param_groups: | ||
| if len(param_group) == 0: | ||
| continue | ||
| if param_group["is_decoupled_lr"]: | ||
| decoupled_learning_rate = param_group["lr"] | ||
| else: | ||
| learning_rate = param_group["lr"] | ||
|
|
||
| report_memory_flag = training_log( | ||
| loss_dict, | ||
| total_loss_dict, | ||
| learning_rate, | ||
| decoupled_learning_rate, | ||
| loss_scale, | ||
| report_memory_flag, | ||
| skipped_iter, | ||
| grad_norm, | ||
| params_norm, | ||
| num_zeros_in_grad, | ||
| config, | ||
| global_state, | ||
| history_wct, | ||
| model, | ||
| log_max_attention_logit, | ||
| ) |
There was a problem hiding this comment.
Don’t gate all logging on tensorboard_dir.
This disables WandB/MLflow and console logging when tensorboard_dir is unset (default), which is a behavior regression. Gate on any enabled sink (or keep the call and let training_log decide).
🔧 Suggested fix
- if config.logger.tensorboard_dir is not None: # Skip logging as tensorboard logging is disabled.
+ logging_enabled = (
+ config.logger.tensorboard_dir is not None
+ or global_state.wandb_logger is not None
+ or global_state.mlflow_logger is not None
+ or config.logger.log_progress
+ )
+ if logging_enabled:
if hasattr(optimizer, "is_stub_optimizer") and not optimizer.is_stub_optimizer:
loss_scale = optimizer.get_loss_scale().item()
else:
loss_scale = 1.0📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if config.logger.tensorboard_dir is not None: # Skip logging as tensorboard logging is disabled. | |
| if hasattr(optimizer, "is_stub_optimizer") and not optimizer.is_stub_optimizer: | |
| loss_scale = optimizer.get_loss_scale().item() | |
| else: | |
| learning_rate = param_group["lr"] | |
| report_memory_flag = training_log( | |
| loss_dict, | |
| total_loss_dict, | |
| learning_rate, | |
| decoupled_learning_rate, | |
| loss_scale, | |
| report_memory_flag, | |
| skipped_iter, | |
| grad_norm, | |
| params_norm, | |
| num_zeros_in_grad, | |
| config, | |
| global_state, | |
| history_wct, | |
| model, | |
| log_max_attention_logit, | |
| ) | |
| loss_scale = 1.0 | |
| params_norm = None | |
| if config.logger.log_params_norm: | |
| params_norm = calc_params_l2_norm(model, model_config, use_megatron_fsdp=config.dist.use_megatron_fsdp) | |
| learning_rate = None | |
| decoupled_learning_rate = None | |
| for param_group in optimizer.param_groups: | |
| if len(param_group) == 0: | |
| continue | |
| if param_group["is_decoupled_lr"]: | |
| decoupled_learning_rate = param_group["lr"] | |
| else: | |
| learning_rate = param_group["lr"] | |
| report_memory_flag = training_log( | |
| loss_dict, | |
| total_loss_dict, | |
| learning_rate, | |
| decoupled_learning_rate, | |
| loss_scale, | |
| report_memory_flag, | |
| skipped_iter, | |
| grad_norm, | |
| params_norm, | |
| num_zeros_in_grad, | |
| config, | |
| global_state, | |
| history_wct, | |
| model, | |
| log_max_attention_logit, | |
| ) | |
| logging_enabled = ( | |
| config.logger.tensorboard_dir is not None | |
| or global_state.wandb_logger is not None | |
| or global_state.mlflow_logger is not None | |
| or config.logger.log_progress | |
| ) | |
| if logging_enabled: | |
| if hasattr(optimizer, "is_stub_optimizer") and not optimizer.is_stub_optimizer: | |
| loss_scale = optimizer.get_loss_scale().item() | |
| else: | |
| loss_scale = 1.0 | |
| params_norm = None | |
| if config.logger.log_params_norm: | |
| params_norm = calc_params_l2_norm(model, model_config, use_megatron_fsdp=config.dist.use_megatron_fsdp) | |
| learning_rate = None | |
| decoupled_learning_rate = None | |
| for param_group in optimizer.param_groups: | |
| if len(param_group) == 0: | |
| continue | |
| if param_group["is_decoupled_lr"]: | |
| decoupled_learning_rate = param_group["lr"] | |
| else: | |
| learning_rate = param_group["lr"] | |
| report_memory_flag = training_log( | |
| loss_dict, | |
| total_loss_dict, | |
| learning_rate, | |
| decoupled_learning_rate, | |
| loss_scale, | |
| report_memory_flag, | |
| skipped_iter, | |
| grad_norm, | |
| params_norm, | |
| num_zeros_in_grad, | |
| config, | |
| global_state, | |
| history_wct, | |
| model, | |
| log_max_attention_logit, | |
| ) |
🤖 Prompt for AI Agents
In `@src/megatron/bridge/training/train.py` around lines 470 - 506, The current
outer check uses config.logger.tensorboard_dir to gate all logging, which
unintentionally disables WandB/MLflow/console logging; remove or loosen that
gate so training_log(...) is invoked regardless of tensorboard_dir (or replace
the condition with a check that any sink is enabled, e.g.,
config.logger.use_tensorboard or config.logger.use_wandb or
config.logger.use_mlflow), leaving the inner logic that computes loss_scale,
params_norm, learning_rate, etc., intact; keep the call to training_log(...) so
it can decide which sinks actually log.
| if train_config.numeric_checks: | ||
| update_successful = logical_and_across_model_parallel_group(update_successful, mp_group=pg_collection.mp) | ||
| # grad_norm and num_zeros_in_grad will be None on ranks without trainable params, | ||
| # so we must gather across mp ranks | ||
| grad_norm = reduce_max_stat_across_model_parallel_group(grad_norm, mp_group=pg_collection.mp) | ||
| if optim_config.log_num_zeros_in_grad: | ||
| num_zeros_in_grad = reduce_max_stat_across_model_parallel_group( | ||
| num_zeros_in_grad, mp_group=pg_collection.mp | ||
| ) |
There was a problem hiding this comment.
Keep update_successful synchronized regardless of numeric_checks.
If numeric checks are disabled, ranks can diverge on update_successful, causing inconsistent scheduler stepping. You can still gate grad/zero-count reductions while always synchronizing update_successful.
✅ Suggested fix
- if train_config.numeric_checks:
- update_successful = logical_and_across_model_parallel_group(update_successful, mp_group=pg_collection.mp)
+ update_successful = logical_and_across_model_parallel_group(update_successful, mp_group=pg_collection.mp)
+ if train_config.numeric_checks:
# grad_norm and num_zeros_in_grad will be None on ranks without trainable params,
# so we must gather across mp ranks
grad_norm = reduce_max_stat_across_model_parallel_group(grad_norm, mp_group=pg_collection.mp)
if optim_config.log_num_zeros_in_grad:
num_zeros_in_grad = reduce_max_stat_across_model_parallel_group(
num_zeros_in_grad, mp_group=pg_collection.mp
)📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| if train_config.numeric_checks: | |
| update_successful = logical_and_across_model_parallel_group(update_successful, mp_group=pg_collection.mp) | |
| # grad_norm and num_zeros_in_grad will be None on ranks without trainable params, | |
| # so we must gather across mp ranks | |
| grad_norm = reduce_max_stat_across_model_parallel_group(grad_norm, mp_group=pg_collection.mp) | |
| if optim_config.log_num_zeros_in_grad: | |
| num_zeros_in_grad = reduce_max_stat_across_model_parallel_group( | |
| num_zeros_in_grad, mp_group=pg_collection.mp | |
| ) | |
| update_successful = logical_and_across_model_parallel_group(update_successful, mp_group=pg_collection.mp) | |
| if train_config.numeric_checks: | |
| # grad_norm and num_zeros_in_grad will be None on ranks without trainable params, | |
| # so we must gather across mp ranks | |
| grad_norm = reduce_max_stat_across_model_parallel_group(grad_norm, mp_group=pg_collection.mp) | |
| if optim_config.log_num_zeros_in_grad: | |
| num_zeros_in_grad = reduce_max_stat_across_model_parallel_group( | |
| num_zeros_in_grad, mp_group=pg_collection.mp | |
| ) |
🤖 Prompt for AI Agents
In `@src/megatron/bridge/training/train.py` around lines 770 - 778, The check for
update_successful must always be synchronized across model-parallel ranks: move
the call to logical_and_across_model_parallel_group(update_successful,
mp_group=pg_collection.mp) out of the train_config.numeric_checks conditional so
update_successful is reduced unconditionally; leave the grad_norm and
num_zeros_in_grad reductions (reduce_max_stat_across_model_parallel_group)
inside the if train_config.numeric_checks block so those stats are only gathered
when numeric checks are enabled.
| if not state.cfg.checkpoint.save: | ||
| return False |
There was a problem hiding this comment.
Exit conditions should still apply without checkpoint saving.
The early return skips exit-signal, duration, interval, and straggler checks when checkpoint.save is unset. That can cause runs to ignore exit conditions entirely.
🛠️ Suggested fix
- if not state.cfg.checkpoint.save:
- return False
-🤖 Prompt for AI Agents
In `@src/megatron/bridge/training/train.py` around lines 1166 - 1167, The early
return guarded by "if not state.cfg.checkpoint.save: return False" prevents the
exit-signal, duration, interval, and straggler checks from running; instead of
returning immediately when state.cfg.checkpoint.save is false, skip only the
checkpoint save action and still run the existing exit-condition checks
(exit-signal, duration, interval, straggler) before returning; locate the block
that references state.cfg.checkpoint.save in train.py and refactor so checkpoint
saving is conditional but the exit-checking logic (the functions/conditions that
evaluate exit signals, duration, interval and stragglers) always executes
regardless of checkpoint.save.
|
/ok to test 349d5b1 |
|
/ok to test 1719976 |
| continue | ||
| if param_group["is_decoupled_lr"]: | ||
| decoupled_learning_rate = param_group["lr"] | ||
| if config.logger.tensorboard_dir is not None: # Skip logging as tensorboard logging is disabled. |
There was a problem hiding this comment.
with changes proposed in #2153, this will no longer be true. after 2153, users could log to tensorboard, W&B, and mlflow independently vs the current state where tensorboard is required to log to w&b. we can have some check up front whether logging will be enabled. this will be constant throughout training and can be done outside of the loop
| Returns: | ||
| NVTX context if nsys profiling was started at this step, None otherwise | ||
| """ | ||
| if config is None: |
There was a problem hiding this comment.
this is the first thing checked in should_profile_rank - why do we need to repeat this here?
There was a problem hiding this comment.
There was a problem hiding this comment.
out of curiosity, when is it an omegaconf during training? shouldn't it already be resolved to a dataclass?
There was a problem hiding this comment.
The fields in the class could be python raw List or an OmegaConfig.ListConfig
| check_weight_hash_across_dp_replicas_interval: Optional[int] = None | ||
| """Interval to check weight hashes are same across DP replicas. If not specified, weight hashes not checked.""" | ||
|
|
||
| numeric_checks: bool = True |
There was a problem hiding this comment.
the config field name is really broad, so it's impossible to tell exactly which checks this disables based on the name alone
Signed-off-by: Gautham Kollu <gkollu@nvidia.com>
|
/ok to test 65ea63c |
|
/ok to test 0f4b5a7 |
…2199) Signed-off-by: Gautham Kollu <gkollu@nvidia.com> Signed-off-by: gautham-kollu <gkollu@nvidia.com> Co-authored-by: Ananth Subramaniam <ansubramania@nvidia.com> Signed-off-by: sowmen <sowmendipta@gmail.com>


What does this PR do ?
Code changes
Changelog
GitHub Actions CI
See the CI sectionin the Contributing doc for how to trigger the CI. A Nvidia developer will need to approve and trigger the CI for external contributors.
Before your PR is "Ready for review"
Pre checks:
If you haven't finished some of the above items you can still open "Draft" PR.
Additional Information
Summary by CodeRabbit
Release Notes
New Features
numeric_checksconfiguration option for training validation.Improvements