feat: reinforcer initial commit #3
Merged
Co-authored-by: Sahil Jain <sahil.jain5125@gmail.com>
Co-authored-by: Parth Chadha <parth29@gmail.com>
Co-authored-by: Terry Kong <terryk@nvidia.com>
Co-authored-by: Anna Shors <ashors@nvidia.com>
Co-authored-by: Gerald Shen <geshen@nvidia.com>
Co-authored-by: Yuki Huang <yukih@nvidia.com>
Co-authored-by: Hemil Desai <hemild@nvidia.com>
Co-authored-by: Yi-Fu Wu <yifuw@nvidia.com>
Co-authored-by: ahmadki <ahmadki@users.noreply.github.com>
Co-authored-by: Nathan McKimpson <nmckimpson@nvidia.com>
Co-authored-by: Charlie Truong <chtruong@nvidia.com>
Signed-off-by: Terry Kong <terryk@nvidia.com>
Signed-off-by: Terry Kong <terryk@nvidia.com>
parthchadha reviewed on Mar 20, 2025
Signed-off-by: Terry Kong <terryk@nvidia.com>
SahilJain314 approved these changes on Mar 20, 2025
parthchadha approved these changes on Mar 20, 2025
KiddoZhu pushed a commit that referenced this pull request on May 6, 2025

Co-authored-by: Sahil Jain <sahil.jain5125@gmail.com>
Co-authored-by: Parth Chadha <parth29@gmail.com>
Co-authored-by: Anna Shors <ashors@nvidia.com>
Co-authored-by: Gerald Shen <geshen@nvidia.com>
Co-authored-by: Yuki Huang <yukih@nvidia.com>
Co-authored-by: Hemil Desai <hemild@nvidia.com>
Co-authored-by: Yi-Fu Wu <yifuw@nvidia.com>
Co-authored-by: ahmadki <ahmadki@users.noreply.github.com>
Co-authored-by: Nathan McKimpson <nmckimpson@nvidia.com>
Co-authored-by: Charlie Truong <chtruong@nvidia.com>
Signed-off-by: Terry Kong <terryk@nvidia.com>
copy-pr-bot Bot pushed a commit that referenced this pull request on Apr 2, 2026

* add Qwen3.5-35B megatron nightly tests (LLM + VLM)
* set PYTORCH_CUDA_ALLOC_CONF expandable_segments to True to reduce OOM
* change test reward threshold

---------

Signed-off-by: alexchiu <qiuzhaopeng@foxmail.com>
Co-authored-by: alexchiu <qiuzhaopeng@foxmail.com>
Signed-off-by: jQizhang <larkz@nvidia.com>
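
For readers unfamiliar with the allocator setting mentioned in this commit, here is a minimal sketch of one way to turn on `expandable_segments` from Python. `PYTORCH_CUDA_ALLOC_CONF` is a comma-separated list of `option:value` pairs read by PyTorch's CUDA caching allocator. The helper name `with_expandable_segments` is hypothetical and is not part of the commit, which may well set the variable in a test script or launcher instead.

```python
import os


def with_expandable_segments(env):
    """Return a copy of `env` with expandable_segments:True set in
    PYTORCH_CUDA_ALLOC_CONF, preserving any other allocator options."""
    env = dict(env)
    current = env.get("PYTORCH_CUDA_ALLOC_CONF", "")
    # Keep every existing option except a previous expandable_segments entry.
    options = [
        opt
        for opt in current.split(",")
        if opt and not opt.startswith("expandable_segments:")
    ]
    options.append("expandable_segments:True")
    env["PYTORCH_CUDA_ALLOC_CONF"] = ",".join(options)
    return env


# Apply to the current process. This should run before torch initializes CUDA
# (safest: before `import torch`), otherwise the allocator may not see it.
os.environ.update(with_expandable_segments(os.environ))
```

Merging into the existing value rather than overwriting it keeps any other allocator options a launcher has already set.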
ZhiyuLi-Nvidia added a commit that referenced this pull request on May 5, 2026

…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3: single-``KVBatchMeta`` path returned rows in scrambled order.

``shard_keys_by_seqlen`` sorts by sequence length and strides (``order[r::dp_world_size]``) to balance per-rank token totals. The worker logprob aggregators (``_aggregate_logprob_results``) then concatenate per-rank outputs in rank order via ``BatchedDataDict.from_batches``, without inverting the seqlen-strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))`` returned rows in [order[0], order[d], order[2d], …, order[1], order[1+d], …] order, not the caller's ``meta.keys`` order. Silent correctness bug (test_seqpack_legacy_equals_tq didn't catch it because the sync path calls ``policy.get_logprobs(BatchedDataDict)``: legacy passthrough, no sharder).

Fix:

* ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per shard in ``extra_info`` (the ``idx`` list it computed).
* ``dp_dispatch`` reconstructs the concat-position → input-index permutation from the shards' ``extra_info``, then applies the inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
* The reorder is gated on ``is_meta and not is_meta_list``: for ``list[KVBatchMeta]`` the driver controls ordering (PR 0 ``fan_out_per_rank_metas``) and the decorator must not undo it.
* Skipped silently if the result isn't a BatchedDataDict (e.g. ``train`` returns a plain dict, so order doesn't apply).

Issue #4: TQ path silently dropped legacy training semantics.

The decorator's TQ branch returns ``aggregate(results)`` directly and never enters ``Policy.train``'s body, so the FLOPs accumulation at lm_policy.py around the ``flops_tracker`` block, plus the ``num_ranks`` and ``theoretical_tflops`` fields, were missing from results when the trainer called ``policy.train(KVBatchMeta)`` or ``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS / DP divisibility assertion.
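
To make the Issue #3 row-order bug and its fix concrete, here is a small pure-Python sketch. It is not the repository's code: `shard_by_seqlen` and `restore_input_order` are hypothetical stand-ins for ``shard_keys_by_seqlen`` and the ``_dp_original_indices`` / ``reorder_data`` step, and plain lists stand in for ``BatchedDataDict``. It shows only that rank-order concatenation of strided shards scrambles the rows, and that the recorded indices are enough to undo it.

```python
def shard_by_seqlen(seqlens, dp_world_size):
    """Sort row indices by sequence length, then stride them across ranks.

    Returns one list of original row indices per rank (the information the
    commit message says is recorded as `_dp_original_indices`).
    """
    order = sorted(range(len(seqlens)), key=lambda i: seqlens[i])
    return [order[r::dp_world_size] for r in range(dp_world_size)]


def restore_input_order(concatenated, shards):
    """Undo the sharding permutation on rows concatenated in rank order."""
    # Position p of the concatenated output holds original row concat_to_input[p].
    concat_to_input = [idx for shard in shards for idx in shard]
    restored = [None] * len(concatenated)
    for pos, original_idx in enumerate(concat_to_input):
        restored[original_idx] = concatenated[pos]
    return restored


keys = ["a", "b", "c", "d", "e", "f"]
seqlens = [50, 10, 40, 20, 60, 30]

shards = shard_by_seqlen(seqlens, dp_world_size=2)
# Each rank returns its rows; the aggregator concatenates them in rank order.
concatenated = [keys[i] for shard in shards for i in shard]
restored = restore_input_order(concatenated, shards)

print(shards)        # [[1, 5, 0], [3, 2, 4]]
print(concatenated)  # ['b', 'f', 'a', 'd', 'c', 'e']  (scrambled)
print(restored)      # ['a', 'b', 'c', 'd', 'e', 'f']  (caller's order)
```

Scattering each row back to its original index is the inverse permutation. It needs no second sort, only the per-shard index lists captured at sharding time.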
Fix (additive, with no signature changes to the existing aggregate callables):

* ``dp_dispatch`` adds a basic divisibility assertion on the TQ path: ``total_meta_size % dp_size == 0`` (legacy path enforces this via ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call site).
* ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after ``aggregate``. If defined, calls ``post(aggregated, raw_results, shards=shards)`` and uses its return value. Convention-based: opt-in per Policy method, no decorator boilerplate.
* ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths`` on each shard (driver-pre-balanced for ``list[KVBatchMeta]``, sharder-strided for single ``KVBatchMeta``), records ``total_flops``, ``num_ranks``, ``theoretical_tflops``, the same fields the legacy body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py and test_dispatch.py don't check ``extra_info`` shape on sharder output or assert on aggregate-method return type other than what's already returned, so the additive fields and gated reorder are transparent. The legacy ``policy.train(BatchedDataDict)`` path is unchanged: it keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the ``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically via the post-hook. The reorder fix is only meaningful for callers that pass single ``KVBatchMeta``, primarily future logprob/reference-logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
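
The divisibility check and the convention-based post-hook described above can be sketched as a toy decorator. This is an illustration under stated assumptions, not the real ``dp_dispatch``: the signature, the `ToyPolicy` class, the serial "workers", and the `total_tokens` field are all hypothetical, and real FLOPs accounting is replaced by a simple token count. Only the hook name pattern ``_dp_post_<method_name>`` and the call shape ``post(aggregated, raw_results, shards=shards)`` are taken from the commit message.

```python
import functools


def dp_dispatch(aggregate, dp_size):
    """Toy dispatcher: run `method` once per shard, aggregate the results,
    then apply an optional `_dp_post_<method_name>` hook if the object has one."""

    def decorator(method):
        @functools.wraps(method)
        def wrapper(self, shards):
            total_meta_size = sum(len(shard) for shard in shards)
            assert total_meta_size % dp_size == 0, (
                f"{total_meta_size} rows not divisible by dp_size={dp_size}"
            )
            # Stand-in for dispatching to data-parallel workers.
            raw_results = [method(self, shard) for shard in shards]
            aggregated = aggregate(raw_results)
            # Opt-in by naming convention: no hook defined means no post-processing.
            post = getattr(self, f"_dp_post_{method.__name__}", None)
            if post is not None:
                aggregated = post(aggregated, raw_results, shards=shards)
            return aggregated

        return wrapper

    return decorator


class ToyPolicy:
    # Each shard here is just a list of sequence lengths (an assumption).
    @dp_dispatch(aggregate=lambda rs: {"loss": sum(rs) / len(rs)}, dp_size=2)
    def train(self, shard):
        return float(sum(shard))

    @dp_dispatch(aggregate=lambda rs: [x for r in rs for x in r], dp_size=2)
    def get_logprobs(self, shard):
        return [-x for x in shard]

    def _dp_post_train(self, aggregated, raw_results, *, shards):
        # Recover per-call statistics from the shards after aggregation.
        aggregated["num_ranks"] = len(shards)
        aggregated["total_tokens"] = sum(sum(shard) for shard in shards)
        return aggregated


policy = ToyPolicy()
train_result = policy.train([[10, 30], [20, 40]])
logprob_result = policy.get_logprobs([[10, 30], [20, 40]])
```

In this sketch `train` picks up the extra fields because `_dp_post_train` exists, while `get_logprobs` is returned unchanged because no matching hook is defined. That matches the opt-in behaviour the commit message describes.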
ZhiyuLi-Nvidia added a commit that referenced this pull request on May 15, 2026

* grpo_sync.py: remove unused batch_cache = None (leftover from grpo.py-style dynamic sampling; grpo_sync threads survivors through pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id -> tq_partition_id. They are read from grpo_sync.py in 7 places, so the underscore prefix was misleading. Constructor kwarg tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia
added a commit
that referenced
this pull request
May 19, 2026
…patch TQ path Closes Issues #3 and #4 raised in PR review of the data-plane stack. Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order. ``shard_keys_by_seqlen`` sorts by sequence length and strides (``order[r::dp_world_size]``) to balance per-rank token totals. The worker logprob aggregators (``_aggregate_logprob_results``) then concatenate per-rank outputs in rank order via ``BatchedDataDict.from_batches`` — without inverting the seqlen- strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))`` returned rows in [order[0], order[d], order[2d], …, order[1], order[1+d], …] order, not the caller's ``meta.keys`` order. Silent correctness bug (test_seqpack_legacy_equals_tq didn't catch it because the sync path calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough, no sharder). Fix: * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per shard in ``extra_info`` (the ``idx`` list it computed). * ``dp_dispatch`` reconstructs the concat-position → input-index permutation from the shards' ``extra_info``, then applies the inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``. * The reorder is gated on ``is_meta and not is_meta_list`` — for ``list[KVBatchMeta]`` the driver controls ordering (PR 0 ``fan_out_per_rank_metas``) and the decorator must not undo it. * Skipped silently if the result isn't a BatchedDataDict (e.g. ``train`` returns a plain dict — order doesn't apply). Issue #4 — TQ path silently dropped legacy training semantics. The decorator's TQ branch returns ``aggregate(results)`` directly and never enters ``Policy.train``'s body — so the FLOPs accumulation at lm_policy.py around the ``flops_tracker`` block, plus the ``num_ranks`` and ``theoretical_tflops`` fields, were missing from results when the trainer called ``policy.train(KVBatchMeta)`` or ``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS / DP divisibility assertion. 
Fix (additive — no signature changes to the existing aggregate callables): * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path: ``total_meta_size % dp_size == 0`` (legacy path enforces this via ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call site). * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after ``aggregate``. If defined, calls ``post(aggregated, raw_results, shards=shards)`` and uses its return value. Convention-based — opt-in per Policy method, no decorator boilerplate. * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths`` on each shard (driver-pre-balanced for ``list[KVBatchMeta]``, sharder-strided for single ``KVBatchMeta``), records ``total_flops``, ``num_ranks``, ``theoretical_tflops`` — same fields the legacy body produces. Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py and test_dispatch.py don't check ``extra_info`` shape on sharder output or assert on aggregate-method return type other than what's already returned, so the additive fields and gated reorder are transparent. The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it keeps building results inline and never enters the new hook. Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the ``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically via the post-hook. The reorder fix is only meaningful for callers that pass single ``KVBatchMeta`` — primarily future logprob/reference- logprob TQ wiring; flagged in commit message of #3 above. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia
added a commit
that referenced
this pull request
May 19, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from grpo.py-style dynamic sampling; grpo_sync threads survivors through pending_meta / pending_slice). * TQPolicy: rename _dp_client -> dp_client and _tq_partition_id -> tq_partition_id. They are read from grpo_sync.py in 7 places, so the underscore prefix was misleading. Constructor kwarg tq_partition_id already matched the new attribute name. * Update README + data_plane_api_lifecycle docs example snippets. Per yuki-97 PR review (#3, #4). Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia
added a commit
that referenced
this pull request
May 19, 2026
…patch TQ path Closes Issues #3 and #4 raised in PR review of the data-plane stack. Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order. ``shard_keys_by_seqlen`` sorts by sequence length and strides (``order[r::dp_world_size]``) to balance per-rank token totals. The worker logprob aggregators (``_aggregate_logprob_results``) then concatenate per-rank outputs in rank order via ``BatchedDataDict.from_batches`` — without inverting the seqlen- strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))`` returned rows in [order[0], order[d], order[2d], …, order[1], order[1+d], …] order, not the caller's ``meta.keys`` order. Silent correctness bug (test_seqpack_legacy_equals_tq didn't catch it because the sync path calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough, no sharder). Fix: * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per shard in ``extra_info`` (the ``idx`` list it computed). * ``dp_dispatch`` reconstructs the concat-position → input-index permutation from the shards' ``extra_info``, then applies the inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``. * The reorder is gated on ``is_meta and not is_meta_list`` — for ``list[KVBatchMeta]`` the driver controls ordering (PR 0 ``fan_out_per_rank_metas``) and the decorator must not undo it. * Skipped silently if the result isn't a BatchedDataDict (e.g. ``train`` returns a plain dict — order doesn't apply). Issue #4 — TQ path silently dropped legacy training semantics. The decorator's TQ branch returns ``aggregate(results)`` directly and never enters ``Policy.train``'s body — so the FLOPs accumulation at lm_policy.py around the ``flops_tracker`` block, plus the ``num_ranks`` and ``theoretical_tflops`` fields, were missing from results when the trainer called ``policy.train(KVBatchMeta)`` or ``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS / DP divisibility assertion. 
Fix (additive — no signature changes to the existing aggregate callables): * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path: ``total_meta_size % dp_size == 0`` (legacy path enforces this via ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call site). * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after ``aggregate``. If defined, calls ``post(aggregated, raw_results, shards=shards)`` and uses its return value. Convention-based — opt-in per Policy method, no decorator boilerplate. * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths`` on each shard (driver-pre-balanced for ``list[KVBatchMeta]``, sharder-strided for single ``KVBatchMeta``), records ``total_flops``, ``num_ranks``, ``theoretical_tflops`` — same fields the legacy body produces. Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py and test_dispatch.py don't check ``extra_info`` shape on sharder output or assert on aggregate-method return type other than what's already returned, so the additive fields and gated reorder are transparent. The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it keeps building results inline and never enters the new hook. Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the ``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically via the post-hook. The reorder fix is only meaningful for callers that pass single ``KVBatchMeta`` — primarily future logprob/reference- logprob TQ wiring; flagged in commit message of #3 above. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia
added a commit
that referenced
this pull request
May 19, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from grpo.py-style dynamic sampling; grpo_sync threads survivors through pending_meta / pending_slice). * TQPolicy: rename _dp_client -> dp_client and _tq_partition_id -> tq_partition_id. They are read from grpo_sync.py in 7 places, so the underscore prefix was misleading. Constructor kwarg tq_partition_id already matched the new attribute name. * Update README + data_plane_api_lifecycle docs example snippets. Per yuki-97 PR review (#3, #4). Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia
added a commit
that referenced
this pull request
May 22, 2026
…patch TQ path Closes Issues #3 and #4 raised in PR review of the data-plane stack. Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order. ``shard_keys_by_seqlen`` sorts by sequence length and strides (``order[r::dp_world_size]``) to balance per-rank token totals. The worker logprob aggregators (``_aggregate_logprob_results``) then concatenate per-rank outputs in rank order via ``BatchedDataDict.from_batches`` — without inverting the seqlen- strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))`` returned rows in [order[0], order[d], order[2d], …, order[1], order[1+d], …] order, not the caller's ``meta.keys`` order. Silent correctness bug (test_seqpack_legacy_equals_tq didn't catch it because the sync path calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough, no sharder). Fix: * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per shard in ``extra_info`` (the ``idx`` list it computed). * ``dp_dispatch`` reconstructs the concat-position → input-index permutation from the shards' ``extra_info``, then applies the inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``. * The reorder is gated on ``is_meta and not is_meta_list`` — for ``list[KVBatchMeta]`` the driver controls ordering (PR 0 ``fan_out_per_rank_metas``) and the decorator must not undo it. * Skipped silently if the result isn't a BatchedDataDict (e.g. ``train`` returns a plain dict — order doesn't apply). Issue #4 — TQ path silently dropped legacy training semantics. The decorator's TQ branch returns ``aggregate(results)`` directly and never enters ``Policy.train``'s body — so the FLOPs accumulation at lm_policy.py around the ``flops_tracker`` block, plus the ``num_ranks`` and ``theoretical_tflops`` fields, were missing from results when the trainer called ``policy.train(KVBatchMeta)`` or ``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS / DP divisibility assertion. 
Fix (additive — no signature changes to the existing aggregate callables): * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path: ``total_meta_size % dp_size == 0`` (legacy path enforces this via ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call site). * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after ``aggregate``. If defined, calls ``post(aggregated, raw_results, shards=shards)`` and uses its return value. Convention-based — opt-in per Policy method, no decorator boilerplate. * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths`` on each shard (driver-pre-balanced for ``list[KVBatchMeta]``, sharder-strided for single ``KVBatchMeta``), records ``total_flops``, ``num_ranks``, ``theoretical_tflops`` — same fields the legacy body produces. Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py and test_dispatch.py don't check ``extra_info`` shape on sharder output or assert on aggregate-method return type other than what's already returned, so the additive fields and gated reorder are transparent. The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it keeps building results inline and never enters the new hook. Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the ``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically via the post-hook. The reorder fix is only meaningful for callers that pass single ``KVBatchMeta`` — primarily future logprob/reference- logprob TQ wiring; flagged in commit message of #3 above. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com> Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia
added a commit
that referenced
this pull request
May 22, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from grpo.py-style dynamic sampling; grpo_sync threads survivors through pending_meta / pending_slice). * TQPolicy: rename _dp_client -> dp_client and _tq_partition_id -> tq_partition_id. They are read from grpo_sync.py in 7 places, so the underscore prefix was misleading. Constructor kwarg tq_partition_id already matched the new attribute name. * Update README + data_plane_api_lifecycle docs example snippets. Per yuki-97 PR review (#3, #4). Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia
added a commit
that referenced
this pull request
May 23, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3: single-``KVBatchMeta`` path returned rows in scrambled order.

``shard_keys_by_seqlen`` sorts by sequence length and strides (``order[r::dp_world_size]``) to balance per-rank token totals. The worker logprob aggregators (``_aggregate_logprob_results``) then concatenate per-rank outputs in rank order via ``BatchedDataDict.from_batches`` without inverting the seqlen-strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))`` returned rows in [order[0], order[d], order[2d], …, order[1], order[1+d], …] order, not the caller's ``meta.keys`` order. Silent correctness bug (test_seqpack_legacy_equals_tq didn't catch it because the sync path calls ``policy.get_logprobs(BatchedDataDict)``, which is a legacy passthrough with no sharder).

Fix:
* ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per shard in ``extra_info`` (the ``idx`` list it computed).
* ``dp_dispatch`` reconstructs the concat-position → input-index permutation from the shards' ``extra_info``, then applies the inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
* The reorder is gated on ``is_meta and not is_meta_list``: for ``list[KVBatchMeta]`` the driver controls ordering (PR 0 ``fan_out_per_rank_metas``) and the decorator must not undo it.
* Skipped silently if the result isn't a BatchedDataDict (e.g. ``train`` returns a plain dict, so order doesn't apply).

Issue #4: TQ path silently dropped legacy training semantics.

The decorator's TQ branch returns ``aggregate(results)`` directly and never enters ``Policy.train``'s body, so the FLOPs accumulation at lm_policy.py around the ``flops_tracker`` block, plus the ``num_ranks`` and ``theoretical_tflops`` fields, were missing from results when the trainer called ``policy.train(KVBatchMeta)`` or ``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS / DP divisibility assertion.

Fix (additive; no signature changes to the existing aggregate callables):
* ``dp_dispatch`` adds a basic divisibility assertion on the TQ path: ``total_meta_size % dp_size == 0`` (legacy path enforces this via ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call site).
* ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after ``aggregate``. If defined, calls ``post(aggregated, raw_results, shards=shards)`` and uses its return value. Convention-based: opt-in per Policy method, no decorator boilerplate.
* ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths`` on each shard (driver-pre-balanced for ``list[KVBatchMeta]``, sharder-strided for single ``KVBatchMeta``), records ``total_flops``, ``num_ranks``, ``theoretical_tflops``, the same fields the legacy body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py and test_dispatch.py don't check ``extra_info`` shape on sharder output or assert on aggregate-method return type other than what's already returned, so the additive fields and gated reorder are transparent. The legacy ``policy.train(BatchedDataDict)`` path is unchanged; it keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the ``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically via the post-hook. The reorder fix is only meaningful for callers that pass single ``KVBatchMeta``, primarily future logprob/reference-logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
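The row-order bug described under Issue #3 can be illustrated with a minimal sketch in plain Python. It assumes only what the commit message states: indices are sorted by sequence length, dealt out with ``order[r::dp_world_size]``, and the per-rank outputs are concatenated in rank order. The helper names ``shard_by_seqlen`` and ``restore_input_order`` are hypothetical stand-ins, not the repository's ``shard_keys_by_seqlen`` or ``BatchedDataDict.reorder_data``.

```python
def shard_by_seqlen(seq_lens, dp_world_size):
    """Sort indices by sequence length, then deal them out with a stride."""
    order = sorted(range(len(seq_lens)), key=lambda i: seq_lens[i])
    # Shard r holds the original input indices order[r], order[r + d], ...
    return [order[r::dp_world_size] for r in range(dp_world_size)]


def restore_input_order(concat_rows, shards):
    """Undo the shard permutation after rank-order concatenation."""
    # concat_to_input[p] is the input index of the row at concat position p.
    concat_to_input = [i for shard in shards for i in shard]
    restored = [None] * len(concat_rows)
    for pos, input_idx in enumerate(concat_to_input):
        restored[input_idx] = concat_rows[pos]
    return restored


seq_lens = [7, 2, 9, 4, 1, 5]
keys = ["a", "b", "c", "d", "e", "f"]

shards = shard_by_seqlen(seq_lens, dp_world_size=2)
# Simulate each rank echoing its rows, then concatenating in rank order.
concat = [keys[i] for shard in shards for i in shard]
restored = restore_input_order(concat, shards)

print(shards)    # [[4, 3, 0], [1, 5, 2]]
print(concat)    # ['e', 'd', 'a', 'b', 'f', 'c']
print(restored)  # ['a', 'b', 'c', 'd', 'e', 'f']
```

Recording the per-shard index lists is what makes the inverse cheap: flattening them gives the concat-position to input-index map directly, with no need to re-sort by sequence length.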
ZhiyuLi-Nvidia
added a commit
that referenced
this pull request
May 23, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from grpo.py-style dynamic sampling; grpo_sync threads survivors through pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id -> tq_partition_id. They are read from grpo_sync.py in 7 places, so the underscore prefix was misleading. Constructor kwarg tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
copy-pr-bot Bot
pushed a commit
that referenced
this pull request
Jun 7, 2026
Comments addressed: #3, #5, #7, #8, #9, #10, #11.

- Rename _load_M -> _get_sparse_projection_matrix and _load_dense_projection -> _get_topk_projection (later removed in favor of module-level cache helpers below).
- Drop unused alignment_student_spans / alignment_teacher_spans from the cross-tokenizer batch payload.
- Remove NRL_XTOKEN_LOSS_DUMP_DIR debug-dump side effect.
- Move Fp32SparseMM, chunk_average_log_probs, valid_chunk_mask to a new shared module nemo_rl/algorithms/x_token/utils.py.
- Extract projection-file parsing into utils.parse_projection_file; tokenalign.py and loss_functions.py both go through it.
- Move per-instance projection-matrix caches to process-local caches in utils.get_sparse_projection_matrix / get_topk_projection. The driver no longer holds large CUDA tensors; each Ray worker fills its own cache on first loss call.

Signed-off-by: Adithya Hanasoge <avenkateshha@nvidia.com>
copy-pr-bot Bot
pushed a commit
that referenced
this pull request
Jun 7, 2026
PR #2508 review (@RayenTian):

- #2: Fold data["sample_mask"] into the gold-loss path's valid-chunk mask (chunk_mask & sample_mask.bool().unsqueeze(-1)) so samples with loss_multiplier=0 stop contributing to KL-on-common, L1-on-uncommon, top-1 accuracy, and the returned valid-count. Mirrors the P-KL path.
- #3: Size both projection-matrix axes from the configured tokenizer vocabs (student + teacher), not max(observed_idx) + 1. CrossTokenizerDistillationLossConfig declares student_vocab_size and teacher_vocab_size; xtoken_distillation.setup() injects both at runtime from len(student_tokenizer) / len(teacher_tokenizer). get_sparse_projection_matrix now takes both as keyword-only args and clamps V_s / V_t up against the projection's observed maxima as a defensive fallback. Same-magnitude-int positional swap is guarded by the keyword-only signature.

Signed-off-by: Adithya Hanasoge <avenkateshha@nvidia.com>
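The sizing rule in item #3 can be sketched in plain Python. This is a hedged illustration, not the repository's ``get_sparse_projection_matrix``: the function name ``projection_shape`` and the ``(student_idx, teacher_idx, weight)`` entry layout are assumptions. It shows the two ideas the message describes: the configured vocab sizes set the shape, with the observed maxima acting only as a lower bound, and the keyword-only signature prevents the two sizes from being swapped positionally.

```python
def projection_shape(entries, *, student_vocab_size, teacher_vocab_size):
    """Return (V_s, V_t) for a sparse projection matrix.

    entries: list of (student_idx, teacher_idx, weight) triples
             (hypothetical layout, used here for illustration only).
    """
    max_s = max((s for s, _, _ in entries), default=-1)
    max_t = max((t for _, t, _ in entries), default=-1)
    # Configured sizes win; observed maxima only clamp the result upward.
    v_s = max(student_vocab_size, max_s + 1)
    v_t = max(teacher_vocab_size, max_t + 1)
    return v_s, v_t


entries = [(0, 3, 1.0), (5, 1, 0.5)]

shape = projection_shape(entries, student_vocab_size=10, teacher_vocab_size=8)
clamped = projection_shape(entries, student_vocab_size=4, teacher_vocab_size=8)

print(shape)    # (10, 8)
print(clamped)  # (6, 8): student index 5 forces at least 6 rows
```

Calling ``projection_shape(entries, 10, 8)`` raises ``TypeError``, which is the guard against passing two similar-looking integers in the wrong order.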
What does this PR do ?
Add a one line overview of what this PR aims to accomplish.
Changelog
Usage
# Add a code snippet demonstrating how to use this
Before your PR is "Ready for review"
Pre checks:
Checklist when contributing
Additional Information