[Data] GPU resource leakage after ray.data.llm pipeline is terminated #53124

@kouroshHakha

Description

What happened + What you expected to happen

Comparing nightly against 2.46 with the script below highlights a resource-leakage problem in Ray.

In 2.46, the following Ray Data + LLM pipeline releases its GPU resources completely after it finishes. On nightly, the same script leaves one orphaned process holding on to GPU memory.

You can run this repro with the ray-llm images on any head-node cluster with more than 2 GPUs.

I suspect garbage collection in Ray Core has drifted from how it used to behave.
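One way to confirm the leak after the repro script exits is to query the GPU compute processes directly. This is a sketch, not part of the original report; it assumes `nvidia-smi` is on PATH, and the `gpu_compute_pids` helper is hypothetical:

```python
import subprocess


def gpu_compute_pids(smi_output: str) -> list[int]:
    """Parse the output of
    `nvidia-smi --query-compute-apps=pid --format=csv,noheader`
    into a list of PIDs; an empty list means no process holds GPU memory."""
    return [int(line.strip()) for line in smi_output.splitlines() if line.strip()]


try:
    out = subprocess.run(
        ["nvidia-smi", "--query-compute-apps=pid", "--format=csv,noheader"],
        capture_output=True, text=True, check=True,
    ).stdout
    # Expectation: on 2.46 this list is empty once the repro script exits;
    # on nightly it still contains the one orphaned PID.
    print("PIDs holding GPU memory:", gpu_compute_pids(out))
except FileNotFoundError:
    print("nvidia-smi not found on this host")
```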

Versions / Dependencies

vllm = 0.8.5

Reproduction script

import ray
from ray.data.llm import build_llm_processor, vLLMEngineProcessorConfig


model_source = "Qwen/Qwen2.5-0.5B-Instruct"
tp_size = 1
pp_size = 1
concurrency = 2
sample_size = 60
runtime_env = dict(
    env_vars=dict(
        VLLM_USE_V1="1",
    ),
)
tokenize = False
detokenize = False

processor_config = vLLMEngineProcessorConfig(
    model_source=model_source,
    task_type="generate",
    engine_kwargs=dict(
        tensor_parallel_size=tp_size,
        pipeline_parallel_size=pp_size,
        max_model_len=4096,
        enable_chunked_prefill=True,
        # default is to use mp, but using ray will help with the cleanup.
        # distributed_executor_backend="ray",
    ),
    apply_chat_template=True,
    runtime_env=runtime_env,
    tokenize=tokenize,
    detokenize=detokenize,
    batch_size=16,
    concurrency=concurrency,
)

processor = build_llm_processor(
    processor_config,
    preprocess=lambda row: dict(
        model=model_source,
        messages=[
            {"role": "system", "content": "You are an assistant"},
            {
                "role": "user",
                "content": f"Say {row['id']} words about this image.",
            },
        ],
        sampling_params=dict(
            temperature=0.3,
            max_tokens=50,
        ),
    ),
    postprocess=lambda row: {
        "resp": row["generated_text"],
    },
)

ds = ray.data.range(sample_size)
ds = ds.map(lambda x: {"id": x["id"], "val": x["id"] + 5})
ds = processor(ds)
ds = ds.materialize()
outs = ds.take_all()
assert len(outs) == sample_size
assert all("resp" in out for out in outs)
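The leak can also be probed from inside the driver before exit. A sketch using Ray's public resource-reporting APIs (`ray.cluster_resources()` / `ray.available_resources()`); the `gpus_released` helper is hypothetical:

```python
def gpus_released(cluster: dict, available: dict) -> bool:
    """True when every GPU counted in the cluster is back in the available
    pool, given resource dicts as returned by ray.cluster_resources() and
    ray.available_resources()."""
    return available.get("GPU", 0.0) >= cluster.get("GPU", 0.0)


# Intended usage from the driver, after ds.take_all() and once the
# executor has had a moment to tear down its actors:
#   print(gpus_released(ray.cluster_resources(), ray.available_resources()))
```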

Issue Severity

High: It blocks me from completing my task.

Labels

bug (Something that is supposed to be working, but isn't), community-backlog, data (Ray Data-related issues), llm, stability, triage (Needs triage, e.g. priority, bug/not-bug, and owning component)
