
[Feature] Refactor PD decode KV receiver bootstrap for synchronized queue state #21680

@ShangmingCai

Motivation

On the PD-disaggregated decode path, creation of each DecodeRequest’s kv_receiver is currently deferred until after the prefill-info fetch (via _ensure_prefill_info) and prefill DP-rank resolution (via _resolve_pending_reqs). As a result, the point at which a kv_receiver is created and its request enters the scheduler queue can differ across tensor-parallel (TP) ranks.

That desynchronization breaks DecodePreallocQueue when it runs _update_handshake_waiters and calls:

poll_and_all_reduce([decode_req.kv_receiver for decode_req in self.queue], self.gloo_group)

If len(self.queue) is not identical on every TP rank, the collective observes mismatched tensor sizes and Gloo fails with an error such as:

[enforce fail at /pytorch/third_party/gloo/gloo/transport/tcp/pair.cc:456] op.preamble.length <= op.nbytes. 2 vs 1. Received data size doesn't match expected size. Is there a distributed collective mismatch in your code?
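The failure mode can be reproduced in miniature without Gloo. The sketch below is a pure-Python stand-in, not SGLang's poll_and_all_reduce: `simulated_all_reduce_min` is a hypothetical helper, and the element-wise MIN reduction is an assumption made for illustration. It only shows that the collective is well-defined when every rank contributes a list of the same length.

```python
from typing import List


def simulated_all_reduce_min(per_rank_polls: List[List[int]]) -> List[int]:
    """Element-wise MIN across ranks (hypothetical stand-in for a real collective).

    Every rank must contribute the same number of entries, mirroring the
    requirement that len(self.queue) is identical on every TP rank.
    """
    lengths = {len(polls) for polls in per_rank_polls}
    if len(lengths) != 1:
        raise ValueError(f"collective mismatch: per-rank lengths {sorted(lengths)}")
    return [min(column) for column in zip(*per_rank_polls)]


# Rank 0 has enqueued two requests, rank 1 only one: the collective is ill-formed.
try:
    simulated_all_reduce_min([[1, 1], [1]])
    mismatch_detected = False
except ValueError:
    mismatch_detected = True

# Both ranks hold the same two requests: the reduction is well-defined.
aligned = simulated_all_reduce_min([[2, 1], [1, 1]])
```

In the real system the mismatch surfaces inside Gloo as the enforce failure quoted above rather than as a Python exception.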

Pull request #21299 mitigates the queue inconsistency by designating a leader and synchronizing ranks, but that is a workaround rather than a fix for the root cause: bootstrap and queue membership are still not aligned with a single, well-defined collective boundary.

The desired direction is to keep all TP/CP allreduce usage confined to poll_and_all_reduce and to refactor PD decode-side bootstrapping so that every rank presents the same queue length from the start of the handshake phase. DP-rank resolution may still complete at different times on different ranks, but poll_and_all_reduce will report any request that is not yet ready on every rank as still bootstrapping, which avoids size mismatches.

CC: @weireweire @hnyls2002


Proposed design

1. BaseKVReceiver and CommonKVReceiver

  • Add an abstract method send_metadata on BaseKVReceiver. Move logic that today lives in KVReceiver.init into send_metadata (implementations required for Mooncake, Nixl, and Mori).
  • In CommonKVReceiver, move everything after
    if self.bootstrap_addr not in self.kv_mgr.prefill_info_table:
    (and all subsequent prefill_info_table-related work) from construction into init. For Mooncake, Nixl, and Mori, any post-super().__init__ code that touches bootstrap metadata or status updates must move into init as well.
  • Remove prefill DP rank assignment from __init__. Resolve prefill_dp_rank first and pass it as an argument to init. Preferably, handle this once in CommonKVReceiver.init so that all backends share the same logic.
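A minimal sketch of the split described in this section, under stated assumptions. The class names, constructor arguments, and the `kv_indices` parameter of send_metadata are hypothetical and do not match the real BaseKVReceiver or CommonKVReceiver signatures. The call order (init first, send_metadata later) is also an assumption.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional, Tuple


class SketchBaseKVReceiver(ABC):
    """Hypothetical stand-in for BaseKVReceiver; not the real SGLang class."""

    def __init__(self, prefill_info_table: Dict[str, dict],
                 bootstrap_addr: str, bootstrap_room: int) -> None:
        # Construction only records identifiers, so it is cheap and can run
        # at the same point on every rank.
        self.prefill_info_table = prefill_info_table
        self.bootstrap_addr = bootstrap_addr
        self.bootstrap_room = bootstrap_room
        self.prefill_dp_rank: Optional[int] = None
        self.initialized = False

    def init(self, prefill_dp_rank: int) -> None:
        # Deferred bootstrap work: prefill-info lookup and DP-rank assignment.
        if self.bootstrap_addr not in self.prefill_info_table:
            # Placeholder for the real prefill-info fetch.
            self.prefill_info_table[self.bootstrap_addr] = {"fetched": True}
        self.prefill_dp_rank = prefill_dp_rank
        self.initialized = True

    @abstractmethod
    def send_metadata(self, kv_indices: List[int]) -> None:
        """Backend-specific metadata exchange (Mooncake, Nixl, Mori)."""


class RecordingReceiver(SketchBaseKVReceiver):
    """Toy backend that records what it would send instead of sending it."""

    def __init__(self, *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.sent: List[Tuple[int, Optional[int], List[int]]] = []

    def send_metadata(self, kv_indices: List[int]) -> None:
        self.sent.append((self.bootstrap_room, self.prefill_dp_rank, kv_indices))


table: Dict[str, dict] = {}
receiver = RecordingReceiver(table, "10.0.0.1:8998", bootstrap_room=7)
# Snapshot taken right after construction: nothing has been bootstrapped yet.
constructed_state = (receiver.initialized, dict(table))

receiver.init(prefill_dp_rank=3)   # DP rank resolved, possibly much later
receiver.send_metadata([0, 1])     # metadata exchange happens as a separate step
```

The point of the split is that constructing the receiver has no dependency on prefill info, so it can happen at enqueue time on every rank.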

2. DecodePreallocQueue and pending resolution

  • In add, call _create_receiver_and_enqueue up front for each request so all ranks enqueue the same requests immediately when scheduled.
    • If _resolve_prefill_dp_rank succeeds immediately, call that request’s kv_receiver.init(...) with the resolved prefill DP rank (e.g. have _create_receiver_and_enqueue return the DecodeRequest so kv_receiver is reachable).
    • If the DP rank is not yet available, still enqueue the request (it remains in KVPoll.Bootstrapping) and track it in pending_reqs for asynchronous DP-rank resolution.
  • In _resolve_pending_reqs, after _resolve_prefill_dp_rank / query_prefill_dp_ranks yields a DP rank, invoke kv_receiver.init for each affected request, then remove that request from pending_reqs.
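The enqueue-first flow above can be sketched as follows. This is a toy model, not the real DecodePreallocQueue: all `Sketch*` classes are hypothetical, requests are reduced to a string id, and DP-rank resolution is modeled as a lookup that returns None until the rank is known.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional


@dataclass
class SketchReceiver:
    """Hypothetical receiver: constructed early, initialized later."""
    prefill_dp_rank: Optional[int] = None

    def init(self, prefill_dp_rank: int) -> None:
        self.prefill_dp_rank = prefill_dp_rank


@dataclass
class SketchDecodeRequest:
    rid: str
    kv_receiver: SketchReceiver = field(default_factory=SketchReceiver)


class SketchPreallocQueue:
    """Hypothetical stand-in for DecodePreallocQueue."""

    def __init__(self, resolve_dp_rank: Callable[[str], Optional[int]]) -> None:
        self.queue: List[SketchDecodeRequest] = []
        self.pending_reqs: Dict[str, SketchDecodeRequest] = {}
        self._resolve_dp_rank = resolve_dp_rank

    def _create_receiver_and_enqueue(self, rid: str) -> SketchDecodeRequest:
        # Always enqueue, so the queue length never depends on resolution timing.
        decode_req = SketchDecodeRequest(rid)
        self.queue.append(decode_req)
        return decode_req

    def add(self, rid: str) -> None:
        decode_req = self._create_receiver_and_enqueue(rid)
        dp_rank = self._resolve_dp_rank(rid)
        if dp_rank is not None:
            decode_req.kv_receiver.init(dp_rank)
        else:
            # Stays enqueued (still bootstrapping) and is resolved later.
            self.pending_reqs[rid] = decode_req

    def _resolve_pending_reqs(self) -> None:
        for rid in list(self.pending_reqs):
            dp_rank = self._resolve_dp_rank(rid)
            if dp_rank is not None:
                self.pending_reqs.pop(rid).kv_receiver.init(dp_rank)


known_ranks: Dict[str, int] = {"a": 0}
queue = SketchPreallocQueue(known_ranks.get)
queue.add("a")  # DP rank known immediately
queue.add("b")  # DP rank not yet known
len_after_add = len(queue.queue)
pending_after_add = sorted(queue.pending_reqs)

known_ranks["b"] = 1  # resolution completes later
queue._resolve_pending_reqs()
```

Both requests are in the queue right after add, regardless of whether their DP rank was known, which is the property the collective depends on.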

3. When bootstrapping is considered complete

  • Only after CommonKVReceiver.init completes successfully should the request be considered past bootstrapping; then call
    self.kv_mgr.update_status(self.bootstrap_room, KVPoll.WaitingForInput).
    Adjust the exact call site for Mooncake and Mori; Nixl may require corresponding updates in NixlKVReceiver.poll so status transitions stay consistent.

4. Outcome

With the above, every rank creates receivers and enqueues requests in lockstep, so len(self.queue) matches across TP/CP ranks for poll_and_all_reduce. Ranks that are still resolving prefill DP ranks continue to report KVPoll.Bootstrapping until init finishes, so the collective no longer sees inconsistent participant counts or payload sizes for the same logical step.
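To illustrate the outcome, the sketch below reduces per-rank statuses for the same two requests. `SketchPoll` is a hypothetical enum; its numeric ordering (Bootstrapping below WaitingForInput) and the use of an element-wise MIN are assumptions about how the reduction behaves, not a copy of KVPoll or poll_and_all_reduce.

```python
from enum import IntEnum
from typing import List


class SketchPoll(IntEnum):
    # Assumed ordering: an earlier phase has a smaller value, so MIN picks it.
    Bootstrapping = 1
    WaitingForInput = 2


def reduce_min(per_rank: List[List[SketchPoll]]) -> List[SketchPoll]:
    """Element-wise MIN over per-rank status lists of equal length."""
    if len({len(polls) for polls in per_rank}) != 1:
        raise ValueError("ranks must present the same queue length")
    return [SketchPoll(min(column)) for column in zip(*per_rank)]


# Both ranks enqueued the same two requests in lockstep.
# Rank 1 has not finished init for the second request yet.
rank0 = [SketchPoll.WaitingForInput, SketchPoll.WaitingForInput]
rank1 = [SketchPoll.WaitingForInput, SketchPoll.Bootstrapping]

agreed = reduce_min([rank0, rank1])
```

Under these assumptions the second request is reported as Bootstrapping on every rank until the slowest rank finishes init, and no rank sees a length mismatch.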

