[Data/LLM] Runai Streamer integration error with vLLM 0.10.2 #56905

@jiangwu300

Description

What happened + What you expected to happen

The runai_streamer fix I made has an integration issue with vLLM: we pass the local download directory to vLLM as the model instead of the s3 path (see the error below). In addition, there appear to be two layers of model-file downloading, and the first layer, which happens in vllm_engine_proc.py, was missed in the original PR (#55662). As a result, models like DeepSeek that rely on a configuration_deepseek.py file fail to load.
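To illustrate the "two layers" point, here is a hypothetical sketch (function and file names below are mine, not the actual vllm_engine_proc.py logic): the first download layer should mirror only the small auxiliary files locally, including remote-code files like configuration_deepseek.py, while the safetensors weights are streamed by runai_streamer.

```python
# Hypothetical illustration: which files the first download layer should
# mirror locally before the engine starts. Weights stay in object storage
# and are streamed by runai_streamer.
AUX_SUFFIXES = (".json", ".txt", ".model", ".py")  # includes remote code
                                                   # such as configuration_deepseek.py

def needs_local_copy(filename: str) -> bool:
    """True for auxiliary files that must exist on local disk at load time."""
    return filename.endswith(AUX_SUFFIXES)

files = ["config.json", "configuration_deepseek.py",
         "model-00001-of-00002.safetensors", "tokenizer.json"]
print([f for f in files if needs_local_copy(f)])
# → ['config.json', 'configuration_deepseek.py', 'tokenizer.json']
```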

import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="s3://path/qwen3-32b/",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
        "load_format": "runai_streamer",
        "tensor_parallel_size": 1,
    },
    concurrency=1,
    batch_size=64,
)
processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]}
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=250,
        )
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"],
        **row  # This will return all the original columns in the dataset.
    ),
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])

ds = processor(ds)
ds.show(limit=1)

And the error:

(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) INFO 09-23 17:01:42 [gpu_model_runner.py:2338] Starting to load model /home/ray/.cache/huggingface/hub/models--s3:----cclim-rsch-databricks-storage--data--project_data--ccllm--qwen3-32b--/snapshots/0000000000000000000000000000000000000000...
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) [W923 17:01:42.510487211 ProcessGroupNCCL.cpp:981] Warning: TORCH_NCCL_AVOID_RECORD_STREAMS is the default now, this environment variable is thus deprecated. (function operator())
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) INFO 09-23 17:01:42 [gpu_model_runner.py:2370] Loading model from scratch...
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) INFO 09-23 17:01:42 [cuda.py:362] Using Flash Attention backend on V1 engine.
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] WorkerProc failed to start.
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] Traceback (most recent call last):
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]   File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/executor/multiproc_executor.py", line 559, in worker_main
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     worker = WorkerProc(*args, **kwargs)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]              ^^^^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]   File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/executor/multiproc_executor.py", line 427, in __init__
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     self.worker.load_model()
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]   File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/worker/gpu_worker.py", line 213, in load_model
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     self.model_runner.load_model(eep_scale_up=eep_scale_up)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]   File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/worker/gpu_model_runner.py", line 2371, in load_model
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     self.model = model_loader.load_model(
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]                  ^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]   File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/model_executor/model_loader/base_loader.py", line 50, in load_model
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     self.load_weights(model, model_config)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]   File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/model_executor/model_loader/runai_streamer_loader.py", line 104, in load_weights
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     self._get_weights_iterator(model_weights, model_config.revision))
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]   File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/model_executor/model_loader/runai_streamer_loader.py", line 87, in _get_weights_iterator
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     hf_weights_files = self._prepare_weights(model_or_path, revision)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]                        ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]   File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/model_executor/model_loader/runai_streamer_loader.py", line 77, in _prepare_weights
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585]     raise RuntimeError(
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] RuntimeError: Cannot find any safetensors model weights with `/home/ray/.cache/huggingface/hub/models--s3:----cclim-rsch-databricks-storage--data--project_data--ccllm--qwen3-32b--/snapshots/0000000000000000000000000000000000000000`
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) INFO 09-23 17:01:43 [multiproc_executor.py:546] Parent process exited, terminating worker
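To make the suspected mismatch concrete, here is a minimal sketch of the path resolution I would expect (the function and variable names are hypothetical, not the actual vLLM or Ray code): with runai_streamer and a cloud URI, the weights path handed to vLLM should remain the s3:// URI; passing the local HF snapshot directory (which holds only config/tokenizer files) produces exactly the "Cannot find any safetensors model weights" error above.

```python
# Hypothetical sketch of how model-source resolution could distinguish
# streamed weights from the locally mirrored snapshot directory.

def resolve_weights_source(model_source: str, load_format: str,
                           local_snapshot_dir: str) -> str:
    """Return the path vLLM should load weights from."""
    is_cloud_uri = model_source.startswith(("s3://", "gs://"))
    if load_format == "runai_streamer" and is_cloud_uri:
        return model_source          # stream weights straight from object storage
    return local_snapshot_dir        # otherwise use the downloaded snapshot

expected = resolve_weights_source("s3://path/qwen3-32b/", "runai_streamer",
                                  "/home/ray/.cache/huggingface/hub/...")
print(expected)  # → s3://path/qwen3-32b/
```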

Ray is running from the nightly Docker image, with vLLM 0.10.2 installed plus the runai_streamer fix from lengrongfu applied.

Versions / Dependencies

Ray: nightly
vLLM: 0.10.2 with the runai_streamer fix applied

Reproduction script

(Same script as in the description above.)
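Until the path handling is fixed, a possible workaround sketch (unverified; assumes the AWS CLI is available and credentialed) is to mirror the model to local disk first, then point model_source at the local copy and drop "load_format": "runai_streamer" so the default loader finds the safetensors files:

```python
# Hypothetical workaround: copy the model locally, then skip the streamer.
import subprocess
from pathlib import Path

def local_mirror_cmd(s3_uri: str, dest: Path) -> list[str]:
    """Build the `aws s3 sync` command that copies the model locally."""
    return ["aws", "s3", "sync", s3_uri, str(dest)]

dest = Path("/tmp/qwen3-32b")
cmd = local_mirror_cmd("s3://path/qwen3-32b/", dest)
# subprocess.run(cmd, check=True)  # run on a machine with AWS credentials

# Then configure the processor against the local directory, e.g.:
# config = vLLMEngineProcessorConfig(
#     model_source=str(dest),
#     engine_kwargs={...},  # same kwargs, minus "load_format"
# )
```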

Issue Severity

Medium: It is a significant difficulty but I can work around it.

Labels

bug (Something that is supposed to be working; but isn't), community-backlog, data (Ray Data-related issues), llm, stability, triage (Needs triage: priority, bug/not-bug, and owning component)

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions