Motivation
On the PD-disaggregated decode path, creation of each DecodeRequest’s kv_receiver is currently deferred until after prefill-info fetch (via _ensure_prefill_info) and prefill DP-rank resolution (via _resolve_pending_reqs). As a result, the time at which a kv_receiver is created and the request enters the scheduler queue can differ across tensor-parallel (TP) ranks.
That desynchronization breaks DecodePreallocQueue when it runs _update_handshake_waiters and calls:
poll_and_all_reduce([decode_req.kv_receiver for decode_req in self.queue], self.gloo_group)
If len(self.queue) is not identical on every TP rank, the collective observes mismatched tensor sizes and Gloo fails with an error such as:
[enforce fail at /pytorch/third_party/gloo/gloo/transport/tcp/pair.cc:456] op.preamble.length <= op.nbytes. 2 vs 1. Received data size doesn't match expected size. Is there a distributed collective mismatch in your code?
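For intuition, here is a minimal sketch of what such a per-queue status all-reduce looks like, assuming each receiver exposes a poll() that returns an integer KVPoll status (a hypothetical shape, not the actual poll_and_all_reduce implementation):

```python
import torch
import torch.distributed as dist

def poll_and_all_reduce_sketch(kv_receivers, gloo_group):
    # One status value per queued request. The buffer length equals
    # len(kv_receivers), so if any TP rank holds a different queue
    # length, Gloo's pairwise exchange sees mismatched byte counts and
    # fails with the "op.preamble.length <= op.nbytes" error above.
    statuses = torch.tensor(
        [int(r.poll()) for r in kv_receivers], dtype=torch.int64
    )
    # MIN-reduce: a request only advances past Bootstrapping once
    # every rank reports it ready.
    dist.all_reduce(statuses, op=dist.ReduceOp.MIN, group=gloo_group)
    return statuses.tolist()
```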
Pull request #21299 mitigates the queue inconsistency by designating a leader and synchronizing ranks, but that approach is a workaround rather than a fix for the root cause: bootstrap and queue membership are still not aligned with a single, well-defined collective boundary.
The desired direction is to keep all TP/CP allreduce usage confined to poll_and_all_reduce and to refactor PD decode-side bootstrapping so that every rank presents the same queue length from the start of the handshake phase. DP-rank resolution may still complete at different times on different ranks, but poll_and_all_reduce will simply treat requests that are not yet fully ready on every rank as still being in the bootstrapping phase, avoiding size mismatches.
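To make the intended semantics concrete, a toy two-rank illustration (assuming a status encoding where Bootstrapping sorts below WaitingForInput):

```python
# Hypothetical view: the same 3-request queue on two TP ranks, with
# statuses encoded so that Bootstrapping=0 and WaitingForInput=1.
rank0 = [1, 1, 0]  # rank 0: requests 0 and 1 finished init()
rank1 = [1, 0, 0]  # rank 1: request 1 is still resolving its DP rank

# Element-wise MIN is what the all-reduce computes: a request advances
# only once *every* rank reports it ready, so slower DP-rank resolution
# keeps a slot in Bootstrapping instead of desynchronizing the queues.
reduced = [min(a, b) for a, b in zip(rank0, rank1)]
assert reduced == [1, 0, 0]
```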
CC: @weireweire @hnyls2002
Proposed design
1. BaseKVReceiver and CommonKVReceiver
- Add an abstract method send_metadata on BaseKVReceiver. Move the logic that today lives in KVReceiver.init into send_metadata (implementations required for Mooncake, Nixl, and Mori).
- In CommonKVReceiver, move everything after the if self.bootstrap_addr not in self.kv_mgr.prefill_info_table: check (and all subsequent prefill_info_table-related work) from construction into init. For Mooncake, Nixl, and Mori, any post-super().__init__ code that touches bootstrap metadata or status updates must move into init as well.
- Remove prefill DP-rank assignment from __init__. Resolve and pass prefill_dp_rank as an argument to init; ideally this is handled once in CommonKVReceiver.init for all backends (see the interface sketch after this list).
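A rough sketch of the resulting receiver interface, with names taken from the bullets above; the bodies and constructor signature are illustrative, not the final API, and backends (Mooncake, Nixl, Mori) would subclass CommonKVReceiver and implement send_metadata:

```python
from abc import ABC, abstractmethod

class BaseKVReceiver(ABC):
    @abstractmethod
    def send_metadata(self) -> None:
        """Backend-specific metadata send (Mooncake, Nixl, Mori);
        carries the logic that previously ran during receiver init."""

class CommonKVReceiver(BaseKVReceiver):
    def __init__(self, kv_mgr, bootstrap_addr, bootstrap_room):
        # Construction is now cheap and identical on every rank: no
        # prefill_info_table lookups, no prefill DP-rank assignment,
        # no status updates.
        self.kv_mgr = kv_mgr
        self.bootstrap_addr = bootstrap_addr
        self.bootstrap_room = bootstrap_room

    def init(self, prefill_dp_rank: int) -> None:
        # Everything that used to run after the
        # "if self.bootstrap_addr not in self.kv_mgr.prefill_info_table:"
        # check moves here, followed by the backend's metadata send.
        ...
        self.send_metadata()
```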
2. DecodePreallocQueue and pending resolution
- In add, call _create_receiver_and_enqueue up front for each request so that all ranks enqueue the same requests immediately when scheduled.
- If _resolve_prefill_dp_rank succeeds immediately, call that request’s kv_receiver.init(...) with the resolved prefill DP rank (e.g. have _create_receiver_and_enqueue return the DecodeRequest so kv_receiver is reachable).
- If the DP rank is not yet available, still enqueue the request (it remains in KVPoll.Bootstrapping) and track it in pending_reqs for asynchronous DP-rank resolution.
- In _resolve_pending_reqs, after _resolve_prefill_dp_rank / query_prefill_dp_ranks yields a DP rank, invoke kv_receiver.init for each affected request and remove the request from pending_reqs (see the queue-flow sketch after this list).
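Pulled together, the queue-side flow could look roughly like this; the idea that _resolve_prefill_dp_rank returns None when unresolved, and the decode_req.req attribute, are assumptions for the sketch:

```python
class DecodePreallocQueue:
    def add(self, req):
        # Enqueue unconditionally so len(self.queue) is identical on
        # every TP rank the moment the request is scheduled.
        decode_req = self._create_receiver_and_enqueue(req)
        dp_rank = self._resolve_prefill_dp_rank(req)
        if dp_rank is not None:
            # DP rank known right away: finish bootstrapping now.
            decode_req.kv_receiver.init(dp_rank)
        else:
            # Receiver stays in KVPoll.Bootstrapping until resolved.
            self.pending_reqs.append(decode_req)

    def _resolve_pending_reqs(self):
        # DP ranks may resolve at different times on different ranks,
        # which is now safe because membership in self.queue never
        # depends on resolution timing.
        for decode_req in list(self.pending_reqs):
            dp_rank = self._resolve_prefill_dp_rank(decode_req.req)
            if dp_rank is not None:
                decode_req.kv_receiver.init(dp_rank)
                self.pending_reqs.remove(decode_req)
```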
3. When bootstrapping is considered complete
- Only after CommonKVReceiver.init completes successfully should the request be considered past bootstrapping; then call self.kv_mgr.update_status(self.bootstrap_room, KVPoll.WaitingForInput). Adjust the exact call site for Mooncake and Mori; Nixl may require corresponding updates in NixlKVReceiver.poll so that status transitions stay consistent (sketched below).
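Under this scheme, the tail of init is where the status flips; a sketch continuing the interface above (exact call sites vary per backend):

```python
class CommonKVReceiver(BaseKVReceiver):  # continuing the earlier sketch
    def init(self, prefill_dp_rank: int) -> None:
        ...  # prefill-info work and send_metadata(), as sketched above
        # Reached only on success: until this point poll() keeps
        # reporting KVPoll.Bootstrapping, which is exactly what lets
        # poll_and_all_reduce stay consistent across ranks that resolve
        # prefill DP ranks at different times.
        self.kv_mgr.update_status(self.bootstrap_room, KVPoll.WaitingForInput)
```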
4. Outcome
With the above, every rank creates receivers and enqueues requests in lockstep, so len(self.queue) matches across TP/CP ranks for poll_and_all_reduce. Ranks that are still resolving prefill DP ranks continue to report KVPoll.Bootstrapping until init finishes, so the collective no longer sees inconsistent participant counts or payload sizes for the same logical step.
Related resources
No response