NixlKVManager: async multi-threaded KV transfer #20680
usernamehaha2022 wants to merge 7 commits into sgl-project:main
Conversation
Summary of Changes
Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request introduces a fundamental architectural change to the NixlKVManager by transforming its KV transfer mechanism from synchronous to asynchronous and multi-threaded. The primary goal is to eliminate performance bottlenecks on the prefill path, which previously arose because synchronous chunk transfers blocked the scheduler. By offloading transfer operations to a dedicated queue and worker pool, the system achieves substantial reductions in transfer latency and variance, leading to improved overall throughput and a more responsive prefill process.
Code Review
This pull request introduces a significant performance improvement to `NixlKVManager` by refactoring the KV transfer to be asynchronous and multi-threaded. The change from a synchronous, polling-based approach in the sender to a queue-based worker-pool design on the prefill node is well executed and aligns with modern high-performance patterns. The performance gains are impressive, with a ~4x reduction in mean transfer latency.
The code changes are clear and well structured. The introduction of `TransferKVChunk` and the `transfer_worker` effectively decouples the transfer logic. The simplifications in `NixlKVSender` are a welcome consequence of this new design.
I have a couple of suggestions for improvement. One is a critical fix for a potential race condition due to unsynchronized access to a shared dictionary from multiple threads. The other is a minor cleanup of an unused parameter.
Overall, this is an excellent contribution that significantly boosts performance.
```python
def clear(self):
    try:
        self.kv_mgr.request_status.pop(self.bootstrap_room, None)
    except Exception:
        pass
```
The `self.kv_mgr.request_status` dictionary is accessed from multiple threads (the main scheduler thread via `NixlKVSender` and multiple `transfer_worker` threads) without any synchronization, which can lead to race conditions. The `try...except Exception` block here is also too broad and likely hides such concurrency issues, as `pop(key, None)` does not raise an exception for a missing key.
To fix this, a `threading.Lock` should be introduced in `NixlKVManager` to protect all accesses to `self.request_status`.
You would need to:
- Add `self.request_status_lock = threading.Lock()` in `NixlKVManager.__init__`.
- Wrap accesses to `self.request_status` in `check_status` and `update_status` with this lock. You will need to override `update_status` in `NixlKVManager`.
- Update this `clear` method to use the lock and remove the `try-except` block.
```python
def clear(self):
    with self.kv_mgr.request_status_lock:
        self.kv_mgr.request_status.pop(self.bootstrap_room, None)
```
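A self-contained sketch of the suggested locking on the manager side (the `KVPoll` enum here is a simplified stand-in, and everything outside the `request_status` handling is elided):

```python
import threading
from enum import Enum, auto

class KVPoll(Enum):  # simplified stand-in for sglang's KVPoll states
    Bootstrapping = auto()
    WaitingForInput = auto()
    Transferring = auto()
    Success = auto()
    Failed = auto()

class NixlKVManager:
    def __init__(self):
        self.request_status: dict[int, KVPoll] = {}
        self.request_status_lock = threading.Lock()

    def check_status(self, bootstrap_room: int) -> KVPoll:
        with self.request_status_lock:
            return self.request_status.get(bootstrap_room, KVPoll.Bootstrapping)

    def update_status(self, bootstrap_room: int, status: KVPoll) -> None:
        with self.request_status_lock:
            self.request_status[bootstrap_room] = status
```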
```python
def check_status(self, bootstrap_room: int):
    return self.request_status.get(bootstrap_room, KVPoll.Bootstrapping)

def transfer_worker(self, queue: FastQueue, executor: concurrent.futures.Executor):
```
The `executor` parameter is passed to `transfer_worker` but is not used within the function. Similarly, `self.executors` is created in `__init__` but appears to be unused. This seems to be a remnant of a previous design.
To simplify the code, I suggest removing `self.executors` and the `executor` parameter.
The `__init__` method can be simplified to:

```python
# In NixlKVManager.__init__
...
self.transfer_queues: List[FastQueue] = [
    FastQueue() for _ in range(transfer_queue_size)
]
# The self.executors list is not used.
for queue in self.transfer_queues:
    threading.Thread(
        target=self.transfer_worker, args=(queue,), daemon=True
    ).start()
self._start_bootstrap_thread()
...
```

And the signature of `transfer_worker` should be updated accordingly:

```diff
- def transfer_worker(self, queue: FastQueue, executor: concurrent.futures.Executor):
+ def transfer_worker(self, queue: FastQueue):
```
Could you please share the benchmark you used, so we can reproduce the results?
Hello, thanks for the review. We conducted PD-separation tests on two machines and on one machine.
One machine:

```
CUDA_VISIBLE_DEVICES=4,5,6,7 UCX_TLS=^cuda_ipc python3 -m sglang.launch_server
```

The benchmark is the same as for two machines. We use NIXL telemetry data to measure the performance difference. In our experiments with two machines, we found that this change reduced the average NIXL transmission time from 25,000 µs to 19,000 µs.
```python
kv_chunk: TransferKVChunk = queue.get()
room = kv_chunk.room
try:
    if (
```
Why is this check needed? Under what circumstances might it be true?
This check is needed to handle the case where a previous chunk for the same room has already failed in this worker thread.
All chunks for the same room are routed to the same queue (`bootstrap_room % len(self.transfer_queues)`), so they are processed sequentially by the same worker. If chunk N fails (e.g., `send_kvcache` raises), the `except` block marks the room as `KVPoll.Failed`. When chunk N+1 is dequeued, without this check it would:
- Call `self.update_status(room, KVPoll.Transferring)`, overwriting the `Failed` status back to `Transferring`, which breaks status monotonicity and could confuse the scheduler.
- Attempt the NIXL transfer again, which would likely fail for the same reason, wasting RDMA resources.
The `continue` skips all remaining chunks for the failed room. Retries (if any) are handled by the decode-side scheduler with a new room number, so they won't be affected by this check.
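A self-contained sketch of that behavior (the `KVPoll` and `TransferKVChunk` definitions are simplified stand-ins for the PR's types, and the actual NIXL send is stubbed out):

```python
import queue
from dataclasses import dataclass
from enum import Enum, auto

class KVPoll(Enum):  # simplified stand-in
    Transferring = auto()
    Success = auto()
    Failed = auto()

@dataclass
class TransferKVChunk:  # simplified stand-in
    room: int
    is_last: bool

request_status: dict[int, KVPoll] = {}

def transfer_worker(q: "queue.Queue[TransferKVChunk]") -> None:
    while True:
        kv_chunk = q.get()
        room = kv_chunk.room
        try:
            # The guard discussed above: an earlier chunk for this room
            # already failed in this worker, so skip its remaining chunks
            # instead of resetting Failed back to Transferring.
            if request_status.get(room) == KVPoll.Failed:
                continue
            request_status[room] = KVPoll.Transferring
            # ... the real code would call send_kvcache(...) here ...
            if kv_chunk.is_last:
                request_status[room] = KVPoll.Success
        except Exception:
            request_status[room] = KVPoll.Failed
```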
```python
    ):
        continue
```
```python
if room not in self.transfer_infos:
```
Why is this needed? Maybe just an `assert` is enough?
```python
    raise RuntimeError(f"NIXL transfer encountered ERR room={room}")
if all(s == "DONE" for s in states):
    break
time.sleep(0.001)
```
I have tested this polling with my benchmark, and I see that using `time.sleep(0)` gives an even bigger boost:

```
# p2d4, best Mean TTFT
num_prompts    main    PR #20680    PR #20680 with sleep(0)
128             523         562                        372
256             597         485                        490
512            1167         954                        855
1024           2724        2350                       2245
```
So please consider using `time.sleep(0)` or maybe even `os.sched_yield()`. How would that impact your benchmark results?
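For illustration (a toy example, not the PR's code): `time.sleep(0)` still releases the GIL so other threads can run, but avoids the OS timer granularity (often ~1 ms or more) that `time.sleep(0.001)` pays on every iteration:

```python
import threading
import time

done = threading.Event()
threading.Timer(0.05, done.set).start()  # simulate a transfer completing

# Poll as in the PR's wait loop, but yield instead of sleeping ~1 ms.
while not done.is_set():
    time.sleep(0)  # or os.sched_yield() on POSIX
```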
Thanks for the benchmark data! We've updated to time.sleep(0) in the latest push.
We've also been experimenting with moving this polling loop into NIXL's C++ layer, which shows even further improvement by avoiding the Python overhead entirely. We may follow up on that in a subsequent PR after more testing.
CC: @zackyoray
```python
def poll(self) -> KVPoll:
    status = self.kv_mgr.check_status(self.bootstrap_room)
    if not self.has_sent:
```
It looks like we return status here in all cases, so we can just return directly and remove the ifs
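That is, something like the following sketch (based on the diff context above):

```python
def poll(self) -> KVPoll:
    # The manager's workers maintain the status, so the sender can
    # return it directly regardless of has_sent.
    return self.kv_mgr.check_status(self.bootstrap_room)
```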
```python
        continue

chunked_dst_kv_indice = req.dst_kv_indices[kv_chunk.index_slice]
if len(chunked_dst_kv_indice) < len(kv_chunk.prefill_kv_indices):
```
The old code has:

```python
assert len(chunked_dst_kv_indice) == len(kv_indices)
```

Why was this changed?
```python
if kv_chunk.is_last:
    if room in self.transfer_infos:
        del self.transfer_infos[room]
```
I see multiple threads modifying and reading `self.transfer_infos`. Do we need locking? Or should we pass the data to the workers in a safer way?
/tag-and-rerun-ci

Opened #23967 and rebased onto main
Motivation
This PR improves the performance of `NixlKVManager` by making KV transfer asynchronous and multi-threaded on the prefill node. Previously, `add_transfer_request` performed each chunk transfer synchronously, and the caller (`NixlKVSender`) had to track and poll all transfer handles. With many decode instances and chunked transfers, this caused the prefill scheduler to block on transfer completion and limited throughput. This change aligns NIXL with the queue-based, multi-worker transfer design.
Performance
We ran Qwen3-32B PD disaggregation with NIXL and observed a clear improvement in transfer latency via NIXL telemetry.
Async multi-worker transfer removes the synchronous bottleneck on the prefill path: chunks are processed in parallel by worker threads, and decode instances are sharded across queues for better overlap, which explains the lower mean and the significantly improved tail (P95/P99) latency.
Modifications
Async transfer with queue + worker pool (PREFILL mode)
- Multiple `FastQueue` instances (count controlled by `SGLANG_DISAGGREGATION_QUEUE_SIZE`) and a `ThreadPoolExecutor` per queue (total worker count from `SGLANG_DISAGGREGATION_THREAD_POOL_SIZE`).
- A `TransferKVChunk` dataclass and daemon `transfer_worker` threads that consume chunks from the queues and execute `send_kvcache`/`send_kvcache_slice`, `maybe_send_extra`, and `send_aux` in the worker.
- The worker count defaults to `min(max(4, (0.5 * cpu_count) // 8), 12)` when the env var is not set (see the sketch below); the queue count defaults to the env value (e.g. 4).
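A quick sketch of how that default evaluates (the formula is from this PR; the helper name is illustrative):

```python
import os

def default_transfer_workers() -> int:
    # Clamp (0.5 * cpu_count) // 8 into the range [4, 12].
    cpu_count = os.cpu_count() or 1
    return int(min(max(4, (0.5 * cpu_count) // 8), 12))

# e.g. cpu_count = 64  -> (0.5 * 64) // 8 = 4   -> 4 workers
#      cpu_count = 192 -> (0.5 * 192) // 8 = 12 -> 12 workers
```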
Non-blocking `add_transfer_request`
- `add_transfer_request` no longer performs the transfer inline; it enqueues a `TransferKVChunk` to `transfer_queues[bootstrap_room % len(transfer_queues)]` and returns `None` (see the sketch below).
- Transfer state is tracked in `request_status` (e.g. `Transferring`, `Success`, `Failed`), so the sender no longer needs to hold or poll transfer handles.
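A minimal sketch of that enqueue path (the chunk fields and class shape are illustrative; `queue.Queue` stands in for `FastQueue`):

```python
from dataclasses import dataclass
from queue import Queue  # stand-in for sglang's FastQueue

@dataclass
class TransferKVChunk:  # illustrative fields only
    room: int
    is_last: bool

class ManagerSketch:
    def __init__(self, num_queues: int = 4):
        self.transfer_queues = [Queue() for _ in range(num_queues)]

    def add_transfer_request(self, bootstrap_room: int, is_last: bool) -> None:
        # All chunks for a room hash to the same queue, so one worker
        # processes them in order; the call never blocks on the transfer.
        shard = bootstrap_room % len(self.transfer_queues)
        self.transfer_queues[shard].put(
            TransferKVChunk(room=bootstrap_room, is_last=is_last)
        )
        return None
```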
NixlKVSender simplifications
- Removed `xfer_handles`; `poll()` now relies on `kv_mgr.check_status(bootstrap_room)` only.
- Added `clear()` to remove `bootstrap_room` from `request_status` when appropriate.
- No direct writes to `request_status` in the sender; the worker clears `transfer_infos` and sets the status to `Success` when the last chunk is done.
Scheduler handling of Bootstrapping
- In `prefill.py`, requests in `KVPoll.Bootstrapping` are now treated as undone (together with `WaitingForInput` and `Transferring`) so the scheduler does not consider them complete before transfer progress; roughly, the check amounts to the predicate sketched below.
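(A sketch; the actual predicate in `prefill.py` may be structured differently, and `KVPoll` is assumed to be in scope from sglang's disaggregation utilities.)

```python
def is_undone(status) -> bool:
    # Bootstrapping now counts as in-progress, alongside the other two states.
    return status in (
        KVPoll.Bootstrapping,
        KVPoll.WaitingForInput,
        KVPoll.Transferring,
    )
```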
Testing
- Ran `test/registered/disaggregation/test_disaggregation_basic.py` locally: 7 tests OK (229 s). Example GSM8K metrics: accuracy 0.725, invalid 0.005, output_throughput ~2908 token/s.
Checklist
- Ran `test_disaggregation_basic.py`; 7 tests passed.
Review Process
- CI commands: `/tag-run-ci-label`, `/rerun-failed-ci`, `/tag-and-rerun-ci`