Skip to content

feat: reinforcer initial commit#3

Merged
terrykong merged 3 commits into
mainfrom
initial-commit-reparented
Mar 20, 2025
Merged

feat: reinforcer initial commit#3
terrykong merged 3 commits into
mainfrom
initial-commit-reparented

Conversation

@terrykong

Copy link
Copy Markdown
Collaborator

What does this PR do ?

Add a one line overview of what this PR aims to accomplish.

Changelog

  • Please update the CHANGELOG.md under next version with high level changes in this PR.

Usage

  • You can potentially add a usage example below
# Add a code snippet demonstrating how to use this 

Before your PR is "Ready for review"

Pre checks:

Checklist when contributing

  • TBD

Additional Information

  • Related to # (issue)

Co-authored-by: Sahil Jain <sahil.jain5125@gmail.com>
Co-authored-by: Parth Chadha <parth29@gmail.com>
Co-authored-by: Terry Kong <terryk@nvidia.com>
Co-authored-by: Anna Shors <ashors@nvidia.com>
Co-authored-by: Gerald Shen <geshen@nvidia.com>
Co-authored-by: Yuki Huang <yukih@nvidia.com>
Co-authored-by: Hemil Desai <hemild@nvidia.com>
Co-authored-by: Yi-Fu Wu <yifuw@nvidia.com>
Co-authored-by: ahmadki <ahmadki@users.noreply.github.com>
Co-authored-by: Nathan McKimpson <nmckimpson@nvidia.com>
Co-authored-by: Charlie Truong <chtruong@nvidia.com>
Signed-off-by: Terry Kong <terryk@nvidia.com>
@github-actions github-actions Bot added the Documentation Improvements or additions to documentation label Mar 20, 2025
Signed-off-by: Terry Kong <terryk@nvidia.com>
Comment thread .gitlab-ci.yml Outdated
Signed-off-by: Terry Kong <terryk@nvidia.com>
@SahilJain314 SahilJain314 self-requested a review March 20, 2025 22:36
@terrykong terrykong merged commit b1ba714 into main Mar 20, 2025
@terrykong terrykong deleted the initial-commit-reparented branch March 20, 2025 22:40
KiddoZhu pushed a commit that referenced this pull request May 6, 2025
Co-authored-by: Sahil Jain <sahil.jain5125@gmail.com>
Co-authored-by: Parth Chadha <parth29@gmail.com>
Co-authored-by: Anna Shors <ashors@nvidia.com>
Co-authored-by: Gerald Shen <geshen@nvidia.com>
Co-authored-by: Yuki Huang <yukih@nvidia.com>
Co-authored-by: Hemil Desai <hemild@nvidia.com>
Co-authored-by: Yi-Fu Wu <yifuw@nvidia.com>
Co-authored-by: ahmadki <ahmadki@users.noreply.github.com>
Co-authored-by: Nathan McKimpson <nmckimpson@nvidia.com>
Co-authored-by: Charlie Truong <chtruong@nvidia.com>
Signed-off-by: Terry Kong <terryk@nvidia.com>
copy-pr-bot Bot pushed a commit that referenced this pull request Apr 2, 2026
* add Qwen3.5-35B megatron nightly tests (LLM + VLM)

* set PYTORCH_CUDA_ALLOC_CONF expandable_segments to True to reduce OOM

* change test reward threshold

---------

Signed-off-by: alexchiu <qiuzhaopeng@foxmail.com>
Co-authored-by: alexchiu <qiuzhaopeng@foxmail.com>

Signed-off-by: jQizhang <larkz@nvidia.com>
copy-pr-bot Bot pushed a commit that referenced this pull request Apr 7, 2026
* add Qwen3.5-35B megatron nightly tests (LLM + VLM)

* set PYTORCH_CUDA_ALLOC_CONF expandable_segments to True to reduce OOM

* change test reward threshold

---------

Signed-off-by: alexchiu <qiuzhaopeng@foxmail.com>
Co-authored-by: alexchiu <qiuzhaopeng@foxmail.com>

Signed-off-by: jQizhang <larkz@nvidia.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 5, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue #4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 9, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue #4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 9, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue #4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 14, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue #4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 15, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from
  grpo.py-style dynamic sampling; grpo_sync threads survivors through
  pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id ->
  tq_partition_id. They are read from grpo_sync.py in 7 places, so
  the underscore prefix was misleading. Constructor kwarg
  tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 19, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue #4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 19, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from
  grpo.py-style dynamic sampling; grpo_sync threads survivors through
  pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id ->
  tq_partition_id. They are read from grpo_sync.py in 7 places, so
  the underscore prefix was misleading. Constructor kwarg
  tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 19, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue #4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 19, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from
  grpo.py-style dynamic sampling; grpo_sync threads survivors through
  pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id ->
  tq_partition_id. They are read from grpo_sync.py in 7 places, so
  the underscore prefix was misleading. Constructor kwarg
  tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 22, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue #4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 22, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from
  grpo.py-style dynamic sampling; grpo_sync threads survivors through
  pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id ->
  tq_partition_id. They are read from grpo_sync.py in 7 places, so
  the underscore prefix was misleading. Constructor kwarg
  tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 23, 2026
…patch TQ path

Closes Issues #3 and #4 raised in PR review of the data-plane stack.

Issue #3 — single-``KVBatchMeta`` path returned rows in scrambled order.
``shard_keys_by_seqlen`` sorts by sequence length and strides
(``order[r::dp_world_size]``) to balance per-rank token totals. The
worker logprob aggregators (``_aggregate_logprob_results``) then
concatenate per-rank outputs in rank order via
``BatchedDataDict.from_batches`` — without inverting the seqlen-
strided permutation. Result: ``policy.get_logprobs(KVBatchMeta(...))``
returned rows in
[order[0], order[d], order[2d], …, order[1], order[1+d], …]
order, not the caller's ``meta.keys`` order. Silent correctness bug
(test_seqpack_legacy_equals_tq didn't catch it because the sync path
calls ``policy.get_logprobs(BatchedDataDict)`` — legacy passthrough,
no sharder).

Fix:
  * ``shard_keys_by_seqlen`` records ``_dp_original_indices`` per
    shard in ``extra_info`` (the ``idx`` list it computed).
  * ``dp_dispatch`` reconstructs the concat-position → input-index
    permutation from the shards' ``extra_info``, then applies the
    inverse via ``BatchedDataDict.reorder_data`` after ``aggregate``.
  * The reorder is gated on ``is_meta and not is_meta_list`` — for
    ``list[KVBatchMeta]`` the driver controls ordering (PR 0
    ``fan_out_per_rank_metas``) and the decorator must not undo it.
  * Skipped silently if the result isn't a BatchedDataDict (e.g.
    ``train`` returns a plain dict — order doesn't apply).

Issue #4 — TQ path silently dropped legacy training semantics.
The decorator's TQ branch returns ``aggregate(results)`` directly
and never enters ``Policy.train``'s body — so the FLOPs accumulation
at lm_policy.py around the ``flops_tracker`` block, plus the
``num_ranks`` and ``theoretical_tflops`` fields, were missing from
results when the trainer called ``policy.train(KVBatchMeta)`` or
``policy.train(list[KVBatchMeta])``. Same gap for the missing GBS /
DP divisibility assertion.

Fix (additive — no signature changes to the existing aggregate
callables):
  * ``dp_dispatch`` adds a basic divisibility assertion on the TQ path:
    ``total_meta_size % dp_size == 0`` (legacy path enforces this via
    ``shard_by_batch_size(batch_size=gbs)``; TQ path skips that call
    site).
  * ``dp_dispatch`` looks up ``self._dp_post_<method_name>`` after
    ``aggregate``. If defined, calls
    ``post(aggregated, raw_results, shards=shards)`` and uses its
    return value. Convention-based — opt-in per Policy method, no
    decorator boilerplate.
  * ``Policy._dp_post_train`` recovers FLOPs from ``meta.sequence_lengths``
    on each shard (driver-pre-balanced for ``list[KVBatchMeta]``,
    sharder-strided for single ``KVBatchMeta``), records ``total_flops``,
    ``num_ranks``, ``theoretical_tflops`` — same fields the legacy
    body produces.

Backward-compat: existing tests in tests/data_plane/unit/test_shard_parity.py
and test_dispatch.py don't check ``extra_info`` shape on sharder output
or assert on aggregate-method return type other than what's already
returned, so the additive fields and gated reorder are transparent.
The legacy ``policy.train(BatchedDataDict)`` path is unchanged — it
keeps building results inline and never enters the new hook.

Async-on-TQ (PR 4) and grpo_sync (PR 0) both use the
``list[KVBatchMeta]`` path, so they inherit the FLOPs fix automatically
via the post-hook. The reorder fix is only meaningful for callers
that pass single ``KVBatchMeta`` — primarily future logprob/reference-
logprob TQ wiring; flagged in commit message of #3 above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
ZhiyuLi-Nvidia added a commit that referenced this pull request May 23, 2026
* grpo_sync.py: remove unused batch_cache = None (leftover from
  grpo.py-style dynamic sampling; grpo_sync threads survivors through
  pending_meta / pending_slice).
* TQPolicy: rename _dp_client -> dp_client and _tq_partition_id ->
  tq_partition_id. They are read from grpo_sync.py in 7 places, so
  the underscore prefix was misleading. Constructor kwarg
  tq_partition_id already matched the new attribute name.
* Update README + data_plane_api_lifecycle docs example snippets.

Per yuki-97 PR review (#3, #4).

Signed-off-by: Zhiyu Li <zhiyul@NVIDIA.com>
copy-pr-bot Bot pushed a commit that referenced this pull request Jun 7, 2026
Comments addressed: #3, #5, #7, #8, #9, #10, #11.

- Rename _load_M -> _get_sparse_projection_matrix and
  _load_dense_projection -> _get_topk_projection (later removed in
  favor of module-level cache helpers below).
- Drop unused alignment_student_spans / alignment_teacher_spans
  from the cross-tokenizer batch payload.
- Remove NRL_XTOKEN_LOSS_DUMP_DIR debug-dump side effect.
- Move Fp32SparseMM, chunk_average_log_probs, valid_chunk_mask to a
  new shared module nemo_rl/algorithms/x_token/utils.py.
- Extract projection-file parsing into utils.parse_projection_file;
  tokenalign.py and loss_functions.py both go through it.
- Move per-instance projection-matrix caches to process-local caches
  in utils.get_sparse_projection_matrix / get_topk_projection. The
  driver no longer holds large CUDA tensors; each Ray worker fills
  its own cache on first loss call.

Signed-off-by: Adithya Hanasoge <avenkateshha@nvidia.com>
copy-pr-bot Bot pushed a commit that referenced this pull request Jun 7, 2026
PR #2508 review (@RayenTian):

- #2: Fold data["sample_mask"] into the gold-loss path's valid-chunk
  mask (chunk_mask & sample_mask.bool().unsqueeze(-1)) so samples with
  loss_multiplier=0 stop contributing to KL-on-common, L1-on-uncommon,
  top-1 accuracy, and the returned valid-count. Mirrors the P-KL path.

- #3: Size both projection-matrix axes from the configured tokenizer
  vocabs (student + teacher), not max(observed_idx) + 1.
  CrossTokenizerDistillationLossConfig declares student_vocab_size and
  teacher_vocab_size; xtoken_distillation.setup() injects both at
  runtime from len(student_tokenizer) / len(teacher_tokenizer).
  get_sparse_projection_matrix now takes both as keyword-only args and
  clamps V_s / V_t up against the projection's observed maxima as a
  defensive fallback. Same-magnitude-int positional swap is guarded by
  the keyword-only signature.

Signed-off-by: Adithya Hanasoge <avenkateshha@nvidia.com>
copy-pr-bot Bot pushed a commit that referenced this pull request Jun 7, 2026
Comments addressed: #3, #5, #7, #8, #9, #10, #11.

- Rename _load_M -> _get_sparse_projection_matrix and
  _load_dense_projection -> _get_topk_projection (later removed in
  favor of module-level cache helpers below).
- Drop unused alignment_student_spans / alignment_teacher_spans
  from the cross-tokenizer batch payload.
- Remove NRL_XTOKEN_LOSS_DUMP_DIR debug-dump side effect.
- Move Fp32SparseMM, chunk_average_log_probs, valid_chunk_mask to a
  new shared module nemo_rl/algorithms/x_token/utils.py.
- Extract projection-file parsing into utils.parse_projection_file;
  tokenalign.py and loss_functions.py both go through it.
- Move per-instance projection-matrix caches to process-local caches
  in utils.get_sparse_projection_matrix / get_topk_projection. The
  driver no longer holds large CUDA tensors; each Ray worker fills
  its own cache on first loss call.

Signed-off-by: Adithya Hanasoge <avenkateshha@nvidia.com>
copy-pr-bot Bot pushed a commit that referenced this pull request Jun 7, 2026
PR #2508 review (@RayenTian):

- #2: Fold data["sample_mask"] into the gold-loss path's valid-chunk
  mask (chunk_mask & sample_mask.bool().unsqueeze(-1)) so samples with
  loss_multiplier=0 stop contributing to KL-on-common, L1-on-uncommon,
  top-1 accuracy, and the returned valid-count. Mirrors the P-KL path.

- #3: Size both projection-matrix axes from the configured tokenizer
  vocabs (student + teacher), not max(observed_idx) + 1.
  CrossTokenizerDistillationLossConfig declares student_vocab_size and
  teacher_vocab_size; xtoken_distillation.setup() injects both at
  runtime from len(student_tokenizer) / len(teacher_tokenizer).
  get_sparse_projection_matrix now takes both as keyword-only args and
  clamps V_s / V_t up against the projection's observed maxima as a
  defensive fallback. Same-magnitude-int positional swap is guarded by
  the keyword-only signature.

Signed-off-by: Adithya Hanasoge <avenkateshha@nvidia.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Documentation Improvements or additions to documentation

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants