[Dev] feat: Dynamic CP (part 2) #2000

Merged: yuzhongw-nvidia merged 5 commits into NVIDIA:dev from xiaoyao0115:hybrid-cp on Apr 7, 2026

@xiaoyao0115 xiaoyao0115 commented Oct 28, 2025

Contributor

This PR is the second part of hybrid-cp. The first part is #2054. (PR for main branch: #2304)

This PR provides end-to-end Dynamic CP support, including PP and VPP.

Major changes

  1. Added default_dynamic_cp scheduler. Dynamic CP now only requires specifying --sequence-packing-scheduler default_dynamic_cp (auto-set when --dynamic-context-parallel is enabled). The previous standalone dynamic_context_parallel_forward_backward function and DynamicCPDataLoaderWrapper are removed — Dynamic CP now goes through the standard pipeline schedule, making it compatible with PP / VPP / TP out of the box.
  2. Added Mamba and MTP (Multi-Token Prediction) support. Both MambaMixer and the MTP module now dynamically switch the CP group per micro-batch via packed_seq_params.cp_group, and restore the original group after each forward pass.
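
The switch-and-restore behaviour described in item 2 can be sketched as a context manager. This is a minimal illustration rather than the PR's code: `DummyModule`, `use_cp_group`, and the string group handles are hypothetical stand-ins. Only the idea of applying a per-micro-batch `cp_group` and restoring the original group after the forward pass comes from the description above.

```python
from contextlib import contextmanager


class DummyModule:
    """Hypothetical stand-in for a module that holds a CP process group."""

    def __init__(self, cp_group):
        self.cp_group = cp_group


@contextmanager
def use_cp_group(module, cp_group):
    """Temporarily swap module.cp_group; restore it even if forward raises."""
    original = module.cp_group
    if cp_group is not None:
        module.cp_group = cp_group
    try:
        yield module
    finally:
        # Always put the original group back so the next micro-batch starts clean.
        module.cp_group = original


module = DummyModule(cp_group="static_cp_group")
with use_cp_group(module, "dynamic_cp_group_of_4"):
    group_during_forward = module.cp_group
group_after_forward = module.cp_group
```

Using `try`/`finally` keeps the restore step from being skipped when the forward pass raises, which is the failure mode a manual save-and-restore is most likely to miss.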

Other changes

  • Added --min-dynamic-context-parallel-size argument to control the minimum CP group size (default 1).
  • Removed DynamicCPMegatronPretrainingSampler; Dynamic CP now uses the standard MegatronPretrainingSampler.
  • Removed get_batch_on_this_dynamic_cp_rank utility; all CP slicing goes through the THD packed path.
  • Moved scheduling algorithms (next_hdp_group, dcp_gpus_needed, dcp_make_buckets_equal, etc.) from the deleted dynamic_cp_schedule.py into data_schedule_utils.py as standalone functions.
  • broadcast_to_pp_group and create_data_iterator now propagate local_cp_size for PP / VPP stages.
  • Fixed TEDotProductAttention to lazily initialize cp_stream on dynamic CP path.
  • Fixed attention.py to restore pg_collection.cp after dynamic CP forward.
  • Properly clean up _DYNAMIC_DP_CP_GROUPS in destroy_model_parallel.
  • Added functional test gpt3_mcore_te_tp2_pp1_cp4_dcp and extended unit tests with DCP parameter combinations.

Convergence and performance

Convergence has been verified on Qwen3-30B-A3B on 32 GPUs, with max_seqlen set to 49152 and max_seqlen_per_dp_cp_rank set to 3072. In the figure below, bshd refers to running with CP=16, where sequences are padded to max_seqlen and executed in the same bshd format as in pretraining. thd-packing refers to using CP=16 while packing variable-length sequences. In dynamic-cp, the maximum CP group size is also 16.

[Figure: two images comparing bshd, thd-packing, and dynamic-cp; not preserved in this capture]

Known limitations

  1. Dynamic CP group sizes are limited to powers of 2.
  2. CUDA Graphs are not supported.
  3. Works best with FlashAttention; cuDNN FusedAttention recompiles on every shape change, negating performance gains.
  4. FSDP + PP is not yet supported.
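
To make limitation 1 concrete, here is a hypothetical sketch of how a per-sequence CP size could be chosen: round the number of ranks a sequence needs up to a power of two, bounded below by the minimum CP size. The helper name `cp_size_for` and the rounding rule are assumptions for illustration; the PR's actual `dcp_gpus_needed` may behave differently.

```python
from math import ceil


def cp_size_for(seq_len, max_seq_len_per_rank, min_cp_size=1):
    """Smallest power of two that covers seq_len (assumed rule, not the PR's)."""
    ranks_needed = max(min_cp_size, ceil(seq_len / max_seq_len_per_rank))
    cp_size = 1
    while cp_size < ranks_needed:
        cp_size *= 2
    return cp_size


# Per-rank budget of 3072 tokens, as in the convergence run above.
sizes = [cp_size_for(n, 3072) for n in (1000, 3072, 3073, 10000, 49152)]
```

Under this rule, a full-length sequence from the convergence setup (49152 tokens with 3072 tokens per rank) maps to a group of 16, which matches the stated maximum CP group size.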

Contribution process

flowchart LR
    A[Pre-checks] --> B[PR Tests]
    subgraph Code Review/Approval
        C1[Expert Review] --> C2[Final Review]
    end
    B --> C1
    C2 --> D[Merge]

Pre-checks

  • I want this PR in a versioned release and have added the appropriate Milestone (e.g., Core 0.8)
  • I have added relevant unit tests
  • I have added relevant functional tests
  • I have added proper typing to my code Typing guidelines
  • I have added relevant documentation
  • I have run the autoformatter.sh on my PR

Code review

The following process is enforced via the CODEOWNERS file for changes into megatron/core. For changes outside of megatron/core, it is up to the PR author whether or not to tag the Final Reviewer team.

For MRs into `main` branch

(Step 1): Add PR label Expert Review

(Step 2): Collect the expert reviewers' reviews

  1. Attach the Expert Review label when your PR is ready for review.
  2. GitHub auto-assigns expert reviewers based on your changes. They will get notified and pick up your PR soon.

⚠️ Only proceed to the next step once all reviewers have approved, merge conflicts are resolved, and CI is passing.
Final Review might be declined if these requirements are not fulfilled.

(Step 3): Final Review

  1. Add Final Review label
  2. GitHub auto-assigns final reviewers based on your changes. They will get notified and pick up your PR soon.

(Optional Step 4): Cherry-pick into release branch

If this PR also needs to be merged into core_r* release branches, after this PR has been merged, select Cherry-pick to open a new PR into the release branch.

For MRs into `dev` branch

The proposed review process for the `dev` branch is under active discussion.

MRs are mergeable after one approval by either eharper@nvidia.com or zijiey@nvidia.com.

Merging your PR

Any member of core-adlr and core-nemo will be able to merge your PR.

@xiaoyao0115 xiaoyao0115 requested review from a team as code owners October 28, 2025 08:57
@xiaoyao0115 xiaoyao0115 added the enhancement New feature or request label Oct 28, 2025

copy-pr-bot Bot commented Oct 28, 2025


This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@xiaoyao0115 xiaoyao0115 force-pushed the hybrid-cp branch 3 times, most recently from f33edcd to 48e91d2 Compare November 2, 2025 09:33
@yanring yanring added module: moe dev branch Dev branch related issues and development labels Nov 5, 2025

yanring commented Nov 7, 2025

Contributor

Is there any difference between this and #2054?


kunlunl commented Nov 7, 2025

Contributor

> Is there any difference between this and #2054?

This is the second MR; #2054 needs to be merged first, and then this one (#2000). The second MR is numbered 2000 while the first is 2054 (>2000) because they were migrated from GitLab at different times.


yanring commented Nov 10, 2025

Contributor

> > Is there any difference between this and #2054?
>
> This is the second MR; #2054 needs to be merged first, and then this one (#2000). The second MR is numbered 2000 while the first is 2054 (>2000) because they were migrated from GitLab at different times.

Got it, thanks! Could you please update the title to reflect this?

@xiaoyao0115 xiaoyao0115 changed the title from "[Dev] feat: hybrid-cp feature for dev branch (Author: Parth Kunlun Tailai)" to "[Dev] feat: hybrid-cp feature for dev branch (part 2)" Nov 11, 2025
@xiaoyao0115 xiaoyao0115 changed the title from "[Dev] feat: hybrid-cp feature for dev branch (part 2)" to "[Dev] feat: hybrid-cp for dev branch (part 2)" Nov 11, 2025
Comment thread pretrain_gpt.py Outdated
Comment thread pretrain_gpt.py Outdated

kunlunl commented Dec 1, 2025

Contributor

/ok to test e0c90c5

@xiaoyao0115

Contributor Author

/ok to test b4c4fe6

Comment thread tests/test_utils/recipes/h100/gpt.yaml Outdated
@yuzhongw-nvidia

Contributor

/ok to test b649d0b

@yuzhongw-nvidia yuzhongw-nvidia self-requested a review April 3, 2026 09:51
@yuzhongw-nvidia

Contributor

/ok to test f632053

@yuzhongw-nvidia

Contributor

/ok to test 4319109

@yuzhongw-nvidia

Contributor

/ok to test ec6b5f4

@yuzhongw-nvidia

Contributor

/ok to test 0cbcc94

@yuzhongw-nvidia

Contributor

/ok to test 49f9e79

@xiaoyao0115

Contributor Author

/ok to test 45b1232

xiaoyao0115 and others added 5 commits April 7, 2026 00:18
Signed-off-by: tailaim <tailaim@nvidia.com>
Signed-off-by: tailaim <tailaim@nvidia.com>
Signed-off-by: tailaim <tailaim@nvidia.com>
Co-authored-by: Yuzhong Wang <yuzhongw@nvidia.com>

Update model_config.yaml

Update model_config.yaml
@yuzhongw-nvidia

Contributor

/ok to test aa49993

@svcnvidia-nemo-ci


🔄 Merge queue validation started!

You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/24075960736

@Victarry Victarry left a comment

Contributor

Thanks for the great work on Dynamic CP Part 2 — the design of routing dynamic CP through the standard PP/VPP schedule is clean and well-motivated.

Left a mix of comments across CRITICAL / IMPORTANT / SUGGESTION levels. The most actionable ones are:

  1. fill_empty negative-index wrap-around (line 921) — condition order bug in VPP alignment
  2. fill_empty_gpus assertion inverted (line 811) — allows overwriting non-empty GPU slots
  3. [[]] * N shared mutable lists (line 818) — classic Python pitfall, latent data corruption
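
Item 3 refers to a standard Python behaviour that is easy to reproduce in isolation. The snippet below is a generic illustration, not code from the PR; `num_gpus` and the sample label are made up.

```python
num_gpus = 3

# One inner list object referenced three times: an append shows up everywhere.
shared = [[]] * num_gpus
shared[0].append("sample_0")

# Three distinct inner lists: an append affects only the targeted slot.
independent = [[] for _ in range(num_gpus)]
independent[0].append("sample_0")

same_object = shared[0] is shared[1]
```

The list comprehension is the usual fix whenever per-slot lists are mutated in place.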

⚠️ Disclosure: Part of this review was assisted by AI (Claude). Please double-check the findings independently — especially the CRITICAL ones — and let me know if any are inaccurate or based on incorrect assumptions.

This function recursively forms groups of sub-samples such that all DPxCP ranks
have a roughly balanced workload in the group.
"""
mslpr = self.max_seq_len_per_rank


The name mslpr is confusing here. Just using the full name is fine.

Comment on lines +987 to +1020
def dcp_make_buckets_equal(
    sample_seqlens: List[Tuple[int, int]],
    compute_estimator: Callable,
    max_seq_len_per_rank: int,
    min_cp_size: int = 1,
) -> List[deque]:
    """Split samples into buckets of roughly equal work, one per unique CP size."""
    seqlens = [seq_len for _, seq_len in sample_seqlens]
    k = len({dcp_gpus_needed(L, max_seq_len_per_rank, min_cp_size) for L in seqlens})

    work = []
    for _, s in sample_seqlens:
        cp_size = dcp_gpus_needed(s, max_seq_len_per_rank, min_cp_size)
        work.append(compute_estimator(s, cp_size))
    total_work = sum(work)
    target = total_work / k
    buckets, cur, cur_work = [], [], 0.0
    remaining_k = k

    for i, (sample_id, seq_len) in enumerate(sample_seqlens):
        w = compute_estimator(seq_len)
        projected = cur_work + w
        if cur and (
            projected > target * 1.1 or len(sample_seqlens) - i <= remaining_k - len(buckets)
        ):
            buckets.append(deque(cur))
            cur, cur_work = [], 0.0
            remaining_k -= 1
        cur.append((sample_id, seq_len))
        cur_work += w

    if cur:
        buckets.append(deque(cur))
    return buckets


The readability of this function could be improved. Example code, refactored with GPT:

def dcp_make_buckets_equal(
    sample_seqlens: List[Tuple[int, int]],
    compute_estimator: Callable,
    max_seq_len_per_rank: int,
    min_cp_size: int = 1,
    bucket_overfill_factor: float = 1.1,
) -> List[deque]:
    """Split samples into buckets of roughly equal work, one per unique CP size.

    Args:
        sample_seqlens (List[Tuple[int, int]]): (sample_id, seq_len) tuples.
        compute_estimator (Callable[[int, Optional[int]], float]): workload estimator function.
        max_seq_len_per_rank (int): max tokens per rank for packing.
        min_cp_size (int, optional): minimum CP size for dynamic CP.
        bucket_overfill_factor (float, optional): tolerated overshoot of the per-bucket
            workload target before the current bucket is closed.
    """
    seqlens = [seq_len for _, seq_len in sample_seqlens]
    unique_cp_sizes = {dcp_gpus_needed(seq_len, max_seq_len_per_rank, min_cp_size) for seq_len in seqlens}
    target_bucket_count = len(unique_cp_sizes)

    per_sample_work = []
    for _, seq_len in sample_seqlens:
        cp_size = dcp_gpus_needed(seq_len, max_seq_len_per_rank, min_cp_size)
        per_sample_work.append(compute_estimator(seq_len, cp_size))

    total_work = sum(per_sample_work)
    target_work_per_bucket = total_work / target_bucket_count

    buckets, current_bucket, current_bucket_work = [], [], 0.0
    remaining_target_buckets = target_bucket_count

    for sample_idx, (sample_id, seq_len) in enumerate(sample_seqlens):
        sample_work = compute_estimator(seq_len)
        projected_bucket_work = current_bucket_work + sample_work
        need_reserve_buckets_for_remaining_samples = len(sample_seqlens) - sample_idx <= (
            remaining_target_buckets - len(buckets)
        )
        exceeds_bucket_work_target = projected_bucket_work > (
            target_work_per_bucket * bucket_overfill_factor
        )
        should_close_current_bucket = bool(current_bucket) and (
            exceeds_bucket_work_target or need_reserve_buckets_for_remaining_samples
        )

        if should_close_current_bucket:
            buckets.append(deque(current_bucket))
            current_bucket, current_bucket_work = [], 0.0
            remaining_target_buckets -= 1

        current_bucket.append((sample_id, seq_len))
        current_bucket_work += sample_work

    if current_bucket:
        buckets.append(deque(current_bucket))
    return buckets

Some conventions for better readability:

  1. Use clear, meaningful names.
  2. Simplify complex conditions.
  3. Avoid hardcoded magic numbers such as 1.1 here.
  4. Use consistent naming. In this PR, prefer workload over work/compute.

    max_seq_len_per_rank: int,
    min_cp_size: int = 1,
) -> List[deque]:
    """Split samples into buckets of roughly equal work, one per unique CP size."""


It seems this function doesn't guarantee that the number of buckets equals the number of unique CP sizes.
If so, the docstring should be updated.
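
A small experiment supports this observation. The sketch below transcribes the quoted loop and runs it on a toy input. `gpus_needed` and `linear_workload` are hypothetical stand-ins for `dcp_gpus_needed` and the real workload estimator, and the function returns `k` alongside the buckets purely so the two can be compared.

```python
from collections import deque
from math import ceil


def gpus_needed(seq_len, max_seq_len_per_rank, min_cp_size=1):
    """Hypothetical stand-in: ranks needed, rounded up to a power of two."""
    ranks = max(min_cp_size, ceil(seq_len / max_seq_len_per_rank))
    size = 1
    while size < ranks:
        size *= 2
    return size


def linear_workload(seq_len, cp_size=None):
    """Hypothetical estimator: workload proportional to sequence length."""
    return float(seq_len)


def make_buckets(sample_seqlens, compute_estimator, max_seq_len_per_rank, min_cp_size=1):
    """Transcription of the quoted loop; also returns k for comparison."""
    k = len({gpus_needed(s, max_seq_len_per_rank, min_cp_size) for _, s in sample_seqlens})
    total_work = sum(
        compute_estimator(s, gpus_needed(s, max_seq_len_per_rank, min_cp_size))
        for _, s in sample_seqlens
    )
    target = total_work / k
    buckets, cur, cur_work = [], [], 0.0
    remaining_k = k
    for i, (sample_id, seq_len) in enumerate(sample_seqlens):
        w = compute_estimator(seq_len)
        projected = cur_work + w
        if cur and (
            projected > target * 1.1 or len(sample_seqlens) - i <= remaining_k - len(buckets)
        ):
            buckets.append(deque(cur))
            cur, cur_work = [], 0.0
            remaining_k -= 1
        cur.append((sample_id, seq_len))
        cur_work += w
    if cur:
        buckets.append(deque(cur))
    return buckets, k


# Four short samples that all need a single rank, so there is one unique CP size.
samples = [(0, 100), (1, 100), (2, 100), (3, 100)]
buckets, unique_cp_sizes = make_buckets(samples, linear_workload, max_seq_len_per_rank=1000)
```

On this input the reserve condition fires on the last sample, so the loop returns two buckets even though only one unique CP size exists. Under these stand-ins, the "one per unique CP size" wording describes a target rather than a guarantee.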

while i >= 0:
    sid0 = sample_id_group[i][0]
    cp_size = 0
    while sid0 in sample_id_group[i] and i >= 0:


[CRITICAL Correctness] fill_empty inner while loop condition order causes Python negative-index wrap-around.

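When i = 0, the loop evaluates sample_id_group[0] (true), then decrements i to -1. On the next check, Python evaluates sample_id_group[-1] (wrapping to the last element) before checking i >= 0. Because the and still requires i >= 0, the loop exits at that point and cp_size is not inflated by this path alone, but the wrapped lookup is unintended, and the bounds guard only protects the index when it comes first. Please verify the interaction with align_sample_id_groups under VPP.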

Suggestion:

# Before:
while sid0 in sample_id_group[i] and i >= 0:
# After (short-circuit prevents negative index access):
while i >= 0 and sid0 in sample_id_group[i]:
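
The two condition orders can be compared directly. This is a self-contained sketch under stated assumptions: `RecordingList` and both counting helpers are hypothetical, and the loop body (increment a counter, decrement `i`) is a guess at the shape of the original, which is not shown in the diff context.

```python
class RecordingList(list):
    """List that records every index passed to __getitem__."""

    def __init__(self, *args):
        super().__init__(*args)
        self.accessed = []

    def __getitem__(self, index):
        self.accessed.append(index)
        return super().__getitem__(index)


def count_original_order(groups, sid):
    i, count = len(groups) - 1, 0
    while sid in groups[i] and i >= 0:  # index is read before the guard
        count += 1
        i -= 1
    return count


def count_guard_first(groups, sid):
    i, count = len(groups) - 1, 0
    while i >= 0 and sid in groups[i]:  # short-circuit skips the read when i < 0
        count += 1
        i -= 1
    return count


original = RecordingList([[7], [7]])
guarded = RecordingList([[7], [7]])
count_a = count_original_order(original, 7)
count_b = count_guard_first(guarded, 7)
```

In this sketch both orders return the same count, because `and` still fails on `i >= 0`. The difference is that the original order reads index -1 once before exiting, while the guard-first order never touches a negative index.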

Comment on lines +811 to +813
assert not all(
    work for work in micro_batches[empty_gpu : empty_gpu + needed_count]
), "Empty GPUs were detected but not enough to expand."


[CRITICAL Correctness] Assertion logic is inverted — allows overwriting non-empty GPU slots.

not all(work for ...) passes when at least one slot is empty, but the intent is "all slots in this range must be empty". With the current logic, if only 1 of needed_count slots is empty and the others contain real data, the assertion passes and the expansion overwrites existing assignments silently.

Suggestion:

# Before:
assert not all(
    work for work in micro_batches[empty_gpu : empty_gpu + needed_count]
), "Empty GPUs were detected but not enough to expand."
# After:
assert all(
    not work for work in micro_batches[empty_gpu : empty_gpu + needed_count]
), "Empty GPUs were detected but not enough contiguous empty slots to expand."
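
The difference between the two predicates is easy to check on toy slot lists. The helper names and sample labels below are hypothetical; only the two boolean expressions are taken from the snippets above.

```python
def passes_original_check(slots):
    """True when at least one slot is empty (the expression in the PR)."""
    return not all(work for work in slots)


def passes_suggested_check(slots):
    """True only when every slot is empty (the suggested expression)."""
    return all(not work for work in slots)


partly_occupied = [[], ["sample_3"], ["sample_5"]]
all_empty = [[], [], []]
fully_occupied = [["sample_1"], ["sample_2"]]

original_results = [
    passes_original_check(s) for s in (partly_occupied, all_empty, fully_occupied)
]
suggested_results = [
    passes_suggested_check(s) for s in (partly_occupied, all_empty, fully_occupied)
]
```

The two checks disagree only on the partly occupied range, which is exactly the case where an expansion would overwrite existing assignments.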

Comment on lines +796 to +799
assert (
    existing_group_sizes
), "There should be at least one group existing, cannot redistribute, "
"try to increase 'max-seqlen-per-dp-cp-rank'."


[IMPORTANT Correctness] Assert message is silently truncated — the second string line is a standalone expression, not part of the message.

Without enclosing parentheses, the newline after "... cannot redistribute, " terminates the assert statement. "try to increase 'max-seqlen-per-dp-cp-rank'." on the next line becomes a no-op expression statement.

Suggestion:

assert existing_group_sizes, (
    "There should be at least one group existing, cannot redistribute, "
    "try to increase 'max-seqlen-per-dp-cp-rank'."
)
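
The truncation can be observed by catching the `AssertionError` and inspecting its message. The function names and shortened message text are hypothetical; the two statement layouts mirror the snippets above. The example assumes assertions are enabled (Python is not run with `-O`).

```python
def check_truncated(groups):
    assert groups, "cannot redistribute, "
    "try to increase the limit."  # standalone expression statement; has no effect


def check_full(groups):
    assert groups, (
        "cannot redistribute, "
        "try to increase the limit."
    )


def message_of(check):
    """Run check on an empty list and return the assertion message, if any."""
    try:
        check([])
    except AssertionError as exc:
        return str(exc)
    return None


truncated_message = message_of(check_truncated)
full_message = message_of(check_full)
```

Adjacent string literals are concatenated only within a single expression, so the parentheses are what make the second line part of the message.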

Comment on lines +602 to +613
def next_hdp_group(
    sample_seqlens: List[Tuple[int, int]],
    compute_estimator: Callable[[int], float],
    total_gpus: int,
    gpus_needed_fn: Callable[[int], int],
    make_buckets_equal_fn: Callable,
    max_seq_len_per_rank: float,
    get_total_workload_fn: Callable,
    delta: float = 0.05,
    strategy: str = "dp",
    eps_bucket: float = 0.10,
) -> Tuple[List[List[int]], List[Tuple[int, int]], List[float], List[List[int]]]:


[IMPORTANT Readability] next_hdp_group is 270 lines with 3 nested closures (trim_overload, fill_empty_gpus, inner fill_empty), mixing greedy scheduling, balance checking, empty-GPU filling, and overload trimming in a single scope.

The closures capture mutable outer state via nonlocal, making it hard to reason about which variables are modified where. fill_empty_gpus alone is ~70 lines of array-shifting logic that would be much easier to test and review as a standalone module-level function.

Suggestion: Extract fill_empty_gpus (and trim_overload when uncommented) into top-level functions with explicit input/output signatures. next_hdp_group should only contain the greedy main loop.

Comment on lines +635 to +647
micro_batches = [[] for _ in range(total_gpus)]
exec_times = [0.0 for _ in range(total_gpus)]
sample_ids_per_gpu = [[] for _ in range(total_gpus)]
packing_sequence_len = {}

gpu_group_id = [None] * total_gpus
group_members = {}
group_size = {}
next_gid = 0

pp_cursor = 0
prev_needed = None
check_balance = False


[SUGGESTION Naming] Core scheduling state variables are overly abbreviated for a 270-line function in distributed scheduling code.

In a function this long with multiple nested scopes, short names like gid, g_members, g_size, nc, next_pw force readers to keep a mental lookup table. Suggested renames:

| Current | Suggested |
|---|---|
| gpu_group_id | (OK) |
| group_members | (OK) |
| group_size | group_cp_size (disambiguate from len(group_members[gid])) |
| next_gid | next_group_id |
| packing_sequence_len | packed_tokens_per_rank |
| pp_cursor | consider removing (see strategy comment) |

# Step4: Prepare "local_cp_size" if dynamic context parallel is enabled.
if dynamic_cp:
    if is_tp_rank_0:
        if type(batch['local_cp_size']) == int:


[SUGGESTION Readability] Prefer isinstance(batch['local_cp_size'], int) over type(...) == int.

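isinstance is the Pythonic idiom and also accepts int subclasses (e.g. bool or IntEnum members). Note that np.int64 is not an int subclass on Python 3; if NumPy integers must be accepted, check against numbers.Integral instead. The same applies to the type(batch['max_seqlen']) == int check on line 595.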
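
The behavioural difference between the two checks can be seen with standard-library types alone. `CPSize` is a hypothetical enum used only for illustration.

```python
from enum import IntEnum
from numbers import Integral


class CPSize(IntEnum):
    """Hypothetical int subclass used to exercise the checks."""

    FOUR = 4


value = CPSize.FOUR

exact_type_match = type(value) == int          # exact-type comparison rejects subclasses
subclass_aware = isinstance(value, int)        # accepts int and its subclasses
abstract_match = isinstance(value, Integral)   # accepts anything registered as integral
bool_is_int = isinstance(True, int)            # bool is itself an int subclass
```

NumPy integer scalars such as `np.int64` are not subclasses of the built-in `int` on Python 3, so neither `type(x) == int` nor `isinstance(x, int)` accepts them. `numbers.Integral` does, since NumPy registers its integer types with that abstract base class.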

Comment on lines +653 to +657
scan_order = (
    range(len(buckets))
    if strategy == "dp"
    else [(pp_cursor + i) % len(buckets) for i in range(len(buckets))]
)


[SUGGESTION Simplification] strategy parameter is always "dp" — the "pp" branch and pp_cursor variable are dead code.

No caller passes strategy="pp", so the round-robin scan_order path is unreachable. The pp_cursor maintenance (lines 675-676, 732) and the strategy parameter add complexity without providing value.

If "pp" is planned future work, add a # TODO with a tracking issue. Otherwise, remove the strategy parameter and the pp-related code to simplify the already complex scheduling logic.

Labels: dev branch, enhancement, module: moe
