Skip to content

Add E2E support for THD format#3386

Open
xiaoyao0115 wants to merge 4 commits into
NVIDIA:mainfrom
xiaoyao0115:thd_e2e_main
Open

Add E2E support for THD format#3386
xiaoyao0115 wants to merge 4 commits into
NVIDIA:mainfrom
xiaoyao0115:thd_e2e_main

Conversation

@xiaoyao0115

@xiaoyao0115 xiaoyao0115 commented Feb 12, 2026

Copy link
Copy Markdown
Contributor

Description

This PR adds Sequence Packing (THD format) E2E support to MCore. Dev branch PR:#2924

The core missing functionalities of THD in MCore are:

  • data iterator cannot handle THD meta data, like cu_seqlens, max_seqlens.
  • num_microbatches is fixed.
  • PackParams are not passing between PP ranks.

Key Changes

1. Add a data_iterator wrapper (megatron/core/datasets/data_schedule.py::wrap_dataloader)

A wrapper function that intercepts the data iterator to perform scheduling and packing:

  • Schedule & Pack: Extracts data from the data iterator, schedules sequences across DP×CP ranks, and packs them into microbatches with cu_seqlens metadata.
  • Returns packing results: Returns the packed num_microbatches along with two parameters for FLOPs calculation: num_total_tokens_this_global_batch and sequence_square_sum_this_global_batch.
  • TP broadcast: Broadcasts num_microbatches and FLOPs parameters across TP ranks since only TP rank 0 has access to the data iterator.
  • PP broadcast: When using PP, middle PP stages (not first or last) require metadata (cu_seqlens, cu_seqlens_padded, max_seqlen, etc.) to be broadcast from PP rank 0 for correct computation.

2. Mock SFT Dataset Support

Supports mock datasets for testing and benchmarking with configurable sequence length distributions.
There are two modes of mock sft dataset:

  • File mode: Load sequence lengths from an external CSV, example json:
    {"mode": "file", "path": "/path/to/seqlens.csv"}
  • Distribution mode: Generate sequence lengths from a distribution (currently supports lognormal), example json:
    {"mode": "distribution", "type": "lognormal", "min_seq_len": 1024, "max_seq_len": 8192, "mean_seq_len": 4096, "lognormal_sigma": 1.1}

Architecture

Before vs After

graph LR
    subgraph Before
        A1[DataIterator] --> B1[get_batch]
        B1 --> C1[forward_backward]
        C1 --> D1[Fixed seq_len FLOPs]
    end
    subgraph After
        A2[DataIterator] --> W[wrap_dataloader]
        W -->|schedule + pack| B2[PackedDataIterator]
        W -->|broadcast| M[num_microbatches + flops_params]
        B2 --> C2[get_batch_for_sequence_packing]
        C2 --> D2[forward_backward]
        D2 --> E2[Dynamic FLOPs]
        M   --> E2
    end
Loading

Execution Flow

sequenceDiagram
    participant Train as training.py
    participant Schedule as schedules.py
    participant Wrap as wrap_iterator_helper
    participant DataSched as data_schedule.py
    participant GetBatch as get_batch_for_seq_packing

    Train->>Schedule: forward_backward_*(data_iterator)
    Schedule->>Wrap: wrap_iterator_helper(config, data_iterator)
    Wrap->>DataSched: wrap_dataloader(data_iterator, scheduler_type)
    
    Note over DataSched: 1. Gather global seqlens across DP
    Note over DataSched: 2. Scheduler assigns sequences to microbatches
    Note over DataSched: 3. All-to-all redistribute samples
    Note over DataSched: 4. Pack into microbatches
    Note over DataSched: 5. Broadcast to TP/PP ranks
    
    DataSched-->>Schedule: (packed_iter, num_mbs, total_tokens, seq_sq_sum)
    
    loop for each microbatch
        Schedule->>GetBatch: get_batch_on_this_rank_for_sequence_packing
        Note over GetBatch: Broadcast tokens/labels to TP group
        Note over GetBatch: Partition for CP if needed
        GetBatch-->>Schedule: (tokens, labels, loss_mask, pos_ids, packed_seq_params)
    end
    
    Schedule-->>Train: forward_data_store + [total_tokens, seq_sq_sum]
Loading

New Arguments

Argument Type Description
--sequence-packing flag Enable sequence packing (THD format) for training
--sequence-packing-scheduler str Scheduler type: default or empty
--sft-mock-dataset-config-json str JSON config for mock dataset

Changes

File Description
megatron/core/datasets/data_schedule.py Core scheduling and packing logic
megatron/core/pipeline_parallel/schedules.py Integration with forward/backward schedules
megatron/training/training.py Updated FLOPs calculation for variable-length sequences
megatron/training/datasets/sft_dataset.py Mock dataset support
megatron/training/arguments.py New CLI arguments
megatron/core/model_parallel_config.py Configuration options
tests/unit_tests/test_sequence_packing.py Unit tests

Code review

The following process is enforced via the CODEOWNERS file for changes into megatron/core. For changes outside of megatron/core, it is up to the PR author whether or not to tag the Final Reviewer team.

For MRs into `main` branch

Feel free to message or comment the @mcore-oncall to help accelerate your merge into main. The less complex your PR is, the faster it will be approved and merged!

(Step 1): Add PR label Expert Review

(Step 2): Collect the expert reviewers reviews

  1. Attach the Expert Review label when your PR is ready for review.
  2. GitHub auto-assigns expert reviewers based on your changes. They will get notified and pick up your PR soon.

⚠️ Only proceed to the next step once all reviewers have approved, merge-conflict are resolved and the CI is passing.
Final Review might get declined if these requirements are not fulfilled.

(Step 3): Final Review

  1. Add Final Review label
  2. GitHub auto-assigns final reviewers based on your changes. They will get notified and pick up your PR soon.

(Optional Step 4): Cherry-pick into release branch

If this PR also needs to be merged into core_r* release branches, after this PR has been merged, select Cherry-pick to open a new PR into the release branch.

For MRs into `dev` branch The proposed review process for `dev` branch is under active discussion.

MRs are mergable after one approval by either eharper@nvidia.com or zijiey@nvidia.com.

Merging your PR

Any member of core-adlr and core-nemo will be able to merge your PR.

@xiaoyao0115 xiaoyao0115 requested review from a team as code owners February 12, 2026 15:22
@copy-pr-bot

copy-pr-bot Bot commented Feb 12, 2026

Copy link
Copy Markdown

This pull request requires additional validation before any workflows can run on NVIDIA's runners.

Pull request vetters can view their responsibilities here.

Contributors can view more details about this message here.

@Phlip79

Phlip79 commented Mar 4, 2026

Copy link
Copy Markdown
Member

We are changing our review process and marking all open, unlabeled PRs as draft. This change will go in effect starting once #3659 is merged.

Moving forward, all PRs will be required to start as draft PRs. If you wish to get your PR merged, mark your PR as “Ready for review”. Read more about the new process at submit.md.

@Phlip79 Phlip79 marked this pull request as draft March 4, 2026 23:36
@xiaoyao0115 xiaoyao0115 marked this pull request as ready for review March 16, 2026 11:10
@xiaoyao0115

Copy link
Copy Markdown
Contributor Author

/ok to test e8846b3

@Phlip79

Phlip79 commented Apr 3, 2026

Copy link
Copy Markdown
Member

/claude review

Comment thread megatron/training/arguments.py Outdated
help='Enables hybrid context parallel. This is used to balance the workload '
'of each CP rank when we use packed samples with variable sequence lengths. '
'Requires --max-seqlen-per-dp-cp-rank to be set.')
group.add_argument('--sequence-packing-scheduler', type=str, default='default_sequence_packing', choices=['default_sequence_packing'])

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: The default value 'default_sequence_packing' and choices=['default_sequence_packing'] don't match any valid scheduler. The PackingSchedulerEnum only has DP_BALANCED = "dp_balanced", and transformer_config.py validates against supported_schedulers = ['dp_balanced']. Using this CLI arg will crash at PackingSchedulerEnum[scheduler_type.upper()] since "DEFAULT_SEQUENCE_PACKING" is not a valid enum member.

Suggested change
group.add_argument('--sequence-packing-scheduler', type=str, default='default_sequence_packing', choices=['default_sequence_packing'])
group.add_argument('--sequence-packing-scheduler', type=str, default=None, choices=['dp_balanced'])

Comment on lines +380 to +388
t = batch[gid2local_id[gid]][key].to(torch.cuda.current_device(), non_blocking=True)
flattened_tensors.append(t.reshape(-1))
return (
torch.cat(flattened_tensors, dim=0)
if flattened_tensors
else torch.empty(1, device=torch.cuda.current_device(), dtype=batch[0][key].dtype)
)

def _unpack_sample_by_key(key: str, recv_tensor: torch.Tensor):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bug: Using np.where(info_numpy == 0) to find cu_seqlens / cu_seqlens_padded boundaries is fragile. If any of the header values (num_micro_batches, seqlen_sum, seqlen_squared_sum) or any max_seqlen value happens to be 0 (or very close to 0 after float32 cast), the zero-indexed boundary detection will produce wrong indices, leading to silent data corruption on middle PP stages.

Consider encoding the lengths of each cu_seqlens / cu_seqlens_padded tensor explicitly in the broadcast payload (e.g., prepend the number of sequences per microbatch) so that the receiver can unpack deterministically without relying on sentinel values.

import atexit, json
from collections import Counter
from typing import Any, Dict, Optional
import json

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: json is already imported on line 3 (import atexit, json). This is a duplicate import.

Suggested change
import json

@Phlip79 Phlip79 requested a review from asolergi-nv April 15, 2026 00:16
Comment thread pretrain_gpt.py Outdated
"data_parallel_size": args.data_parallel_size,
"sequence_parallel_size": args.tensor_model_parallel_size * args.sequence_parallel,
"hybrid_context_parallel": args.hybrid_context_parallel,
"sft_mock_dataset_config_json":args.sft_mock_dataset_config_json,

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting

Comment on lines +269 to +270
return sample
class MockSFTDataset(SFTDataset):

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

formatting.

@Phlip79

Phlip79 commented Jun 9, 2026

Copy link
Copy Markdown
Member

@xiaoyao0115 it would be helpful with the merge process if you please split this PR up. I recommend using the split-pr skill. There are also unresolved comments from Claude, @jkamalu, and conflicts with main.

xiaoyao0115 and others added 4 commits June 9, 2026 01:40
Signed-off-by: xiaoyao0115 <1804647152@qq.com>
Signed-off-by: tailaim <tailaim@nvidia.com>
Signed-off-by: xiaoyao0115 <1804647152@qq.com>
Signed-off-by: tailaim <tailaim@nvidia.com>
Signed-off-by: xiaoyao0115 <1804647152@qq.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants