
[ROCm][PD] add moriio kv connector.#29304

Merged
tjtanaa merged 75 commits into vllm-project:main from inkcherry:upstream_mori_
Jan 9, 2026

Conversation

@inkcherry
Contributor

@inkcherry inkcherry commented Nov 24, 2025

Purpose

This PR introduces the mori-io KV connector for AMD devices. Built on top of the MORI project, the mori-io connector supports both PULL and PUSH modes for KV Cache transfer. Key features include:

  • MORI backend integration.

  • MORI-related components (buffer merging, session cache management, batched I/O).

  • PULL mode (serial interaction between prefill and decode).

  • PUSH mode (parallel interaction between prefill and decode, with non-blocking layer-wise transfer).

  • A unified proxy example for both push and pull logic.

  • xPyD.

  • Parallel strategy support:

    | Prefill | Decode | Supported | EP support |
    |---------|--------|-----------|------------|
    | TP      | TP     | ✔️        | ✔️         |
    | DP      | DP     | ✔️        | ✔️         |
    | TP      | DP     |           |            |

High Level Design

(high-level design diagram)

Thanks to the well-abstracted vLLM connector interface, we were able to integrate the logic into a single file without affecting other core files.

Push Mode Implementation Details:

For prefill, a dedicated thread maintains the transfer queue. Layers are enqueued layer by layer while the forward pass proceeds normally. Once prefill receives the block-allocation signal from decode, it schedules transfers asynchronously.

For decode, requests first send their allocated blocks to the prefill stage and are then placed into a waiting queue. Once the write-completion signal is received from prefill, the request is taken out of the waiting queue and scheduled directly via continuous batching.

This approach introduces no blocking overhead between prefill and decode, and is compatible with both decode graph mode and the chunked prefill feature.
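The push-mode flow above can be sketched roughly as follows. This is a minimal illustration only: `transfer_queue`, `rdma_write`, and the completion event are hypothetical stand-ins for the connector's internals, not its real API.

```python
import queue
import threading

# Hypothetical stand-ins for the connector's internals.
transfer_queue: "queue.Queue" = queue.Queue()

def rdma_write(job: dict) -> None:
    # Placeholder for the MORI-IO RDMA write of one layer's KV blocks.
    job["done"].set()  # signal write completion to the decode side

def prefill_writer_loop() -> None:
    # Dedicated prefill-side thread: drains layer-wise transfer jobs so the
    # forward pass is never blocked on RDMA writes.
    while True:
        job = transfer_queue.get()
        if job is None:  # shutdown sentinel
            break
        rdma_write(job)

writer = threading.Thread(target=prefill_writer_loop, daemon=True)
writer.start()

# During the forward pass, each layer is enqueued as soon as it is computed,
# overlapping its transfer with the remaining layers' computation.
done = threading.Event()
for layer in range(3):
    transfer_queue.put({"layer": layer, "blocks": [0, 1], "done": done})
done.wait(timeout=5)   # decode waits on the write-completion signal
transfer_queue.put(None)
writer.join(timeout=5)
```

The decode side then pulls the completed request from its waiting queue and schedules it via continuous batching, as described above.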

Regarding the transfer process:

  1. Utilize MORI-IO-based RDMA transfer to maximize bandwidth utilization and minimize absolute transfer time on current and future AMD devices.
  2. Implement a layer-wise push mode that overlaps computation and transfer within a single request, reducing TTFT.

TODO:

  • Support for more topologies (e.g., heterogeneous topologies)
  • Chunk-wise prefill transfer
  • Leverage the first token of prefill in push mode
  • Reduce notify overhead

Test Plan

device: MI300

Accuracy

  • 1P1D TP8
  • 3P2D TP8
  • 1P1D DP8
  • 1P1D TP8(PULL MODE)
  • 1P1D TP2(Qwen3-32B, non-MLA)

Performance

  • 1P1D TP8(PULL MODE) 2k,4k,8k,16k,32k
  • 1P1D TP8(PUSH MODE) 2k,4k,8k,16k,32k

Accuracy launch scripts

launch proxy: python examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py
gsm8k task: the GSM8K proxy setup remains the same across all configurations; the startup commands for the instances are listed below.

#MODEL_PATH=/nfs/data/Qwen3-32B
MODEL_PATH=/mnt/m2m_nobackup/models/deepseek-ai/DeepSeek-V3
lm_eval --model local-completions --model_args tokenizer=${MODEL_PATH},base_url=http://127.0.0.1:10001/v1/completions,num_concurrent=256,max_retries=1,max_gen_toks=2048 --tasks gsm8k --num_fewshot 5 --batch_size auto  --apply_chat_template

1P1D TP8 or 3P2D TP8
Launch prefill Instance

export VLLM_USE_V1=1    
export VLLM_ROCM_USE_AITER=1 
MODEL_PATH=/mnt/m2m_nobackup/models/deepseek-ai/DeepSeek-V3

vllm serve $MODEL_PATH        \
 -tp 8  \
 --port 20005     \
 --block-size 1          \
 --max-num-batched-tokens 4096         \
 --distributed-executor-backend mp         \
 --gpu_memory_utilization 0.85         \
 --max-model-len 4096         \
 --enforce-eager \
 --max_num_seqs 64 \
 --no-enable-prefix-caching \
 --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_producer","kv_connector_extra_config":{"proxy_ip":"10.xxx.xxx.xx","proxy_ping_port":"36367","http_port":"20005"}}' 

Launch decode instance

export VLLM_USE_V1=1    
export VLLM_ROCM_USE_AITER=1 
MODEL_PATH=/mnt/m2m_nobackup/models/deepseek-ai/DeepSeek-V3

vllm serve $MODEL_PATH        \
 -tp 8  \
 --port 40005     \
 --block-size 1          \
 --no-enable-prefix-caching \
 --max-num-batched-tokens 4096         \
 --distributed-executor-backend mp         \
 --gpu_memory_utilization 0.85         \
 --max-model-len 8200         \
 --cuda-graph-sizes 1 256   \
 --trust-remote-code \
 --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_consumer","kv_connector_extra_config":{"proxy_ip":"10.xxx.xxx.xx","http_port":"40005","proxy_ping_port":"36367"}}' 

1P1D DP8
Launch prefill Instance

export VLLM_ALL2ALL_BACKEND=naive 
export VLLM_USE_V1=1    
export VLLM_ROCM_USE_AITER=1 
MODEL_PATH=/mnt/m2m_nobackup/models/deepseek-ai/DeepSeek-V3
vllm serve $MODEL_PATH        \
 -tp 1  \
 -dp 8     \
 --enable-expert-parallel         \
 --port 20005     \
 --block-size 1          \
 --no-enable-prefix-caching \
 --max-num-batched-tokens 4096         \
 --distributed-executor-backend mp         \
 --gpu_memory_utilization 0.85         \
 --max-model-len 4096         \
 --enforce-eager \
 --max_num_seqs 64 \
 --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_producer","kv_connector_extra_config":{"proxy_ip":"10.xxx.xxx.xx","proxy_ping_port":"36367","http_port":"20005"}}' 

Launch decode instance

export VLLM_ALL2ALL_BACKEND=naive 
export VLLM_USE_V1=1    
export VLLM_ROCM_USE_AITER=1 

MODEL_PATH=/mnt/m2m_nobackup/models/deepseek-ai/DeepSeek-V3
vllm serve $MODEL_PATH        \
-tp 1  \
-dp 8     \
--enable-expert-parallel         \
--port 40005     \
--block-size 1          \
--no-enable-prefix-caching \
--max-num-batched-tokens 4096         \
--distributed-executor-backend mp         \
--gpu_memory_utilization 0.85         \
--max-model-len 8200         \
--cuda-graph-sizes 1 256         \
--trust-remote-code \
--max_num_seqs 256 \
--kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_consumer","kv_connector_extra_config":{"proxy_ip":"10.xxx.xxx.xx","http_port":"40005","proxy_ping_port":"36367"}}' 

1P1D TP8(PULL MODE)
Launch prefill instance

export MORIIO_CONNECTOR_READ_MODE=1
export VLLM_USE_V1=1    
export VLLM_ROCM_USE_AITER=1 
MODEL_PATH=/mnt/m2m_nobackup/models/deepseek-ai/DeepSeek-V3
vllm serve $MODEL_PATH        \
 -tp 8  \
 --port 20005     \
 --block-size 1          \
 --max-num-batched-tokens 32768         \
 --distributed-executor-backend mp         \
 --gpu_memory_utilization 0.85         \
 --max-model-len 32768         \
 --enforce-eager \
 --no-enable-prefix-caching \
 --kv-cache-dtype fp8  \
 --max_num_seqs 64 \
 --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_producer","kv_connector_extra_config":{"proxy_ip":"10.xxx.xxx.xx","http_port":"20005","proxy_ping_port":"36367"}}' 


Launch decode instance

export MORIIO_CONNECTOR_READ_MODE=1
export VLLM_USE_V1=1    
export VLLM_ROCM_USE_AITER=1 
MODEL_PATH=/mnt/m2m_nobackup/models/deepseek-ai/DeepSeek-V3
vllm serve $MODEL_PATH        \
 -tp 8  \
 --port 40005     \
 --block-size 1          \
 --no-enable-prefix-caching \
 --max-num-batched-tokens 32768         \
 --distributed-executor-backend mp         \
 --gpu_memory_utilization 0.85         \
  --max-model-len 32768         \
 --trust-remote-code \
 --kv-cache-dtype fp8  \
 --cuda-graph-sizes 1 256         \
 --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_consumer","kv_connector_extra_config":{"proxy_ip":"10.xxx.xxx.xx","http_port":"40005","proxy_ping_port":"36367"}}' 

1P1D TP2(Qwen3-32B, non-MLA)
Launch prefill instance

export VLLM_USE_V1=1    
export VLLM_ROCM_USE_AITER=1 
MODEL_PATH=/nfs/data/Qwen3-32B
vllm serve $MODEL_PATH        \
 -tp 2 \
 --port 20005     \
 --block-size 32         \
 --max-num-batched-tokens 4096         \
 --distributed-executor-backend mp         \
 --gpu_memory_utilization 0.8         \
 --max-model-len 4096         \
 --no-enable-prefix-caching \
      --enforce-eager \
 --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_producer","kv_connector_extra_config":{"proxy_ip":"10.194.132.10","proxy_port":"30001","http_port":"20005"}}' 

Launch decode instance

export VLLM_USE_V1=1    
export VLLM_ROCM_USE_AITER=1 
MODEL_PATH=/nfs/data/Qwen3-32B

vllm serve $MODEL_PATH        \
 -tp 2 \
 --port 40005     \
 --block-size 32          \
   --enforce-eager \
 --no-enable-prefix-caching \
 --max-num-batched-tokens 4096         \
 --distributed-executor-backend mp         \
 --gpu_memory_utilization 0.8         \
 --max-model-len 4096         \
 --trust-remote-code \
 --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_consumer","kv_connector_extra_config":{"proxy_ip":"10.194.132.10","proxy_port":"30001","http_port":"40005"}}' 

Performance launch scripts

server
The performance test uses the same startup scripts as the accuracy test, with the following key parameters standardized for all performance benchmarks to ensure comparability:

max_model_len: 32k
max_num_batched_tokens: 32k
kv_cache_dtype: fp8

bench

# ISL=2048
# ISL=4096
# ISL=8192
# ISL=16384
ISL=32760
OSL=5 
RATIO=0
PORT=10001
CONCURRENCY=1 
MODEL_PATH=/mnt/m2m_nobackup/models/deepseek-ai/DeepSeek-V3
PROMPTS=1
vllm bench serve \
  --dataset-name random \
  --model $MODEL_PATH \
  --random-input-len $ISL \
  --random-output-len $OSL \
  --tokenizer $MODEL_PATH \
  --num-prompts $PROMPTS \
  --random-range-ratio $RATIO \
  --base-url "http://127.0.0.1:$PORT" \
  --backend vllm \
  --max-concurrency $CONCURRENCY

Test Result

Accuracy:
1P1D TP8

| Tasks | Version | Filter           | n-shot | Metric      | Value  |   | Stderr |
|-------|---------|------------------|--------|-------------|--------|---|--------|
| gsm8k | 3       | flexible-extract | 5      | exact_match | 0.9424 | ± | 0.0064 |
|       |         | strict-match     | 5      | exact_match | 0.8082 | ± | 0.0108 |

3P2D TP8

| Tasks | Version | Filter           | n-shot | Metric      | Value  |   | Stderr |
|-------|---------|------------------|--------|-------------|--------|---|--------|
| gsm8k | 3       | flexible-extract | 5      | exact_match | 0.9439 | ± | 0.0063 |
|       |         | strict-match     | 5      | exact_match | 0.8127 | ± | 0.0107 |

1P1D DP8

| Tasks | Version | Filter           | n-shot | Metric      | Value  |   | Stderr |
|-------|---------|------------------|--------|-------------|--------|---|--------|
| gsm8k | 3       | flexible-extract | 5      | exact_match | 0.9447 | ± | 0.0063 |
|       |         | strict-match     | 5      | exact_match | 0.8105 | ± | 0.0108 |

1P1D TP8(PULL MODE, fp8 kv cache dtype)

| Tasks | Version | Filter           | n-shot | Metric      | Value  |   | Stderr |
|-------|---------|------------------|--------|-------------|--------|---|--------|
| gsm8k | 3       | flexible-extract | 5      | exact_match | 0.9310 | ± | 0.0070 |
|       |         | strict-match     | 5      | exact_match | 0.8635 | ± | 0.0095 |

1P1D TP2(Qwen3-32B, non-MLA)

| Tasks | Version | Filter           | n-shot | Metric      | Value  |   | Stderr |
|-------|---------|------------------|--------|-------------|--------|---|--------|
| gsm8k | 3       | flexible-extract | 5      | exact_match | 0.9439 | ± | 0.0063 |
|       |         | strict-match     | 5      | exact_match | 0.9325 | ± | 0.0069 |

Performance:
1P1D TP8(PULL/PUSH MODE)
TTFT

| Input Length | PULL MODE | PUSH MODE |
|--------------|-----------|-----------|
| 2k           | 207ms     | 194ms     |
| 4k           | 340ms     | 311ms     |
| 8k           | 584ms     | 536ms     |
| 16k          | 1184ms    | 1090ms    |
| 32k          | 2758ms    | 2614ms    |


Essential Elements of an Effective PR Description Checklist
  • The purpose of the PR, such as "Fix some issue (link existing issues this PR will resolve)".
  • The test plan, such as providing test command.
  • The test results, such as pasting the results comparison before and after, or e2e results
  • (Optional) The necessary documentation update, such as updating supported_models.md and examples for a new model.
  • (Optional) Release notes update. If your change is user facing, please update the release notes draft in the Google Doc.
cc @maning00 @TianDi101 @carlushuang @Duyi-Wang


Note

Enables RDMA-based disaggregated KV transfer on ROCm via MORI.

  • Adds MoRIIOConnector (scheduler/worker) under vllm/distributed/kv_transfer/kv_connector/v1/moriio/* with ZMQ handshakes, notify flow, READ/PUSH modes, TP/DP support, and session/offset management
  • Registers connector in kv_connector/factory.py and adds env toggles (VLLM_MORIIO_*) in vllm/envs.py
  • Provides a toy proxy server examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py for prefill/decode service discovery and request routing
  • Updates ROCm install docs to include optional MORI setup and adjusts build steps
  • Extends docker/Dockerfile.rocm_base to build/install MORI (new args/envs, deps) and bundle its wheel
  • Adds unit tests for handshake, cache registration, and metadata flow in tests/v1/kv_connector/unit/test_moriio_connector.py

Written by Cursor Bugbot for commit 7e1d978.

@mergify
Contributor

mergify Bot commented Nov 24, 2025

Documentation preview: https://vllm--29304.org.readthedocs.build/en/29304/

@mergify mergify Bot added documentation Improvements or additions to documentation frontend rocm Related to AMD ROCm kv-connector labels Nov 24, 2025
@inkcherry inkcherry changed the title [ROCM][PD] add moriio kv connector. [ROCm][PD] add moriio kv connector. Nov 24, 2025
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
@kouroshHakha
Collaborator

Hi @inkcherry, have you also tested with ray executor backend?

Collaborator

@kouroshHakha kouroshHakha left a comment


some more qs:

Comment on lines +241 to +246
req_data_to_prefill["kv_transfer_params"]["remote_dp_size"] = (
    decode_instance_endpoint["dp_size"]
)
req_data_to_prefill["kv_transfer_params"]["remote_tp_size"] = (
    decode_instance_endpoint["tp_size"]
)
Collaborator


why does P need to know the dp and tp size of D for transfer?

Contributor Author


This is due to the RDMA-based push(write) mode.
If using pull (read) mode, this is not required.

Collaborator


can you also provide a proxy example for the pull mode?

Collaborator


@kouroshHakha do you have any other comments other than this example request?

Let's add this example in a follow up PR. In this PR @inkcherry has added an example script examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py.

Contributor Author

@inkcherry inkcherry Jan 5, 2026


Both pull mode and push mode use a unified proxy. For pull mode, you can refer to 1P1D TP8 (PULL MODE) in the test plan of this PR.



async def send_request_to_prefill(
endpoint, req_data, request_id, p_endpoint, pip, pports, selected_prefill_dp_rank
Collaborator


why is there a p_endpoint? Isn't that supposed to be the d_endpoint?

Contributor Author


yes, thanks, renamed

@inkcherry
Contributor Author

inkcherry commented Jan 4, 2026

Hi @inkcherry, have you also tested with ray executor backend?

It has not yet been tested with the ray executor backend, but I don't expect any problems; further testing will be conducted.

Signed-off-by: inkcherry <mingzhi.liu@amd.com>
@mergify
Contributor

mergify Bot commented Jan 4, 2026

This pull request has merge conflicts that must be resolved before it can be
merged. Please rebase the PR, @inkcherry.

https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/working-with-forks/syncing-a-fork

@mergify mergify Bot added the needs-rebase label Jan 4, 2026
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
@mergify mergify Bot removed the needs-rebase label Jan 4, 2026
except UnicodeDecodeError:
logger.warning("Received non-UTF8 message: %s", msg_str)
if not handled:
raise MoRIIOError(f"Unhandled message format: {msg_str}")

Undefined variable when UnicodeDecodeError is raised

High Severity

In _handle_message, when msg.decode("UTF-8") raises a UnicodeDecodeError, the variable msg_str is never assigned because the exception occurs during its assignment. However, the exception handler on line 532 and line 534 both try to use msg_str, which will cause a NameError crash. The logging statement should use msg directly or handle the case where msg_str is not defined.

🔬 Verification Test

Why verification test was not possible: This code path requires the MORI library which is only available on ROCm hardware, and requires simulating receiving a non-UTF8 message through ZMQ. The bug is apparent from static analysis - the variable msg_str is assigned inside the try block on line 527, but if that assignment raises UnicodeDecodeError, the except block tries to use msg_str which was never assigned.

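A minimal sketch of the suggested fix. Names here are illustrative, not the connector's actual API: `dispatch` stands in for the real handler logic, and the key point is that the decode-failure path uses the raw bytes rather than the never-bound `msg_str`.

```python
import logging

logger = logging.getLogger("moriio")

class MoRIIOError(Exception):
    pass

def dispatch(msg_str: str) -> bool:
    # Hypothetical stand-in for the real message handlers.
    return msg_str.startswith("moriio:")

def handle_message(msg: bytes) -> None:
    # On decode failure, log and raise using the raw bytes: msg_str is
    # never bound when decode() itself raises, so it must not be used here.
    try:
        msg_str = msg.decode("utf-8")
    except UnicodeDecodeError as e:
        logger.warning("Received non-UTF8 message: %r", msg)
        raise MoRIIOError(f"Undecodable message: {msg!r}") from e
    if not dispatch(msg_str):
        raise MoRIIOError(f"Unhandled message format: {msg_str}")
```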


# In dp(prefill)<->dp(decode) communication, we require an all-to-all handshake.

for cur_dp_rank in range(remote_dp_size):

Undefined variables when handshake future already exists

High Severity

In _background_moriio_handshake, the variables host, port, tp_size, and remote_dp_size are only assigned inside the if fut is None block. However, these variables are used unconditionally on line 1036 in range(remote_dp_size) and line 1039. If a handshake future already exists for this remote_engine_id, fut will not be None, the variables won't be assigned, and the function will crash with a NameError. This can occur when a second request for the same remote engine arrives while handshaking is in progress.

🔬 Verification Test

Why verification test was not possible: This requires the MORI library on ROCm hardware and simulating a race condition where two requests for the same remote engine are processed concurrently. The bug is clear from static analysis - if the if fut is None block is not entered, remote_dp_size (and other variables) are never defined, but range(remote_dp_size) is called unconditionally afterward.

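One way to sketch the fix: unpack the endpoint fields before the future-cache lookup, so they are bound on both the fresh and the already-in-progress paths. This is an illustrative sketch whose names mirror the review's description, not the connector's real code.

```python
from concurrent.futures import Future, ThreadPoolExecutor

class HandshakeSketch:
    """Illustrative sketch only: endpoint fields are unpacked before
    consulting the futures cache, so both branches can use them."""

    def __init__(self) -> None:
        self._handshake_initiation_executor = ThreadPoolExecutor(max_workers=2)
        self._handshake_futures: "dict[str, Future]" = {}

    def _moriio_handshake(self, host, port, tp_size, dp_rank):
        return (host, port, tp_size, dp_rank)  # placeholder handshake

    def background_handshake(self, remote_engine_id: str, endpoint: dict):
        # Bind unconditionally: previously these lived inside `if fut is
        # None`, crashing with NameError when a future already existed.
        host, port = endpoint["host"], endpoint["port"]
        tp_size = endpoint["tp_size"]
        remote_dp_size = endpoint["dp_size"]
        results = []
        for cur_dp_rank in range(remote_dp_size):
            key = f"{remote_engine_id}_dp{cur_dp_rank}"
            fut = self._handshake_futures.get(key)
            if fut is None:
                fut = self._handshake_initiation_executor.submit(
                    self._moriio_handshake, host, port, tp_size, cur_dp_rank
                )
                self._handshake_futures[key] = fut
            results.append(fut.result())
        return results
```

Calling it a second time for the same remote engine exercises the previously crashing cached path.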

moriio_mem_metadata
)

self.local_kv_cache_size.append(cache.nelement() * cache.element_size())

Wrong variable used for KV cache size calculation

Medium Severity

In register_kv_caches, line 1120 uses cache from a previous loop instead of kv_cache from the current iteration. The variable cache was last assigned in the loop on lines 1103-1109, and always points to the last cache of the last layer processed. This causes incorrect size calculations where all layers are recorded with the same size from the final cache, rather than their actual sizes. The line should use kv_cache.nelement() * kv_cache.element_size().

🔬 Verification Test

Why verification test was not possible: This requires the MORI library on ROCm hardware. The bug is clear from code inspection - the loop at line 1111 iterates over layer_name, kv_cache, but line 1120 uses cache which is from the earlier loop (lines 1103-1109).




def example_round_robin_dp_loader(request_number, dp_size):
return request_nums % dp_size

Function ignores parameter, uses global instead

Medium Severity

The function example_round_robin_dp_loader accepts a parameter request_number but ignores it entirely, instead using the global variable request_nums. This makes the function parameter meaningless and the call on line 231 passes a calculated value that is never used. The function body should use request_number instead of request_nums.

🔬 Verification Test

Test code:

# Test to verify the bug
request_nums = 10  # Global

def example_round_robin_dp_loader(request_number, dp_size):
    return request_nums % dp_size  # Bug: uses global, not parameter

# The parameter is ignored - result is based on global `request_nums`, not `request_number`
result = example_round_robin_dp_loader(5, 3)
print(f"Called with request_number=5, dp_size=3")
print(f"Expected: 5 % 3 = 2")
print(f"Actual: {result} (because it uses global request_nums={request_nums})")
print(f"Bug confirmed: {result != 2}")

Command run:

python3 -c "
request_nums = 10
def example_round_robin_dp_loader(request_number, dp_size):
    return request_nums % dp_size
result = example_round_robin_dp_loader(5, 3)
print(f'Called with request_number=5, dp_size=3')
print(f'Expected: 5 % 3 = 2')
print(f'Actual: {result} (uses global request_nums={request_nums})')
print(f'Bug confirmed: {result != 2}')
"

Output:

Called with request_number=5, dp_size=3
Expected: 5 % 3 = 2
Actual: 1 (uses global request_nums=10)
Bug confirmed: True

Why this proves the bug: The function returns 1 (10 % 3) instead of 2 (5 % 3), proving it uses the global request_nums instead of the passed parameter request_number.


target=self._write_worker_loop, daemon=True, name="moriio-write-worker"
)
thread.start()
logger.info("Started MoRIIO write worker thread")

Race condition in write worker thread initialization

Medium Severity

In ensure_worker_started, there's a TOCTOU (time-of-check-time-of-use) race condition. The check if self._write_worker_started and the flag assignment self._write_worker_started = True both happen outside the lock. Multiple threads could pass the initial check simultaneously, then each would set the flag and proceed to acquire the lock and start separate worker threads. The flag should be checked and set inside the lock with a double-check pattern.

🔬 Verification Test

Why verification test was not possible: Reproducing race conditions reliably requires precise timing control and is non-deterministic. However, the bug is clear from code structure - lines 95-97 check and set the flag before acquiring the lock on line 98, creating a classic TOCTOU vulnerability.

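The double-check pattern described above can be sketched as follows; attribute names follow the review's description, and the `workers_started` counter is instrumentation for the sketch only, not part of any real implementation.

```python
import threading

class WriteWorkerSketch:
    """Illustrative double-checked-locking fix for ensure_worker_started."""

    def __init__(self) -> None:
        self._write_worker_started = False
        self._write_worker_lock = threading.Lock()
        self.workers_started = 0  # instrumentation for this sketch only

    def _write_worker_loop(self) -> None:
        pass  # placeholder for the real transfer loop

    def ensure_worker_started(self) -> None:
        if self._write_worker_started:       # cheap fast path, no lock
            return
        with self._write_worker_lock:
            if self._write_worker_started:   # re-check under the lock
                return
            threading.Thread(
                target=self._write_worker_loop,
                daemon=True,
                name="moriio-write-worker",
            ).start()
            self.workers_started += 1
            self._write_worker_started = True  # set only inside the lock
```

With the flag checked and set inside the lock, concurrent callers can no longer each start a worker thread.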

)
break
else:
break

Infinite loop when no requests need saving

High Severity

In save_kv_layer, when metadata.reqs_to_save is empty, remote_engine_id remains None from its initialization. The while loop at line 1249 then checks remote_engine_id not in self.write_ready_flags, which is always True when remote_engine_id is None. Combined with an empty _ready_requests queue, the first condition is satisfied indefinitely, causing an infinite spin loop that will hang the entire system. Unlike start_load_kv which has a wait_handshake_readd_req guard, this function lacks such protection.

🔬 Verification Test

Why verification test was not possible: This requires the MORI library on ROCm hardware and simulating a forward pass with no KV transfer requests. The bug is clear from static analysis - when reqs_to_save is empty, remote_engine_id stays None, and the while loop condition empty() and None not in dict evaluates to True, causing infinite continue iterations.

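A guard analogous to the one in start_load_kv would avoid the spin: return early when there is nothing to save, so `remote_engine_id` can never stay None inside the wait loop. The sketch below is illustrative only; the names mirror those quoted in the review, not the real connector.

```python
class SaveGuardSketch:
    """Illustrative early-return guard for save_kv_layer."""

    def __init__(self) -> None:
        self.write_ready_flags: "dict[str, bool]" = {}

    def save_kv_layer(self, reqs_to_save: dict):
        # Without this guard, remote_engine_id stays None and the wait
        # loop would spin forever when there is nothing to save.
        if not reqs_to_save:
            return None
        remote_engine_id = next(iter(reqs_to_save))
        # ... real code would wait for write_ready_flags[remote_engine_id]
        # and then issue the layer's RDMA writes ...
        return remote_engine_id
```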

selected_prefill_dp_rank = None
if prefill_instance_endpoint["dp_size"] > 1:
selected_prefill_dp_rank = example_round_robin_dp_loader(
request_nums // len(prefill_instance_endpoint),

len() called on dictionary instead of list

Medium Severity

The expression len(prefill_instance_endpoint) is called on a dictionary (the endpoint configuration dict containing keys like "dp_size", "tp_size", etc.), not on the list of prefill instances. This returns the number of dictionary keys (around 7-8) rather than the intended number of prefill instances. The load balancing calculation is therefore incorrect. This should likely be len(prefill_instances) to get the actual instance count.

🔬 Verification Test

Test code:

# Simulating the proxy server data structures
prefill_instance_endpoint = {
    "dp_size": 2,
    "tp_size": 4,
    "request_address": "http://10.0.0.1:20005/v1/completions",
    "handshake_port": 6301,
    "notify_port": 61005,
}
prefill_instances = [prefill_instance_endpoint]  # List with 1 instance

request_nums = 100
# Bug: uses dict length instead of list length
result = request_nums // len(prefill_instance_endpoint)
correct_result = request_nums // len(prefill_instances)

print(f"len(prefill_instance_endpoint) = {len(prefill_instance_endpoint)} (dict keys)")
print(f"len(prefill_instances) = {len(prefill_instances)} (actual instances)")
print(f"Bug result: {result}, Correct result: {correct_result}")

Command run:

python3 -c "
prefill_instance_endpoint = {'dp_size': 2, 'tp_size': 4, 'request_address': 'http://10.0.0.1:20005', 'handshake_port': 6301, 'notify_port': 61005}
prefill_instances = [prefill_instance_endpoint]
request_nums = 100
result = request_nums // len(prefill_instance_endpoint)
correct_result = request_nums // len(prefill_instances)
print(f'len(prefill_instance_endpoint) = {len(prefill_instance_endpoint)} (dict keys)')
print(f'len(prefill_instances) = {len(prefill_instances)} (actual instances)')
print(f'Bug result: {result}, Correct result: {correct_result}')
"

Output:

len(prefill_instance_endpoint) = 5 (dict keys)
len(prefill_instances) = 1 (actual instances)
Bug result: 20, Correct result: 100

Why this proves the bug: The code uses the dictionary's key count (5) instead of the list's length (1), producing completely different load balancing values.


if new_block_ids is not None:
block_ids = new_block_ids[0]
# TODO : hybrid attn, etc
req, existing_blocks = self._reqs_need_pending_save[req_id]

Missing key check causes KeyError for non-transfer requests

High Severity

In build_connector_meta, the code iterates over all requests in scheduled_cached_reqs.req_ids (which includes ALL running and resumed requests), then directly accesses self._reqs_need_pending_save[req_id] without checking if the key exists. However, _reqs_need_pending_save only contains requests that have do_remote_decode=True and are in chunked prefill mode. For any regular request without KV transfer enabled, this will crash with a KeyError. The code needs to check if req_id in self._reqs_need_pending_save before accessing it.

🔬 Verification Test

Why verification test was not possible: This requires setting up a full vLLM scheduler environment with the MORI connector and having a mix of requests with and without do_remote_decode enabled. The bug is clear from static analysis - scheduled_cached_reqs.req_ids contains all cached requests from _make_cached_request_data (which processes all running_reqs and resumed_reqs), while _reqs_need_pending_save only contains the subset of requests added via update_state_after_alloc when do_remote_decode=True and they are in chunked prefill mode.

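The membership check described above could look like this; the helper name and call shape are hypothetical, condensed from the review's description rather than taken from the connector code.

```python
def merge_pending_save(reqs_need_pending_save: dict, req_id: str,
                       new_block_ids):
    """Illustrative fix: skip requests that never registered a pending
    save instead of indexing the dict unconditionally (KeyError before)."""
    if req_id not in reqs_need_pending_save:
        return None  # regular request without KV transfer: nothing to do
    req, existing_blocks = reqs_need_pending_save[req_id]
    if new_block_ids is not None:
        # TODO: hybrid attention etc., as noted in the original snippet
        existing_blocks.extend(new_block_ids[0])
    return req, existing_blocks
```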



def example_round_robin_dp_loader(request_number, dp_size):
return request_nums % dp_size
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Function ignores parameter and uses global variable instead

Medium Severity

The function example_round_robin_dp_loader accepts a parameter request_number but completely ignores it, instead using the global variable request_nums. This makes the function's parameter meaningless and could cause incorrect round-robin behavior when the caller expects the passed value to be used.

Fix in Cursor Fix in Web

selected_prefill_dp_rank = None
if prefill_instance_endpoint["dp_size"] > 1:
selected_prefill_dp_rank = example_round_robin_dp_loader(
request_nums // len(prefill_instance_endpoint),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using len() on dict instead of list for sizing

High Severity

The code calls len(prefill_instance_endpoint) where prefill_instance_endpoint is a single dict (one prefill instance), not the list of all instances. This returns the number of keys in the dict rather than the number of prefill instances. The intent appears to be len(prefill_instances) to distribute requests across multiple prefill instances.

Fix in Cursor Fix in Web

except UnicodeDecodeError:
logger.warning("Received non-UTF8 message: %s", msg_str)
if not handled:
raise MoRIIOError(f"Unhandled message format: {msg_str}")
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable undefined when UnicodeDecodeError is raised

High Severity

If msg.decode("UTF-8") on line 527 raises a UnicodeDecodeError, the variable msg_str is never assigned. The except block on line 532 attempts to log msg_str, and line 534 also references it. This causes a NameError to be raised instead of the intended MoRIIOError, masking the actual problem and potentially crashing the message handling thread.

Fix in Cursor Fix in Web

for cur_dp_rank in range(remote_dp_size):
dp_engine_id = self.get_engine_name_with_dp(remote_engine_id, cur_dp_rank)
future = self._handshake_initiation_executor.submit(
self._moriio_handshake, host, port, tp_size, dp_engine_id, cur_dp_rank
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variables undefined when handshake future already exists

High Severity

The variables host, port, tp_size, and remote_dp_size are only assigned inside the if fut is None block (lines 1020-1024), but they are used unconditionally on lines 1036-1039. If a handshake future already exists in _handshake_futures (i.e., fut is not None), these variables will be undefined, causing a NameError when the code attempts to iterate range(remote_dp_size) or submit the handshake.


target=self._write_worker_loop, daemon=True, name="moriio-write-worker"
)
thread.start()
logger.info("Started MoRIIO write worker thread")

Race condition in worker thread startup check

Medium Severity

The ensure_worker_started method checks and sets _write_worker_started (lines 95-97) before acquiring _write_worker_lock. If two threads call this method concurrently, both could pass the initial check before either sets the flag to True, resulting in multiple background worker threads being started. The flag check and set should occur inside the lock.
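The standard fix is double-checked locking: a cheap unlocked fast path, with the authoritative check and the flag set both under the lock. A hedged sketch (names mirror the review comment; `_spawn_count` is instrumentation for this sketch only, not connector state):

```python
import threading

class WriteWorkerMixin:
    def __init__(self):
        self._write_worker_started = False
        self._write_worker_lock = threading.Lock()
        self._spawn_count = 0  # sketch-only instrumentation

    def ensure_worker_started(self):
        if self._write_worker_started:
            return  # fast path; the authoritative check is under the lock
        with self._write_worker_lock:
            if self._write_worker_started:
                return  # another caller won the race
            thread = threading.Thread(
                target=self._write_worker_loop,
                daemon=True,
                name="moriio-write-worker",
            )
            thread.start()
            self._spawn_count += 1
            self._write_worker_started = True

    def _write_worker_loop(self):
        pass  # placeholder for the real transfer-queue drain loop
```

Even if many callers pass the unlocked check simultaneously, only the first to acquire the lock spawns the worker; the rest see the flag and return.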


moriio_mem_metadata
)

self.local_kv_cache_size.append(cache.nelement() * cache.element_size())

Wrong variable used for cache size calculation

High Severity

The line uses cache (a leftover variable from the previous nested loop at lines 1103-1109) instead of kv_cache (the current loop variable from line 1111). This causes local_kv_cache_size to be populated with the size of the last cache from the earlier loop for every layer, rather than each layer's actual cache size. This could lead to incorrect size calculations and potential data corruption during KV cache transfers.
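The fix is simply to size each layer from the current loop variable. A self-contained sketch (a stub stands in for a torch tensor, exposing the same two methods the code relies on):

```python
class FakeTensor:
    """Minimal stand-in for a torch tensor's sizing API."""
    def __init__(self, numel: int, elem_size: int):
        self._numel, self._elem_size = numel, elem_size
    def nelement(self) -> int:
        return self._numel
    def element_size(self) -> int:
        return self._elem_size

kv_caches = [FakeTensor(1024, 2), FakeTensor(2048, 2)]  # per-layer caches

local_kv_cache_size = []
for kv_cache in kv_caches:
    # Correct: size the *current* layer's tensor, not a leftover `cache`
    # variable from an earlier loop.
    local_kv_cache_size.append(kv_cache.nelement() * kv_cache.element_size())
```

With the buggy form, every entry would repeat the last size from the earlier loop, so per-layer transfer sizes would all be wrong in the same way.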


@tjtanaa tjtanaa enabled auto-merge (squash) January 9, 2026 08:26
@tjtanaa tjtanaa merged commit 4505849 into vllm-project:main Jan 9, 2026
58 checks passed
@gshtras
Collaborator

gshtras commented Jan 12, 2026

Adding @gshtras to this thread, since there is an update to Dockerfile.rocm_base that will need to propagate to rocm/vllm-dev:base later. Moreover, we need the updated rocm/vllm-dev:base in order to test this PR with its new unit tests.

Is building from source a preferred way of installing this ROCm component?

# Install Python and other dependencies
RUN apt-get update -y \
-    && apt-get install -y software-properties-common git curl sudo vim less libgfortran5 \
+    && apt-get install -y software-properties-common git curl sudo vim less libgfortran5 libopenmpi-dev libpci-dev \
Collaborator

Are these libraries needed in runtime or just in the build phase?

Contributor Author

runtime

@inkcherry
Contributor Author

> Add @gshtras in this thread loop as there is an update on the Dockerfile.rocm_base and this will need to propagate to the rocm/vllm-dev:base later. Moreover, we need to propagate the rocm/vllm-dev:base to test this PR with its new unit tests.
>
> Is building from source a preferred way of installing this ROCm component?

Currently, building from source is the recommended method. We will update to the pip method later.

akh64bit pushed a commit to akh64bit/vllm that referenced this pull request Jan 16, 2026
Signed-off-by: inkcherry <mingzhi.liu@amd.com>
@rasmith
Contributor

rasmith commented Feb 3, 2026

@inkcherry Trying to get this to work, but it just seems to hang. In particular, it looks like the decode service is hanging.
Here are the commands I'm using (note: I'm using Justfile for this):

MODEL := "meta-llama/Llama-3.1-70B-Instruct"
TP_SIZE := "1"
ISL := "1024"
OSL := "5"
RATIO := "0"
PORT := "10001"
CONCURRENCY := "1"
PROMPTS := "1"

mori_proxy:
  python examples/online_serving/disaggregated_serving/moriio_toy_proxy_server.py

mori_serve_prefill:
  CUDA_VISIBLE_DEVICES={{PREFILL_GPUS}} \
  MORIIO_CONNECTOR_READ_MODE=1 \
  VLLM_USE_V1=1 \
  VLLM_ROCM_USE_AITER=1 \
  vllm serve {{MODEL}}        \
   -tp {{TP_SIZE}}  \
   --port 20005     \
   --max-num-batched-tokens 32768         \
   --distributed-executor-backend mp         \
   --gpu_memory_utilization 0.85         \
   --max-model-len 32768         \
   --enforce-eager \
   --no-enable-prefix-caching \
   --kv-cache-dtype fp8  \
   --max_num_seqs 64 \
   --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_producer","kv_connector_extra_config":{"proxy_ip":"10.x.x.x","http_port":"20005","proxy_ping_port":"36367"}}'


mori_serve_decode:
  CUDA_VISIBLE_DEVICES={{DECODE_GPUS}} \
  MORIIO_CONNECTOR_READ_MODE=1 \
  VLLM_USE_V1=1 \
  VLLM_ROCM_USE_AITER=1 \
  vllm serve {{MODEL}} \
     -tp {{TP_SIZE}}  \
     --port 40005     \
     --no-enable-prefix-caching \
     --max-num-batched-tokens 4096         \
     --distributed-executor-backend mp         \
     --gpu_memory_utilization 0.85         \
     --max-model-len 8200         \
     --trust-remote-code \
     --compilation-config='{"cudagraph_capture_sizes": [1, 256]}' \
     --kv-transfer-config '{"kv_connector":"MoRIIOConnector","kv_role":"kv_consumer","kv_connector_extra_config":{"proxy_ip":"10.x.x.x","http_port":"40005","proxy_ping_port":"36367"}}'

send_request:
  curl -X POST http://localhost:10001/v1/completions \
    -H "Content-Type: application/json" \
    -d '{ \
      "model": "{{MODEL}}", \
      "prompt": "Give me a recipe for tomato soup", \
      "max_tokens": 150, \
      "temperature": 0.7 \
    }'


Labels

ci/build, documentation, frontend, kv-connector, ready, rocm, v1
