[Data/LLM] Runai Streamer integration error with vLLM 0.10.2 #56905
Closed
Labels
bug (Something that is supposed to be working; but isn't), community-backlog, data (Ray Data-related issues), llm, stability, triage (Needs triage, e.g. priority, bug/not-bug, and owning component)
Description
What happened + What you expected to happen
The runai_streamer fix I made has an integration issue with vLLM: we pass the local directory to vLLM as the model instead of the s3 path (see the script and error below). In addition, there appear to be two layers of model-file downloading, and the first layer, which happens in vllm_engine_proc.py, was missed in the original PR: #55662. As a result, models like DeepSeek that rely on a configuration_deepseek.py file fail to load.
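To illustrate the mix-up, here is a hypothetical path-resolution helper (these names are illustrative, not the actual Ray Data LLM internals): with load_format="runai_streamer", the original s3:// URI should be forwarded to vLLM so it can stream the weights, while other load formats can use the locally mirrored snapshot.

```python
# Hypothetical sketch of the intended path resolution; function and
# parameter names are illustrative, not the actual Ray internals.
def resolve_model_path(model_source: str, engine_kwargs: dict, local_snapshot: str) -> str:
    """Pick the path to hand to vLLM.

    With load_format="runai_streamer", vLLM's loader expects the remote
    s3:// URI so it can stream the safetensors directly; passing the
    local HF-cache mirror (which contains no weight files) triggers
    "Cannot find any safetensors model weights".
    """
    if engine_kwargs.get("load_format") == "runai_streamer" and model_source.startswith("s3://"):
        return model_source  # stream weights straight from object storage
    return local_snapshot    # normal case: load from the local download

# The buggy behavior in this report is equivalent to always returning
# local_snapshot, which yields the RuntimeError shown in the log.
```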
import ray
from ray.data.llm import vLLMEngineProcessorConfig, build_llm_processor

config = vLLMEngineProcessorConfig(
    model_source="s3://path/qwen3-32b/",
    engine_kwargs={
        "enable_chunked_prefill": True,
        "max_num_batched_tokens": 4096,
        "max_model_len": 16384,
        "load_format": "runai_streamer",
        "tensor_parallel_size": 1,
    },
    concurrency=1,
    batch_size=64,
)

processor = build_llm_processor(
    config,
    preprocess=lambda row: dict(
        messages=[
            {"role": "system", "content": "You are a bot that responds with haikus."},
            {"role": "user", "content": row["item"]},
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=250,
        ),
    ),
    postprocess=lambda row: dict(
        answer=row["generated_text"],
        **row,  # This will return all the original columns in the dataset.
    ),
)

ds = ray.data.from_items(["Start of the haiku is: Complete this for me..."])
ds = processor(ds)
ds.show(limit=1)

And the error:
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) INFO 09-23 17:01:42 [gpu_model_runner.py:2338] Starting to load model /home/ray/.cache/huggingface/hub/models--s3:----cclim-rsch-databricks-storage--data--project_data--ccllm--qwen3-32b--/snapshots/0000000000000000000000000000000000000000...
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) [W923 17:01:42.510487211 ProcessGroupNCCL.cpp:981] Warning: TORCH_NCCL_AVOID_RECORD_STREAMS is the default now, this environment variable is thus deprecated. (function operator())
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) INFO 09-23 17:01:42 [gpu_model_runner.py:2370] Loading model from scratch...
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) INFO 09-23 17:01:42 [cuda.py:362] Using Flash Attention backend on V1 engine.
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] WorkerProc failed to start.
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] Traceback (most recent call last):
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/executor/multiproc_executor.py", line 559, in worker_main
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] worker = WorkerProc(*args, **kwargs)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] ^^^^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/executor/multiproc_executor.py", line 427, in __init__
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] self.worker.load_model()
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/worker/gpu_worker.py", line 213, in load_model
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] self.model_runner.load_model(eep_scale_up=eep_scale_up)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/v1/worker/gpu_model_runner.py", line 2371, in load_model
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] self.model = model_loader.load_model(
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] ^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/model_executor/model_loader/base_loader.py", line 50, in load_model
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] self.load_weights(model, model_config)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/model_executor/model_loader/runai_streamer_loader.py", line 104, in load_weights
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] self._get_weights_iterator(model_weights, model_config.revision))
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/model_executor/model_loader/runai_streamer_loader.py", line 87, in _get_weights_iterator
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] hf_weights_files = self._prepare_weights(model_or_path, revision)
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] File "/home/ray/anaconda3/lib/python3.11/site-packages/vllm/model_executor/model_loader/runai_streamer_loader.py", line 77, in _prepare_weights
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] raise RuntimeError(
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) ERROR 09-23 17:01:43 [multiproc_executor.py:585] RuntimeError: Cannot find any safetensors model weights with `/home/ray/.cache/huggingface/hub/models--s3:----cclim-rsch-databricks-storage--data--project_data--ccllm--qwen3-32b--/snapshots/0000000000000000000000000000000000000000`
(MapWorker(MapBatches(vLLMEngineStageUDF)) pid=162837) (Worker pid=186718) INFO 09-23 17:01:43 [multiproc_executor.py:546] Parent process exited, terminating worker
Ray is the nightly Docker image, with vLLM 0.10.2 installed plus the runai_streamer fix made by lengrongfu.
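A sketch of why DeepSeek-style models break with the missed download layer: the weights can stream from S3, but auxiliary files (config, tokenizer, and any trust_remote_code modules such as configuration_deepseek.py) still need a local copy before the engine starts. The partition below is a hypothetical illustration, not Ray's actual logic.

```python
# Hypothetical illustration of the "two layers" of downloading described
# above: *.safetensors stream via runai_streamer, everything else must
# exist locally. Not the actual Ray or vLLM implementation.
STREAMABLE_SUFFIXES = (".safetensors",)

def partition_repo_files(files: list[str]) -> tuple[list[str], list[str]]:
    """Split repo files into (streamed-from-S3, needs-local-download)."""
    streamed = [f for f in files if f.endswith(STREAMABLE_SUFFIXES)]
    local = [f for f in files if not f.endswith(STREAMABLE_SUFFIXES)]
    return streamed, local

files = [
    "model-00001-of-00002.safetensors",
    "model-00002-of-00002.safetensors",
    "config.json",
    "tokenizer.json",
    "configuration_deepseek.py",  # required by trust_remote_code models
]
streamed, local = partition_repo_files(files)
# If the first download layer (vllm_engine_proc.py) never fetches the
# "local" group, a DeepSeek-style model fails to load even though its
# weights could stream fine.
```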
Versions / Dependencies
Ray nightly version
vLLM 0.10.2 with the runai_streamer fix
Reproduction script
(Same script as in the description above.)

Issue Severity
Medium: It is a significant difficulty but I can work around it.