[Dev] feat: Dynamic CP (part 2)#2000
Conversation
f33edcd to
48e91d2
Compare
|
Is there any difference between this and #2054? |
This is the second MR, we need to merge 2054 first, and then this 2000 (The reason the second MR is 2000, while the first one is 2054 (>2000), is because they were migrated from GitLab at different times) |
Got it, thanks! Could you please update the title to reflect this? |
983e5f3 to
11d9960
Compare
|
/ok to test e0c90c5 |
e0c90c5 to
501a5f6
Compare
|
/ok to test b4c4fe6 |
|
/ok to test b649d0b |
|
/ok to test f632053 |
|
/ok to test 4319109 |
|
/ok to test ec6b5f4 |
|
/ok to test 0cbcc94 |
|
/ok to test 49f9e79 |
|
/ok to test 45b1232 |
Signed-off-by: tailaim <tailaim@nvidia.com>
Signed-off-by: tailaim <tailaim@nvidia.com>
Signed-off-by: tailaim <tailaim@nvidia.com>
Co-authored-by: Yuzhong Wang <yuzhongw@nvidia.com> Update model_config.yaml Update model_config.yaml
|
/ok to test aa49993 |
|
🔄 Merge queue validation started! You can track the progress here: https://github.com/NVIDIA/Megatron-LM/actions/runs/24075960736 |
Victarry
left a comment
There was a problem hiding this comment.
Thanks for the great work on Dynamic CP Part 2 — the design of routing dynamic CP through the standard PP/VPP schedule is clean and well-motivated.
Left a mix of comments across CRITICAL / IMPORTANT / SUGGESTION levels. The most actionable ones are:
fill_emptynegative-index wrap-around (line 921) — condition order bug in VPP alignmentfill_empty_gpusassertion inverted (line 811) — allows overwriting non-empty GPU slots[[]] * Nshared mutable lists (line 818) — classic Python pitfall, latent data corruption
| This function recursively forms groups of sub-samples such that all DPxCP ranks | ||
| have a roughly balanced workload in the group. | ||
| """ | ||
| mslpr = self.max_seq_len_per_rank |
There was a problem hiding this comment.
The name of mslpr is kind of confused here. Just use the full name is fine.
| def dcp_make_buckets_equal( | ||
| sample_seqlens: List[Tuple[int, int]], | ||
| compute_estimator: Callable, | ||
| max_seq_len_per_rank: int, | ||
| min_cp_size: int = 1, | ||
| ) -> List[deque]: | ||
| """Split samples into buckets of roughly equal work, one per unique CP size.""" | ||
| seqlens = [seq_len for _, seq_len in sample_seqlens] | ||
| k = len({dcp_gpus_needed(L, max_seq_len_per_rank, min_cp_size) for L in seqlens}) | ||
|
|
||
| work = [] | ||
| for _, s in sample_seqlens: | ||
| cp_size = dcp_gpus_needed(s, max_seq_len_per_rank, min_cp_size) | ||
| work.append(compute_estimator(s, cp_size)) | ||
| total_work = sum(work) | ||
| target = total_work / k | ||
| buckets, cur, cur_work = [], [], 0.0 | ||
| remaining_k = k | ||
|
|
||
| for i, (sample_id, seq_len) in enumerate(sample_seqlens): | ||
| w = compute_estimator(seq_len) | ||
| projected = cur_work + w | ||
| if cur and ( | ||
| projected > target * 1.1 or len(sample_seqlens) - i <= remaining_k - len(buckets) | ||
| ): | ||
| buckets.append(deque(cur)) | ||
| cur, cur_work = [], 0.0 | ||
| remaining_k -= 1 | ||
| cur.append((sample_id, seq_len)) | ||
| cur_work += w | ||
|
|
||
| if cur: | ||
| buckets.append(deque(cur)) | ||
| return buckets |
There was a problem hiding this comment.
The readability of this function could be improved, example code refactored with GPT:
def dcp_make_buckets_equal(
sample_seqlens: List[Tuple[int, int]],
compute_estimator: Callable,
max_seq_len_per_rank: int,
min_cp_size: int = 1,
bucket_overfill_factor = 1.1,
) -> List[deque]:
"""Split samples into buckets of roughly equal work, one per unique CP size
Args:
sample_seqlens (List[Tuple[int, int]]): (sample_id, seq_len) tuples.
compute_estimator (Callable[[int, Optional[int]], float]): workload estimator function.
max_seq_len_per_rank (int): max tokens per rank for packing.
min_cp_size (int, optional): minimum CP size for dynamic CP.
"""
seqlens = [seq_len for _, seq_len in sample_seqlens]
unique_cp_sizes = {dcp_gpus_needed(seq_len, max_seq_len_per_rank, min_cp_size) for seq_len in seqlens}
target_bucket_count = len(unique_cp_sizes)
per_sample_work = []
for _, seq_len in sample_seqlens:
cp_size = dcp_gpus_needed(seq_len, max_seq_len_per_rank, min_cp_size)
per_sample_work.append(compute_estimator(seq_len, cp_size))
total_work = sum(per_sample_work)
target_work_per_bucket = total_work / target_bucket_count
buckets, current_bucket, current_bucket_work = [], [], 0.0
remaining_target_buckets = target_bucket_count
for sample_idx, (sample_id, seq_len) in enumerate(sample_seqlens):
sample_work = compute_estimator(seq_len)
projected_bucket_work = current_bucket_work + sample_work
need_reserve_buckets_for_remaining_samples = len(sample_seqlens) - sample_idx <= (
remaining_target_buckets - len(buckets)
)
exceeds_bucket_work_target = projected_bucket_work > (
target_work_per_bucket * bucket_overfill_factor
)
should_close_current_bucket = bool(current_bucket) and (
exceeds_bucket_work_target or need_reserve_buckets_for_remaining_samples
)
if should_close_current_bucket:
buckets.append(deque(current_bucket))
current_bucket, current_bucket_work = [], 0.0
remaining_target_buckets -= 1
current_bucket.append((sample_id, seq_len))
current_bucket_work += sample_work
if current_bucket:
buckets.append(deque(current_bucket))
return buckets
Some conventions for better readability:
- Use meaningful name with clarity.
- Simply complex condition
- Prevent hardcoded magic number like 1.1 here.
- Use consistent naming. In this PR,
workloadshould be preferred thanwork/compute.
| max_seq_len_per_rank: int, | ||
| min_cp_size: int = 1, | ||
| ) -> List[deque]: | ||
| """Split samples into buckets of roughly equal work, one per unique CP size.""" |
There was a problem hiding this comment.
It seems this function doesn't guard the number of buckets equal to the number of unique CP sizes?
If so, the comments should be updated.
| while i >= 0: | ||
| sid0 = sample_id_group[i][0] | ||
| cp_size = 0 | ||
| while sid0 in sample_id_group[i] and i >= 0: |
There was a problem hiding this comment.
[CRITICAL Correctness] fill_empty inner while loop condition order causes Python negative-index wrap-around.
When i = 0, the loop evaluates sample_id_group[0] (true), then decrements i to -1. On the next iteration, Python evaluates sample_id_group[-1] (wrapping to the last element) before checking i >= 0. This inflates cp_size, causing align_sample_id_groups to split micro-batches incorrectly under VPP.
Suggestion:
# Before:
while sid0 in sample_id_group[i] and i >= 0:
# After (short-circuit prevents negative index access):
while i >= 0 and sid0 in sample_id_group[i]:| assert not all( | ||
| work for work in micro_batches[empty_gpu : empty_gpu + needed_count] | ||
| ), "Empty GPUs were detected but not enough to expand." |
There was a problem hiding this comment.
[CRITICAL Correctness] Assertion logic is inverted — allows overwriting non-empty GPU slots.
not all(work for ...) passes when at least one slot is empty, but the intent is "all slots in this range must be empty". With the current logic, if only 1 of needed_count slots is empty and the others contain real data, the assertion passes and the expansion overwrites existing assignments silently.
Suggestion:
# Before:
assert not all(
work for work in micro_batches[empty_gpu : empty_gpu + needed_count]
), "Empty GPUs were detected but not enough to expand."
# After:
assert all(
not work for work in micro_batches[empty_gpu : empty_gpu + needed_count]
), "Empty GPUs were detected but not enough contiguous empty slots to expand."| assert ( | ||
| existing_group_sizes | ||
| ), "There should be at least one group existing, cannot redistribute, " | ||
| "try to increase 'max-seqlen-per-dp-cp-rank'." |
There was a problem hiding this comment.
[IMPORTANT Correctness] Assert message is silently truncated — the second string line is a standalone expression, not part of the message.
Without enclosing parentheses, the newline after "... cannot redistribute, " terminates the assert statement. "try to increase 'max-seqlen-per-dp-cp-rank'." on the next line becomes a no-op expression statement.
Suggestion:
assert existing_group_sizes, (
"There should be at least one group existing, cannot redistribute, "
"try to increase 'max-seqlen-per-dp-cp-rank'."
)| def next_hdp_group( | ||
| sample_seqlens: List[Tuple[int, int]], | ||
| compute_estimator: Callable[[int], float], | ||
| total_gpus: int, | ||
| gpus_needed_fn: Callable[[int], int], | ||
| make_buckets_equal_fn: Callable, | ||
| max_seq_len_per_rank: float, | ||
| get_total_workload_fn: Callable, | ||
| delta: float = 0.05, | ||
| strategy: str = "dp", | ||
| eps_bucket: float = 0.10, | ||
| ) -> Tuple[List[List[int]], List[Tuple[int, int]], List[float], List[List[int]]]: |
There was a problem hiding this comment.
[IMPORTANT Readability] next_hdp_group is 270 lines with 3 nested closures (trim_overload, fill_empty_gpus, inner fill_empty), mixing greedy scheduling, balance checking, empty-GPU filling, and overload trimming in a single scope.
The closures capture mutable outer state via nonlocal, making it hard to reason about which variables are modified where. fill_empty_gpus alone is ~70 lines of array-shifting logic that would be much easier to test and review as a standalone module-level function.
Suggestion: Extract fill_empty_gpus (and trim_overload when uncommented) into top-level functions with explicit input/output signatures. next_hdp_group should only contain the greedy main loop.
| micro_batches = [[] for _ in range(total_gpus)] | ||
| exec_times = [0.0 for _ in range(total_gpus)] | ||
| sample_ids_per_gpu = [[] for _ in range(total_gpus)] | ||
| packing_sequence_len = {} | ||
|
|
||
| gpu_group_id = [None] * total_gpus | ||
| group_members = {} | ||
| group_size = {} | ||
| next_gid = 0 | ||
|
|
||
| pp_cursor = 0 | ||
| prev_needed = None | ||
| check_balance = False |
There was a problem hiding this comment.
[SUGGESTION Naming] Core scheduling state variables are overly abbreviated for a 270-line function in distributed scheduling code.
In a function this long with multiple nested scopes, short names like gid, g_members, g_size, nc, next_pw force readers to keep a mental lookup table. Suggested renames:
| Current | Suggested |
|---|---|
gpu_group_id |
(OK) |
group_members |
(OK) |
group_size |
group_cp_size (disambiguate from len(group_members[gid])) |
next_gid |
next_group_id |
packing_sequence_len |
packed_tokens_per_rank |
pp_cursor |
consider removing (see strategy comment) |
| # Step4: Prepare "local_cp_size" if dynamic context parallel is enabled. | ||
| if dynamic_cp: | ||
| if is_tp_rank_0: | ||
| if type(batch['local_cp_size']) == int: |
There was a problem hiding this comment.
[SUGGESTION Readability] Prefer isinstance(batch['local_cp_size'], int) over type(...) == int.
isinstance is the Pythonic idiom and correctly handles int subclasses (e.g. np.int64). Same applies to the type(batch['max_seqlen']) == int check on line 595.
| scan_order = ( | ||
| range(len(buckets)) | ||
| if strategy == "dp" | ||
| else [(pp_cursor + i) % len(buckets) for i in range(len(buckets))] | ||
| ) |
There was a problem hiding this comment.
[SUGGESTION Simplification] strategy parameter is always "dp" — the "pp" branch and pp_cursor variable are dead code.
No caller passes strategy="pp", so the round-robin scan_order path is unreachable. The pp_cursor maintenance (lines 675-676, 732) and the strategy parameter add complexity without providing value.
If "pp" is planned future work, add a # TODO with a tracking issue. Otherwise, remove the strategy parameter and the pp-related code to simplify the already complex scheduling logic.
This PR is the second part of hybrid-cp. The first part is: #2054
(PR for main branch: #2304 )
This PR provides end-to-end Dynamic CP support, including PP, VPP. (PR for main branch: #2304 )
Major changes
default_dynamic_cpscheduler. Dynamic CP now only requires specifying--sequence-packing-scheduler default_dynamic_cp(auto-set when--dynamic-context-parallelis enabled). The previous standalonedynamic_context_parallel_forward_backwardfunction andDynamicCPDataLoaderWrapperare removed — Dynamic CP now goes through the standard pipeline schedule, making it compatible with PP / VPP / TP out of the box.MambaMixerand the MTP module now dynamically switch the CP group per micro-batch viapacked_seq_params.cp_group, and restore the original group after each forward pass.Other changes
--min-dynamic-context-parallel-sizeargument to control the minimum CP group size (default 1).DynamicCPMegatronPretrainingSampler; Dynamic CP now uses the standardMegatronPretrainingSampler.get_batch_on_this_dynamic_cp_rankutility; all CP slicing goes through the THD packed path.next_hdp_group,dcp_gpus_needed,dcp_make_buckets_equal, etc.) from the deleteddynamic_cp_schedule.pyintodata_schedule_utils.pyas standalone functions.broadcast_to_pp_groupandcreate_data_iteratornow propagatelocal_cp_sizefor PP / VPP stages.TEDotProductAttentionto lazily initializecp_streamon dynamic CP path.attention.pyto restorepg_collection.cpafter dynamic CP forward._DYNAMIC_DP_CP_GROUPSindestroy_model_parallel.gpt3_mcore_te_tp2_pp1_cp4_dcpand extended unit tests with DCP parameter combinations.Convergence and performance
Convergence has been verified on Qwen3-30B-A3B on 32 GPUs, with
max_seqlenset to 49152 andmax_seqlen_per_dp_cp_rankset to 3072. In the figure below, bshd refers to running with CP=16, where sequences are padded tomax_seqlenand executed in the same bshd format as in pretraining. thd-packing refers to using CP=16 while packing variable-length sequences. In dynamic-cp, the maximum CP group size is also 16.Known limitations
Contribution process
flowchart LR A[Pre-checks] --> B[PR Tests] subgraph Code Review/Approval C1[Expert Review] --> C2[Final Review] end B --> C1 C2 --> D[Merge]Pre-checks
Core 0.8)Code review
The following process is enforced via the CODEOWNERS file for changes into
megatron/core. For changes outside ofmegatron/core, it is up to the PR author whether or not to tag the Final Reviewer team.For MRs into `main` branch
(Step 1): Add PR label
Expert Review(Step 2): Collect the expert reviewers reviews
Expert Reviewlabel when your PR is ready for review.Final Review might get declined if these requirements are not fulfilled.
(Step 3): Final Review
Final Reviewlabel(Optional Step 4): Cherry-pick into release branch
If this PR also needs to be merged into
core_r*release branches, after this PR has been merged, selectCherry-pickto open a new PR into the release branch.For MRs into `dev` branch
The proposed review process for `dev` branch is under active discussion.MRs are mergable after one approval by either
eharper@nvidia.comorzijiey@nvidia.com.Merging your PR
Any member of core-adlr and
core-nemowill be able to merge your PR.