Add E2E support for THD format #3386
We are changing our review process and marking all open, unlabeled PRs as draft. This change will go in effect starting once #3659 is merged. Moving forward, all PRs will be required to start as draft PRs. If you wish to get your PR merged, mark your PR as “Ready for review”. Read more about the new process at submit.md. |
c226649 to
e8846b3
Compare
|
/ok to test e8846b3 |
e8846b3 to
556f574
Compare
|
/claude review |
| help='Enables hybrid context parallel. This is used to balance the workload ' | ||
| 'of each CP rank when we use packed samples with variable sequence lengths. ' | ||
| 'Requires --max-seqlen-per-dp-cp-rank to be set.') | ||
| group.add_argument('--sequence-packing-scheduler', type=str, default='default_sequence_packing', choices=['default_sequence_packing']) |
Bug: The default value 'default_sequence_packing' and choices=['default_sequence_packing'] don't match any valid scheduler. The PackingSchedulerEnum only has DP_BALANCED = "dp_balanced", and transformer_config.py validates against supported_schedulers = ['dp_balanced']. Using this CLI arg will crash at PackingSchedulerEnum[scheduler_type.upper()] since "DEFAULT_SEQUENCE_PACKING" is not a valid enum member.
| group.add_argument('--sequence-packing-scheduler', type=str, default='default_sequence_packing', choices=['default_sequence_packing']) | |
| group.add_argument('--sequence-packing-scheduler', type=str, default=None, choices=['dp_balanced']) |
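To illustrate the failure mode described in this comment, here is a minimal sketch. The enum below is a hypothetical stand-in that contains only the `DP_BALANCED` member named above, and `resolve_scheduler` is an illustrative helper, not a function from this PR. It shows that a by-name lookup with an unknown CLI value raises `KeyError`, and one way the lookup could report the valid choices instead.

```python
from enum import Enum


class PackingSchedulerEnum(Enum):
    # Hypothetical stand-in: only the member mentioned in the review comment.
    DP_BALANCED = "dp_balanced"


def resolve_scheduler(scheduler_type: str) -> PackingSchedulerEnum:
    """Look up a scheduler by name, like PackingSchedulerEnum[scheduler_type.upper()]."""
    try:
        return PackingSchedulerEnum[scheduler_type.upper()]
    except KeyError:
        # Enum name lookup raises KeyError for unknown members; turn that
        # into a message that lists the accepted values.
        valid = [member.value for member in PackingSchedulerEnum]
        raise ValueError(
            f"Unknown scheduler {scheduler_type!r}; expected one of {valid}"
        ) from None


print(resolve_scheduler("dp_balanced"))

try:
    resolve_scheduler("default_sequence_packing")
except ValueError as err:
    print(err)
```

With the suggested `choices=['dp_balanced']`, argparse would reject the invalid value before this lookup is ever reached.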
| t = batch[gid2local_id[gid]][key].to(torch.cuda.current_device(), non_blocking=True) | ||
| flattened_tensors.append(t.reshape(-1)) | ||
| return ( | ||
| torch.cat(flattened_tensors, dim=0) | ||
| if flattened_tensors | ||
| else torch.empty(1, device=torch.cuda.current_device(), dtype=batch[0][key].dtype) | ||
| ) | ||
|
|
||
| def _unpack_sample_by_key(key: str, recv_tensor: torch.Tensor): |
Bug: Using np.where(info_numpy == 0) to find cu_seqlens / cu_seqlens_padded boundaries is fragile. If any of the header values (num_micro_batches, seqlen_sum, seqlen_squared_sum) or any max_seqlen value happens to be 0 (or very close to 0 after float32 cast), the zero-indexed boundary detection will produce wrong indices, leading to silent data corruption on middle PP stages.
Consider encoding the lengths of each cu_seqlens / cu_seqlens_padded tensor explicitly in the broadcast payload (e.g., prepend the number of sequences per microbatch) so that the receiver can unpack deterministically without relying on sentinel values.
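A minimal sketch of the length-prefixed layout suggested above. It uses plain Python lists as a stand-in for the broadcast tensor, and the function names and payload layout are assumptions for illustration, not the PR's actual format. Each microbatch is preceded by its entry count, so the receiver walks the payload by offsets and never searches for zeros.

```python
from typing import Dict, List, Tuple


def pack_metadata(header: List[float], microbatches: List[Dict]) -> List[float]:
    """Flatten header and per-microbatch metadata into one payload.

    Layout (hypothetical): [len(header), *header, num_microbatches,
    then per microbatch: n, cu_seqlens[n], cu_seqlens_padded[n], max_seqlen].
    """
    payload = [float(len(header)), *map(float, header), float(len(microbatches))]
    for mb in microbatches:
        cu, cu_pad = mb["cu_seqlens"], mb["cu_seqlens_padded"]
        assert len(cu) == len(cu_pad), "both lists index the same sequences"
        payload.append(float(len(cu)))  # explicit length prefix
        payload.extend(map(float, cu))
        payload.extend(map(float, cu_pad))
        payload.append(float(mb["max_seqlen"]))
    return payload


def unpack_metadata(payload: List[float]) -> Tuple[List[float], List[Dict]]:
    """Inverse of pack_metadata; relies only on the length prefixes."""
    pos = 0
    n_header = int(payload[pos]); pos += 1
    header = list(payload[pos:pos + n_header]); pos += n_header
    n_mb = int(payload[pos]); pos += 1
    microbatches = []
    for _ in range(n_mb):
        n = int(payload[pos]); pos += 1
        cu = [int(x) for x in payload[pos:pos + n]]; pos += n
        cu_pad = [int(x) for x in payload[pos:pos + n]]; pos += n
        max_seqlen = int(payload[pos]); pos += 1
        microbatches.append(
            {"cu_seqlens": cu, "cu_seqlens_padded": cu_pad, "max_seqlen": max_seqlen}
        )
    assert pos == len(payload), "payload has trailing or missing entries"
    return header, microbatches


# Zeros in the header or in max_seqlen no longer confuse the receiver.
header_in = [2.0, 0.0, 0.0]
mbs_in = [
    {"cu_seqlens": [0, 3, 7], "cu_seqlens_padded": [0, 4, 8], "max_seqlen": 4},
    {"cu_seqlens": [0], "cu_seqlens_padded": [0], "max_seqlen": 0},
]
header_out, mbs_out = unpack_metadata(pack_metadata(header_in, mbs_in))
print(header_out == header_in and mbs_out == mbs_in)
```

If the real payload is cast to float32, integer values above 2^24 would lose precision, so an integer dtype may be the safer choice for the broadcast.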
| import atexit, json | ||
| from collections import Counter | ||
| from typing import Any, Dict, Optional | ||
| import json |
Nit: json is already imported on line 3 (import atexit, json). This is a duplicate import.
| import json |
| "data_parallel_size": args.data_parallel_size, | ||
| "sequence_parallel_size": args.tensor_model_parallel_size * args.sequence_parallel, | ||
| "hybrid_context_parallel": args.hybrid_context_parallel, | ||
| "sft_mock_dataset_config_json":args.sft_mock_dataset_config_json, |
| return sample | ||
| class MockSFTDataset(SFTDataset): |
|
@xiaoyao0115 it would be helpful with the merge process if you please split this PR up. I recommend using the |
Signed-off-by: xiaoyao0115 <1804647152@qq.com>
Signed-off-by: tailaim <tailaim@nvidia.com>
Description
This PR adds Sequence Packing (THD format) E2E support to MCore. Dev branch PR: #2924
The core missing functionalities of THD in MCore are:
Key Changes
1. Add a data_iterator wrapper (megatron/core/datasets/data_schedule.py::wrap_dataloader)
A wrapper function that intercepts the data iterator to perform scheduling and packing:
- cu_seqlens metadata.
- num_microbatches along with two parameters for FLOPs calculation: num_total_tokens_this_global_batch and sequence_square_sum_this_global_batch.
- num_microbatches and FLOPs parameters across TP ranks since only TP rank 0 has access to the data iterator.
- (cu_seqlens, cu_seqlens_padded, max_seqlen, etc.) to be broadcast from PP rank 0 for correct computation.
2. Mock SFT Dataset Support
Supports mock datasets for testing and benchmarking with configurable sequence length distributions.
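As an illustration of a configurable sequence-length distribution, here is a minimal sketch that samples clipped lognormal lengths. The parameter names mirror the distribution-mode config fields in this description. The function name `sample_mock_seqlens` is hypothetical, and treating `mean_seq_len` as the mean of the unclipped distribution is an assumption. This is not the PR's implementation.

```python
import math
import random
from typing import List


def sample_mock_seqlens(
    num_samples: int,
    min_seq_len: int,
    max_seq_len: int,
    mean_seq_len: float,
    lognormal_sigma: float,
    seed: int = 0,
) -> List[int]:
    """Draw sequence lengths from a lognormal, clipped to [min_seq_len, max_seq_len]."""
    rng = random.Random(seed)
    # For a lognormal, E[X] = exp(mu + sigma^2 / 2). Solve for mu so the
    # unclipped mean equals mean_seq_len (assumed interpretation).
    mu = math.log(mean_seq_len) - 0.5 * lognormal_sigma ** 2
    lengths = []
    for _ in range(num_samples):
        x = rng.lognormvariate(mu, lognormal_sigma)
        lengths.append(min(max(round(x), min_seq_len), max_seq_len))
    return lengths


seqlens = sample_mock_seqlens(
    num_samples=8,
    min_seq_len=1024,
    max_seq_len=8192,
    mean_seq_len=4096,
    lognormal_sigma=1.1,
)
print(seqlens)
```

Clipping shifts the empirical mean away from `mean_seq_len`, so a benchmark that depends on an exact mean would need to account for that.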
There are two modes of mock sft dataset:
{"mode": "file", "path": "/path/to/seqlens.csv"}
{"mode": "distribution", "type": "lognormal", "min_seq_len": 1024, "max_seq_len": 8192, "mean_seq_len": 4096, "lognormal_sigma": 1.1}
Architecture
Before vs After
graph LR
    subgraph Before
        A1[DataIterator] --> B1[get_batch]
        B1 --> C1[forward_backward]
        C1 --> D1[Fixed seq_len FLOPs]
    end
    subgraph After
        A2[DataIterator] --> W[wrap_dataloader]
        W -->|schedule + pack| B2[PackedDataIterator]
        W -->|broadcast| M[num_microbatches + flops_params]
        B2 --> C2[get_batch_for_sequence_packing]
        C2 --> D2[forward_backward]
        D2 --> E2[Dynamic FLOPs]
        M --> E2
    end
Execution Flow
sequenceDiagram
    participant Train as training.py
    participant Schedule as schedules.py
    participant Wrap as wrap_iterator_helper
    participant DataSched as data_schedule.py
    participant GetBatch as get_batch_for_seq_packing
    Train->>Schedule: forward_backward_*(data_iterator)
    Schedule->>Wrap: wrap_iterator_helper(config, data_iterator)
    Wrap->>DataSched: wrap_dataloader(data_iterator, scheduler_type)
    Note over DataSched: 1. Gather global seqlens across DP
    Note over DataSched: 2. Scheduler assigns sequences to microbatches
    Note over DataSched: 3. All-to-all redistribute samples
    Note over DataSched: 4. Pack into microbatches
    Note over DataSched: 5. Broadcast to TP/PP ranks
    DataSched-->>Schedule: (packed_iter, num_mbs, total_tokens, seq_sq_sum)
    loop for each microbatch
        Schedule->>GetBatch: get_batch_on_this_rank_for_sequence_packing
        Note over GetBatch: Broadcast tokens/labels to TP group
        Note over GetBatch: Partition for CP if needed
        GetBatch-->>Schedule: (tokens, labels, loss_mask, pos_ids, packed_seq_params)
    end
    Schedule-->>Train: forward_data_store + [total_tokens, seq_sq_sum]
New Arguments
--sequence-packing
--sequence-packing-scheduler: default or empty
--sft-mock-dataset-config-json
Changes
megatron/core/datasets/data_schedule.py
megatron/core/pipeline_parallel/schedules.py
megatron/training/training.py
megatron/training/datasets/sft_dataset.py
megatron/training/arguments.py
megatron/core/model_parallel_config.py
tests/unit_tests/test_sequence_packing.py
Code review
The following process is enforced via the CODEOWNERS file for changes into megatron/core. For changes outside of megatron/core, it is up to the PR author whether or not to tag the Final Reviewer team.
For MRs into `main` branch
Feel free to message or comment the @mcore-oncall to help accelerate your merge into main. The less complex your PR is, the faster it will be approved and merged!
(Step 1): Add PR label Expert Review
(Step 2): Collect the expert reviewers reviews
Expert Review label when your PR is ready for review.
Final Review might get declined if these requirements are not fulfilled.
(Step 3): Final Review
Final Review label
(Optional Step 4): Cherry-pick into release branch
If this PR also needs to be merged into core_r* release branches, after this PR has been merged, select Cherry-pick to open a new PR into the release branch.
For MRs into `dev` branch
The proposed review process for `dev` branch is under active discussion.
MRs are mergeable after one approval by either eharper@nvidia.com or zijiey@nvidia.com.
Merging your PR
Any member of core-adlr and core-nemo will be able to merge your PR.