Skip to content

[Model Runner V2] Feature: Support ElasticEPScalingExecutor for MRv2#43915

Open
yewentao256 wants to merge 7 commits into
mainfrom
wentao-fix-es-v2-bug
Open

[Model Runner V2] Feature: Support ElasticEPScalingExecutor for MRv2#43915
yewentao256 wants to merge 7 commits into
mainfrom
wentao-fix-es-v2-bug

Conversation

@yewentao256

@yewentao256 yewentao256 commented May 28, 2026

Copy link
Copy Markdown
Member

Purpose

VLLM_USE_V2_MODEL_RUNNER=1 pytest tests/distributed/test_elastic_ep.py -k test_elastic_ep_scaling -xvs

Will raise error

(APIServer pid=1758158) (RayWorkerProc pid=1911141) (Worker_DP3_EP3 pid=1911141) ERROR 05-28 14:32:09 [multiproc_executor.py:962]     output = func(*args, **kwargs) [repeated 4x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1911141) (Worker_DP3_EP3 pid=1911141) ERROR 05-28 14:32:09 [multiproc_executor.py:962]            ^^^^^^^^^^^^^^^^^^^^^ [repeated 12x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962]   File "/home/yewentao256/vllm-source/vllm/v1/worker/gpu_worker.py", line 1121, in elastic_ep_execute [repeated 2x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962]     return self.elastic_ep_executor.execute(execute_method, *args, **kwargs) [repeated 2x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962]            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 2x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962]   File "/home/yewentao256/vllm-source/vllm/distributed/elastic_ep/elastic_execute.py", line 160, in execute [repeated 2x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962]     return method(*args, **kwargs) [repeated 2x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1911141) (Worker_DP3_EP3 pid=1911141) ERROR 05-28 14:32:09 [multiproc_executor.py:962]                   ^^^^^^^^^^^^^^^^^^^^^^^ [repeated 4x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962]   File "/home/yewentao256/vllm-source/vllm/distributed/elastic_ep/elastic_execute.py", line 494, in switch_and_prepare [repeated 2x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962]     multi_block_table = self.worker.model_runner.input_batch.block_table [repeated 2x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962]                         ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ [repeated 2x across cluster]
(APIServer pid=1758158) (RayWorkerProc pid=1787075) (Worker_DP0_EP0 pid=1787075) ERROR 05-28 14:32:05 [multiproc_executor.py:962] AttributeError: 'GPUModelRunner' object has no attribute 'input_batch' [repeated 2x across cluster]

This is beause the storage of input batch is changed in v2, this PR fixes the issue

Note that ray env is hard to use and causes a lot of troubles, I write this easy test to reproduce quickly:

#!/usr/bin/env python3
# SPDX-License-Identifier: Apache-2.0
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project

import torch

import vllm.distributed.elastic_ep.elastic_execute as elastic_execute_module
from vllm.distributed.elastic_ep.elastic_execute import ElasticEPScalingExecutor


class _FakeWorker:
    def __init__(self, model_runner):
        self.model_runner = model_runner
        self.compile_calls = 0

    def compile_or_warm_up_model(self):
        self.compile_calls += 1


class _FakeV2Row:
    def __init__(self, values):
        self.gpu = torch.tensor(values, dtype=torch.int32)
        self.clear_staged_writes_calls = 0

    def clear_staged_writes(self):
        self.clear_staged_writes_calls += 1


class _FakeNumBlocks:
    def __init__(self, values):
        self.cpu = torch.tensor(values, dtype=torch.int32)
        self.copy_to_uva_calls = 0

    def copy_to_uva(self):
        self.copy_to_uva_calls += 1


class _FakeV2BlockTables:
    def __init__(self):
        self.block_tables = [
            _FakeV2Row([[1, 2], [3, 4]]),
            _FakeV2Row([[5, 6], [7, 8]]),
        ]
        self.input_block_tables = [
            torch.tensor([[9, 10], [11, 12]], dtype=torch.int32),
            torch.tensor([[13, 14], [15, 16]], dtype=torch.int32),
        ]
        self.num_blocks = _FakeNumBlocks([[2, 1], [1, 2]])


class _FakeV2Runner:
    def __init__(self):
        self.block_tables = _FakeV2BlockTables()
        self.max_num_tokens = 32
        self.dummy_run_calls = []

    def _dummy_run(self, num_tokens, *, is_profile, skip_eplb):
        self.dummy_run_calls.append((num_tokens, is_profile, skip_eplb))


class _FakeCpuGpuBuffer:
    def __init__(self, gpu_values, cpu_values):
        self.gpu = torch.tensor(gpu_values, dtype=torch.int32)
        self.cpu = torch.tensor(cpu_values, dtype=torch.int32)


class _FakeV1Row:
    def __init__(self, gpu_values, cpu_values):
        self.block_table = _FakeCpuGpuBuffer(gpu_values, cpu_values)


class _FakeMultiBlockTable:
    def __init__(self):
        self.block_tables = [
            _FakeV1Row([[1, 2], [3, 4]], [[5, 6], [7, 8]]),
            _FakeV1Row([[9, 10], [11, 12]], [[13, 14], [15, 16]]),
        ]
        self.clear_calls = 0

    def clear(self):
        self.clear_calls += 1
        for row in self.block_tables:
            row.block_table.gpu.zero_()
            row.block_table.cpu.zero_()


class _FakeInputBatch:
    def __init__(self):
        self.block_table = _FakeMultiBlockTable()


class _FakeV1Runner:
    def __init__(self):
        self.input_batch = _FakeInputBatch()
        self.max_num_tokens = 16
        self.dummy_run_calls = []

    def _dummy_run(self, num_tokens, *, is_profile, skip_eplb):
        self.dummy_run_calls.append((num_tokens, is_profile, skip_eplb))


def _patch_rewarm_dependencies(monkeypatch):
    monkeypatch.setattr(elastic_execute_module, "lock_workspace", lambda: None)
    monkeypatch.setattr(elastic_execute_module, "unlock_workspace", lambda: None)
    monkeypatch.setattr(
        ElasticEPScalingExecutor, "_release_cuda_graphs", lambda self: None
    )


def test_rewarm_workspace_uses_v2_block_tables(monkeypatch):
    _patch_rewarm_dependencies(monkeypatch)
    worker = _FakeWorker(_FakeV2Runner())
    executor = ElasticEPScalingExecutor(worker)
    block_tables = worker.model_runner.block_tables
    original_rows = [row.gpu.clone() for row in block_tables.block_tables]
    original_input_rows = [row.clone() for row in block_tables.input_block_tables]
    original_num_blocks = block_tables.num_blocks.cpu.clone()

    executor.rewarm_workspace()

    assert worker.model_runner.dummy_run_calls == [(32, True, True)]
    assert worker.compile_calls == 1
    assert [row.gpu.tolist() for row in block_tables.block_tables] == [
        row.tolist() for row in original_rows
    ]
    assert [row.tolist() for row in block_tables.input_block_tables] == [
        row.tolist() for row in original_input_rows
    ]
    assert torch.equal(block_tables.num_blocks.cpu, original_num_blocks)
    assert block_tables.num_blocks.copy_to_uva_calls == 2
    assert [row.clear_staged_writes_calls for row in block_tables.block_tables] == [2, 2]


def test_rewarm_workspace_preserves_v1_block_tables(monkeypatch):
    _patch_rewarm_dependencies(monkeypatch)
    worker = _FakeWorker(_FakeV1Runner())
    executor = ElasticEPScalingExecutor(worker)
    multi_block_table = worker.model_runner.input_batch.block_table
    original_gpu = [
        row.block_table.gpu.clone() for row in multi_block_table.block_tables
    ]
    original_cpu = [
        row.block_table.cpu.clone() for row in multi_block_table.block_tables
    ]

    executor.rewarm_workspace()

    assert worker.model_runner.dummy_run_calls == [(16, True, True)]
    assert worker.compile_calls == 1
    assert multi_block_table.clear_calls == 1
    assert [row.block_table.gpu.tolist() for row in multi_block_table.block_tables] == [
        row.tolist() for row in original_gpu
    ]
    assert [row.block_table.cpu.tolist() for row in multi_block_table.block_tables] == [
        row.tolist() for row in original_cpu
    ]

Originally

FAILED tests/v1/worker/test_elastic_execute.py::test_rewarm_workspace_uses_v2_block_tables - AttributeError: '_FakeV2Runner' object has no attribute 'input_batch'
===================================================== 1 failed, 1 passed, 17 warnings in 1.38s =====================================================

Now

================================= 2 passed, 17 warnings in 1.22s ==================================

Signed-off-by: yewentao256 <zhyanwentao@126.com>
@yewentao256 yewentao256 added the ready ONLY add when PR is ready to merge/full CI is needed label May 28, 2026
@tlrmchlsmth tlrmchlsmth self-assigned this May 28, 2026
@tlrmchlsmth

Copy link
Copy Markdown
Member

fyi @njhill @itayalroy

@itayalroy

Copy link
Copy Markdown
Contributor

Elastic EP already touches too much V1 model-runner internals, which led us to this issue with V2. Instead of now also touching a lot of V2 internals, I think we are better off moving this logic to the model runners themselves. Perhaps they can expose a context manager?

    with model_runner.preserve_serving_state():
        worker.compile_or_warm_up_model()

Also, it seems like for V2, compile_or_warm_up_model() calls warmup_kernels(), which creates synthetic requests with real KV block mappings. If there are existing real requests that are mapped to these blocks, the warmup could overwrite their KV data, we might need to address that too.

Signed-off-by: yewentao256 <zhyanwentao@126.com>
@mergify mergify Bot added the v1 label Jun 3, 2026
Signed-off-by: yewentao256 <zhyanwentao@126.com>
@yewentao256

yewentao256 commented Jun 3, 2026

Copy link
Copy Markdown
Member Author

touching a lot of V2 internals, I think we are better off moving this logic to the model runners themselves.

@itayalroy make sense, already updates to model runner.

For warmup_kernels, this PR doesn't cover, this PR is aimed to fix a current CI issue and should be landed soon, we can have a following up PR instead if there does have an issue (perhaps not as warmup_kernels doesn't overwrite meaningful data)

@tlrmchlsmth tlrmchlsmth left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test for this? Otherwise looks good to me

yewentao256 and others added 2 commits June 4, 2026 15:08
Signed-off-by: yewentao256 <zhyanwentao@126.com>

@yewentao256 yewentao256 left a comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@tlrmchlsmth Thanks for the review! Done for adding test

@yewentao256 yewentao256 enabled auto-merge (squash) June 9, 2026 18:22
@njhill njhill added the v2 label Jun 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ready ONLY add when PR is ready to merge/full CI is needed v1 v2

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants