Skip to content

[Core] Ray hangs with vllm0.8.5 v1 api for tp8+pp4 #53758

@strgrb

Description

@strgrb

What happened + What you expected to happen

The vLLM v1 API uses Ray compiled DAGs by default, and it may hang in ray::PlasmaObjectHeader::TryToAcquireSemaphore. Inspecting the hung node shows 7 GPUs at 100% utilization (probably busy in an NCCL allreduce) while 1 GPU sits at 0%; that process is blocked in ray::PlasmaObjectHeader::TryToAcquireSemaphore.
stacktrace is following:

Thread 32810 (idle): "Dummy-3"
    do_futex_wait.constprop.0 (libpthread-2.32.so)
    __new_sem_wait_slow.constprop.0 (libpthread-2.32.so)
    ray::PlasmaObjectHeader::TryToAcquireSemaphore (ray/_raylet.so)
    ray::PlasmaObjectHeader::WriteAcquire (ray/_raylet.so)
    ray::experimental::MutableObjectManager::WriteAcquire (ray/_raylet.so)
    ray::core::experimental::MutableObjectProvider::WriteAcquire (ray/_raylet.so)
    ray::core::CoreWorker::ExperimentalChannelWriteAcquire (ray/_raylet.so)
    experimental_channel_put_serialized (ray/_raylet.so)
    write (ray/experimental/channel/shared_memory_channel.py:466)
    write (ray/experimental/channel/shared_memory_channel.py:772)
    write (ray/experimental/channel/torch_tensor_nccl_channel.py:561)
    _send_cpu_and_gpu_data (ray/experimental/channel/torch_tensor_nccl_channel.py:206)
    write (ray/experimental/channel/torch_tensor_nccl_channel.py:270)
    write (ray/experimental/channel/common.py:626)
    _write (ray/dag/compiled_dag_node.py:718)
    exec_operation (ray/dag/compiled_dag_node.py:753)
    do_exec_tasks (ray/dag/compiled_dag_node.py:230)
    __ray_call__ (ray/actor.py:1722)
    _resume_span (ray/util/tracing/tracing_helper.py:463)
    actor_method_executor (ray/_private/function_manager.py:696)
    function_executor (ray/_raylet.so)
    _raylet_task_execution_handler (ray/_raylet.so)
    std::_Function_handler<ray::Status(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > > const&, std::vector<ray::rpc::ObjectReference, std::allocator<ray::rpc::ObjectReference> > const&, std::string const&, std::string const&, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, bool>, std::allocator<std::pair<ray::ObjectID, bool> > >*, std::shared_ptr<ray::LocalMemoryBuffer>&, bool*, std::string*, std::vector<ray::ConcurrencyGroup, std::allocator<ray::ConcurrencyGroup> > const&, std::string, bool, bool, bool, long), ray::Status (*)(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > > const&, std::vector<ray::rpc::ObjectReference, std::allocator<ray::rpc::ObjectReference> > const&, std::string, std::string, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, bool>, std::allocator<std::pair<ray::ObjectID, bool> > >*, 
std::shared_ptr<ray::LocalMemoryBuffer>&, bool*, std::string*, std::vector<ray::ConcurrencyGroup, std::allocator<ray::ConcurrencyGroup> > const&, std::string, bool, bool, bool, long)>::_M_invoke (ray/_raylet.so)
    ray::core::CoreWorker::ExecuteTask (ray/_raylet.so)
    std::_Function_handler<ray::Status(ray::TaskSpecification const&, std::optional<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, bool>, std::allocator<std::pair<ray::ObjectID, bool> > >*, google::protobuf::RepeatedPtrField<ray::rpc::ObjectReferenceCount>*, bool*, std::string*), std::_Bind<ray::Status (ray::core::CoreWorker(ray::core::CoreWorker*, std::_Placeholder<1>, std::_Placeholder<2>, std::_Placeholder<3>, std::_Placeholder<4>, std::_Placeholder<5>, std::_Placeholder<6>, std::_Placeholder<7>, std::_Placeholder<8>)::*)(ray::TaskSpecification const&, std::optional<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, bool>, std::allocator<std::pair<ray::ObjectID, bool> > >*, google::protobuf::RepeatedPtrField<ray::rpc::ObjectReferenceCount>*, bool*, std::string*)> >::_M_invoke (ray/_raylet.so)
    ray::core::TaskReceiver::HandleTask(ray::rpc::PushTaskRequest const&, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(ray::TaskSpecification const&, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}::operator() const (ray/_raylet.so)
    std::_Function_handler<void (ray::TaskSpecification const&, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>), ray::core::TaskReceiver::HandleTask(ray::rpc::PushTaskRequest const&, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(ray::TaskSpecification const&, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}>::_M_invoke (ray/_raylet.so)
    ray::core::InboundRequest::Accept (ray/_raylet.so)
    ray::core::ActorSchedulingQueue::AcceptRequestOrRejectIfCanceled (ray/_raylet.so)
    std::_Function_handler<void (), ray::core::ActorSchedulingQueue::ScheduleRequests()::{lambda()#2}>::_M_invoke (ray/_raylet.so)
    boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete (ray/_raylet.so)
    boost::asio::detail::scheduler::do_run_one (ray/_raylet.so)
    boost::asio::detail::scheduler::run (ray/_raylet.so)
    boost::asio::detail::posix_thread::func<boost::asio::thread_pool::thread_function>::run (ray/_raylet.so)
    boost_asio_detail_posix_thread_function (ray/_raylet.so)
    clone (libc-2.32.so)

I set RAY_CGRAPH_get_timeout to a very large value; without that, the get times out before the hang can be observed.

The model is our private model; Qwen also runs into this issue, but it takes much longer to reproduce.

Versions / Dependencies

ray 2.46.0 and 2.43.0 were both tested

Reproduction script

import re
import time
import uuid
from typing import Dict, List, Optional, Any

import numpy as np
from ray.runtime_env import RuntimeEnv
from ray.util import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
import ray
import asyncio

from tqdm import tqdm
from transformers.models.auto.tokenization_auto import get_tokenizer_config
from vllm.config import VllmConfig
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.executor.abstract import Executor

from opencompass.models.base import BaseModel
from opencompass.utils import get_logger

from vllm import SamplingParams, AsyncEngineArgs

DEFAULT_MODEL_KWARGS = dict(trust_remote_code=True)


class AsyncVLLMModel:
    """Ray-actor-friendly wrapper around vLLM's AsyncLLM.

    ``add_request`` submits a prompt and returns a request id; background
    tasks drain each request's async generator and record the final output
    (or the exception that aborted it) in ``result_map``, from which
    ``query_result`` pops it exactly once.
    """

    def __init__(self, vllm_config: VllmConfig, executor_class):
        # NOTE: must be constructed inside a running event loop (e.g. an
        # async Ray actor), since asyncio.create_task() is called here.
        self.engine = AsyncLLM(vllm_config, executor_class, True)
        # request_id -> final RequestOutput, or the Exception that aborted it.
        self.result_map = {}
        self.result_generator_q = asyncio.Queue()
        # asyncio keeps only weak references to tasks, so hold strong ones
        # here or the dispatcher/logger tasks may be garbage-collected.
        self._background_tasks = set()
        self.result_generator_task = self._spawn(self.process_result())
        self._spawn(self.__log())

    def _spawn(self, coro):
        """Schedule ``coro`` and retain the task until it finishes."""
        task = asyncio.create_task(coro)
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return task

    async def __log(self):
        """Emit engine stats once per second, forever."""
        while True:
            await asyncio.sleep(1)
            await self.engine.do_log_stats()

    async def _process_result(self, request_id: str, generator):
        """Drain ``generator`` and store its last yielded item.

        If the generator raises, the exception object itself is stored, so
        callers must check whether the value they get back is an Exception.
        """
        try:
            final_result = None
            async for res in generator:
                final_result = res
        except Exception as e:  # surfaced to the caller via result_map
            final_result = e
        self.result_map[request_id] = final_result

    async def process_result(self):
        """Dispatcher loop: one _process_result task per queued request."""
        while True:
            request_id, generator = await self.result_generator_q.get()
            self._spawn(self._process_result(request_id, generator))

    async def add_request(self, input: str, sampling_params: SamplingParams) -> str:
        """Submit ``input`` for generation and return its request id."""
        request_id = str(uuid.uuid4())
        generator = self.engine.generate(input, sampling_params, request_id)
        self.result_generator_q.put_nowait((request_id, generator))
        return request_id

    def query_result(self, request_id: str):
        """Pop and return the finished result for ``request_id``, or None if
        it has not completed yet.  Results are consumed exactly once."""
        return self.result_map.pop(request_id, None)


class AsyncVLLMOnRay(BaseModel):
    """Model wrapper that runs vLLM (v1 engine, ray executor) as a Ray actor."""

    def bin_trim(self, prompt: str, num_token: int) -> str:
        """Trim ``prompt`` to at most ``num_token`` tokens via binary search.

        ``self.mode`` selects what is kept: 'front' keeps the tail, 'rear'
        keeps the head, 'mid' keeps both ends.
        """
        import jieba

        token_len = self.get_token_len(prompt)
        if token_len <= num_token:
            return prompt
        # CJK text has no whitespace word boundaries; segment with jieba.
        pattern = re.compile(r'[\u4e00-\u9fa5]')
        if pattern.search(prompt):
            words = list(jieba.cut(prompt, cut_all=False))
            sep = ''
        else:
            words = prompt.split(' ')
            sep = ' '
        l, r = 1, len(words)
        while l + 2 < r:
            mid = (l + r) // 2
            if self.mode == 'front':
                cur_prompt = sep.join(words[-mid:])
            elif self.mode == 'mid':
                # NOTE(review): the two halves are concatenated without a
                # separator in between — presumably intentional; confirm.
                cur_prompt = sep.join(words[:mid]) + sep.join(words[-mid:])
            elif self.mode == 'rear':
                cur_prompt = sep.join(words[:mid])
            else:
                # Unknown trim mode: previously cur_prompt was left unbound
                # here (NameError); return the prompt untrimmed instead.
                return prompt

            if self.get_token_len(cur_prompt) <= num_token:
                l = mid  # noqa: E741
            else:
                r = mid

        if self.mode == 'front':
            prompt = sep.join(words[-l:])
        elif self.mode == 'mid':
            prompt = sep.join(words[:l]) + sep.join(words[-l:])
        elif self.mode == 'rear':
            prompt = sep.join(words[:l])
        return prompt

    def __init__(
            self,
            path: str,
            tokenizer_path: Optional[str] = None,
            abbr: str = None,
            max_seq_len: int = 2048,
            model_kwargs: dict = None,
            generation_kwargs: dict = dict(),
            tokenizer_kwargs: Dict[str, Any] = {},
            fixed_min_out_len=500,
            meta_template: Optional[Dict] = None,
            mode: str = 'none',
            use_fastchat_template: bool = False,
            stop_words: List[str] = [],
    ):
        """
        Args:
            path: Model path; also used as the tokenizer path.
            model_kwargs: Extra engine-arg overrides passed to _load_model.
            mode: Prompt-trimming mode ('none', 'front', 'mid' or 'rear').
            fixed_min_out_len: Output budget assumed when trimming prompts.
        """
        super().__init__(path=path,
                         abbr=abbr,
                         max_seq_len=max_seq_len,
                         meta_template=meta_template)

        self.logger = get_logger()
        # model_kwargs defaults to None and _load_model pops keys from the
        # dict it receives, so always hand it a private (non-None) copy.
        self._load_model(path, dict(model_kwargs or {}))
        # Copy to avoid mutating the caller's dict / the shared default.
        self.generation_kwargs = dict(generation_kwargs or {})
        self.generation_kwargs.pop('do_sample', None)
        self.tokenizer = self._load_tokenizer()
        self.fixed_min_out_len = fixed_min_out_len
        self.mode = mode
        self.use_fastchat_template = use_fastchat_template
        self.stop_words = list(set(stop_words + self._get_potential_stop_words(path)))

    def _load_tokenizer(self):
        """Load the tokenizer for self.path, special-casing BailingTokenizer."""
        from transformers import AutoTokenizer
        tokenizer_config = get_tokenizer_config(self.path)
        tokenizer_class = tokenizer_config.get('tokenizer_class')
        if tokenizer_class == 'BailingTokenizer':
            from opencompass.utils.tokenzier.tokenization_bailing import BailingTokenizer
            self.logger.info("use bailing tokenizer")
            tokenizer = BailingTokenizer.from_pretrained(self.path)
        else:
            self.logger.info("use default tokenizer")
            tokenizer = AutoTokenizer.from_pretrained(self.path)
        return tokenizer

    def _load_model(self,
                    path: str,
                    add_model_kwargs: dict = {},
                    num_retry: int = 3):
        """Build the engine config, reserve a tp*pp placement group and spawn
        the AsyncVLLMModel actor with the cgraph/NCCL env vars set.

        Args:
            add_model_kwargs: Extra AsyncEngineArgs overrides.  The special
                key 'pp_split' is turned into VLLM_PP_LAYER_PARTITION.
            num_retry: Currently unused; kept for interface compatibility.
        """
        engine_args = AsyncEngineArgs()
        # Local defaults (distinct from the module-level DEFAULT_MODEL_KWARGS,
        # which the original shadowed here).
        base_kwargs = {
            'served_model_name': 'auto',
            'trust_remote_code': True,
            'disable_custom_all_reduce': True,
            'distributed_executor_backend': 'ray',
            'model': path,
            'tokenizer': path,
            'max_num_seqs': 1024,
        }
        for k, v in base_kwargs.items():
            setattr(engine_args, k, v)

        # Work on a copy so the pop below never mutates the caller's dict.
        add_model_kwargs = dict(add_model_kwargs or {})
        pp_split = add_model_kwargs.pop('pp_split', None)
        for k, v in add_model_kwargs.items():
            setattr(engine_args, k, v)
        self.logger.info("vllm engine args: {}".format(engine_args.__dict__))
        tp = engine_args.tensor_parallel_size
        pp = engine_args.pipeline_parallel_size
        # One (CPU, GPU) bundle per worker: tp * pp bundles in total.
        pg_resources = [{"CPU": 1, "GPU": 1} for _ in range(tp * pp)]
        pg = placement_group(pg_resources)
        pg_group = ray.get(pg.ready())  # block until every bundle is placed
        vllm_config = engine_args.create_engine_config()
        vllm_config.parallel_config.placement_group = pg_group
        scheduling_strategy = PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_capture_child_tasks=True,
        )

        exec_class = Executor.get_class(vllm_config)

        env_var = {
            "VLLM_USE_V1": "1",
            "VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM": "1",
            "NCCL_DEBUG": "INFO",
            "RAY_CGRAPH_get_timeout": "20000",
            "NCCL_COMM_BLOCKING": "1",
            # "RAY_CGRAPH_overlap_gpu_communication": "1",
            'NCCL_MAX_CTAS': "8",
            # 'RAY_CGRAPH_max_inflight_executions': "80"
        }
        if pp_split:
            env_var['VLLM_PP_LAYER_PARTITION'] = str(pp_split)
        self.model = ray.remote(AsyncVLLMModel).options(
            scheduling_strategy=scheduling_strategy,
            runtime_env=RuntimeEnv(
                env_vars=env_var
            )
        ).remote(
            vllm_config,
            exec_class
        )

    def _get_potential_stop_words(self, path: Optional[str]):
        """Collect stop words from the model's GenerationConfig eos ids plus
        the tokenizer's eos token, de-duplicated and with empties dropped."""
        from transformers import GenerationConfig
        potential_stop_words = []
        try:
            generation_config = GenerationConfig.from_pretrained(path)
        except Exception:  # best-effort: missing/invalid config is fine
            generation_config = None
        if generation_config and hasattr(generation_config, 'eos_token_id'):
            if isinstance(generation_config.eos_token_id, int):
                potential_stop_words.append(self.tokenizer.decode(generation_config.eos_token_id))
            else:
                assert isinstance(generation_config.eos_token_id, list)
                for token_id in generation_config.eos_token_id:
                    potential_stop_words.append(self.tokenizer.decode(token_id))
        if self.tokenizer.eos_token is not None:
            potential_stop_words.append(self.tokenizer.eos_token)
        potential_stop_words = list(set(potential_stop_words))
        potential_stop_words = [s for s in potential_stop_words if s]
        return potential_stop_words

    def generate(self, inputs: List[str], max_out_len: int, stopping_criteria: List[str] = [], **kwargs) -> List[str]:
        """Public entry point: batch-generate completions for ``inputs``."""
        return self._generate(inputs, max_out_len, stopping_criteria, **kwargs)

    def get_ppl(self,
                inputs: List[str],
                mask_length: Optional[List[int]] = None) -> List[float]:
        """Public entry point: per-input perplexity (cross-entropy loss)."""
        return self._get_ppl(inputs, mask_length)

    async def infer(self, input, sampling_kwargs: SamplingParams):
        """Optionally trim ``input`` to fit the context window, then submit
        it to the remote actor.  Returns the request id."""
        # BUGFIX: the original condition used ``or``, which is always true
        # and forced bin_trim even when mode == 'none'.
        if self.mode is not None and self.mode != 'none':
            context_window = self.max_seq_len
            max_out_len = sampling_kwargs.max_tokens
            # Reserve a fixed output budget unless this is a 1-token
            # (logprob-only) request.
            if max_out_len != 1:
                max_out_len = self.fixed_min_out_len
            input = self.bin_trim(input, context_window - max_out_len)
        request_id = await self.model.add_request.remote(input, sampling_kwargs)
        return request_id

    def query_result(self, request_id: str):
        """Poll the actor once per second until the request has a result."""
        while True:
            result = ray.get(self.model.query_result.remote(request_id))
            if result is not None:
                return result
            time.sleep(1)

    async def do_batch_infer(self, inputs: List[str], sampling_kwargs: SamplingParams):
        """Submit every input, then collect results in submission order."""
        request_ids = [
            await self.infer(item, sampling_kwargs) for item in inputs
        ]
        result = []
        for request_id in tqdm(request_ids):
            result.append(self.query_result(request_id))
        return result

    def _generate(self,
                  inputs: List[str],
                  max_out_len: int,
                  stopping_criteria: List[str] = [],
                  **kwargs) -> List[str]:
        """Generate results given a list of inputs.

        Args:
            inputs (List[str]): A list of strings.
            max_out_len (int): The maximum length of the output.
            stopping_criteria (List[str]): Extra stop strings, merged with
                self.stop_words.

        Returns:
            List[str]: A list of generated strings.
        """

        DEFAULT_GENERATION_KWARGS = {
            'temperature': 0,
            'max_tokens': max_out_len,
            'stop': list(set(self.stop_words + stopping_criteria))
        }
        sampling_kwargs = DEFAULT_GENERATION_KWARGS.copy()

        # Instance-level, then per-call kwargs override the defaults.
        sampling_kwargs.update(self.generation_kwargs)
        sampling_kwargs.update(kwargs)
        sampling_kwargs = SamplingParams(**sampling_kwargs)

        self.logger.info("sampling_kwargs: {}".format(sampling_kwargs))
        outputs = asyncio.run(self.do_batch_infer(inputs, sampling_kwargs))
        prompt_list, output_strs = [], []
        for output in outputs:
            prompt = output.prompt
            generated_text = output.outputs[0].text
            prompt_list.append(prompt)
            output_strs.append(generated_text)

        return output_strs

    def _get_ppl(self,
                 inputs: List[str],
                 mask_length: Optional[List[int]] = None) -> List[float]:
        """Compute per-input cross-entropy loss from prompt logprobs.

        A 1-token generation with prompt_logprobs=0 makes vLLM return the
        logprob of each prompt token, from which the loss is derived.
        """
        batch_size = len(inputs)
        sampling_kwargs = SamplingParams(prompt_logprobs=0,
                                         max_tokens=1,
                                         **self.generation_kwargs)
        # forward
        outputs = asyncio.run(self.do_batch_infer(inputs, sampling_kwargs))
        # compute ppl; skip position 0, which has no logprob (no context).
        ce_loss = []
        for i in range(batch_size):
            prompt_logprobs = outputs[i].prompt_logprobs[1:]
            prompt_token_ids = outputs[i].prompt_token_ids[1:]
            prompt_logprobs_list = [
                prompt_logprobs[j][prompt_token_ids[j]]
                for j in range(len(prompt_logprobs))
            ]
            prompt_logprobs_list = [lp.logprob for lp in prompt_logprobs_list]
            prompt_logprobs_list = np.array(prompt_logprobs_list)
            if mask_length is not None:
                # Only score the trailing continuation tokens.
                prompt_logprobs_list = prompt_logprobs_list[-mask_length[i]:]
            loss = -prompt_logprobs_list.sum(axis=-1) / len(prompt_token_ids)
            ce_loss.append(loss)
        return np.array(ce_loss)

    def get_loglikelihood(self, inputs: List[str],
                          conts: List[str]) -> List[float]:
        """Log-likelihood of each continuation ``conts[i]`` given ``inputs[i]``."""
        mask_length = [
            self.get_token_len(c, add_special_tokens=False) for c in conts
        ]
        return -self.get_ppl(inputs, mask_length)

    def get_token_len(self,
                      prompt: str,
                      add_special_tokens: bool = True) -> int:
        """Get lengths of the tokenized strings.

        Args:
            prompt (str): Input string.
            add_special_tokens (bool): Whether to count special tokens.

        Returns:
            int: Length of the input tokens
        """
        token_ids = self.tokenizer.encode(prompt,
                                          add_special_tokens=add_special_tokens)
        return len(token_ids)


if __name__ == '__main__':
    # Smoke test, then a heavy synthetic workload that triggers the hang.
    model_path = '/some/model/path/'
    model = AsyncVLLMOnRay(
        path=model_path,
        model_kwargs={
            "tensor_parallel_size": 8,
            "pipeline_parallel_size": 4,
            "gpu_memory_utilization": 0.9,
        },
    )
    print(model.generate(["你好", "你叫什么名字"], max_out_len=10))

    # 10k long prompts built from repeated UUIDs.
    stress_inputs = ["".join([str(uuid.uuid4())] * 100) for _ in range(10000)]
    print(model.generate(stress_inputs, max_out_len=1024))

Issue Severity

None

Metadata

Metadata

Assignees

Labels

P1Issue that should be fixed within a few weeksbugSomething that is supposed to be working; but isn'tcompiled-graphscoreIssues that should be addressed in Ray Corellmstabilityvllm

Type

No type

Projects

Status

Todo

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions