-
Notifications
You must be signed in to change notification settings - Fork 7.4k
[Core] Ray hangs with vllm0.8.5 v1 api for tp8+pp4 #53758
Copy link
Copy link
Open
Labels
Labels: P1 (issue that should be fixed within a few weeks) · bug (something that is supposed to be working, but isn't) · compiled-graphs · core (issues that should be addressed in Ray Core) · llm · stability · vllm
Description
What happened + What you expected to happen
The vLLM v1 API uses Ray compiled graphs (DAG) by default, and execution may hang in ray::PlasmaObjectHeader::TryToAcquireSemaphore. Inspecting the hung node shows 7 GPUs at 100% utilization (probably blocked inside an NCCL allreduce) while 1 GPU sits at 0% utilization; that worker process is stuck in ray::PlasmaObjectHeader::TryToAcquireSemaphore.
stacktrace is following:
Thread 32810 (idle): "Dummy-3"
do_futex_wait.constprop.0 (libpthread-2.32.so)
__new_sem_wait_slow.constprop.0 (libpthread-2.32.so)
ray::PlasmaObjectHeader::TryToAcquireSemaphore (ray/_raylet.so)
ray::PlasmaObjectHeader::WriteAcquire (ray/_raylet.so)
ray::experimental::MutableObjectManager::WriteAcquire (ray/_raylet.so)
ray::core::experimental::MutableObjectProvider::WriteAcquire (ray/_raylet.so)
ray::core::CoreWorker::ExperimentalChannelWriteAcquire (ray/_raylet.so)
experimental_channel_put_serialized (ray/_raylet.so)
write (ray/experimental/channel/shared_memory_channel.py:466)
write (ray/experimental/channel/shared_memory_channel.py:772)
write (ray/experimental/channel/torch_tensor_nccl_channel.py:561)
_send_cpu_and_gpu_data (ray/experimental/channel/torch_tensor_nccl_channel.py:206)
write (ray/experimental/channel/torch_tensor_nccl_channel.py:270)
write (ray/experimental/channel/common.py:626)
_write (ray/dag/compiled_dag_node.py:718)
exec_operation (ray/dag/compiled_dag_node.py:753)
do_exec_tasks (ray/dag/compiled_dag_node.py:230)
__ray_call__ (ray/actor.py:1722)
_resume_span (ray/util/tracing/tracing_helper.py:463)
actor_method_executor (ray/_private/function_manager.py:696)
function_executor (ray/_raylet.so)
_raylet_task_execution_handler (ray/_raylet.so)
std::_Function_handler<ray::Status(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > > const&, std::vector<ray::rpc::ObjectReference, std::allocator<ray::rpc::ObjectReference> > const&, std::string const&, std::string const&, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, bool>, std::allocator<std::pair<ray::ObjectID, bool> > >*, std::shared_ptr<ray::LocalMemoryBuffer>&, bool*, std::string*, std::vector<ray::ConcurrencyGroup, std::allocator<ray::ConcurrencyGroup> > const&, std::string, bool, bool, bool, long), ray::Status (*)(ray::rpc::Address const&, ray::rpc::TaskType, std::string, ray::core::RayFunction const&, std::unordered_map<std::string, double, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, double> > > const&, std::vector<std::shared_ptr<ray::RayObject>, std::allocator<std::shared_ptr<ray::RayObject> > > const&, std::vector<ray::rpc::ObjectReference, std::allocator<ray::rpc::ObjectReference> > const&, std::string, std::string, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, bool>, std::allocator<std::pair<ray::ObjectID, bool> > >*, 
std::shared_ptr<ray::LocalMemoryBuffer>&, bool*, std::string*, std::vector<ray::ConcurrencyGroup, std::allocator<ray::ConcurrencyGroup> > const&, std::string, bool, bool, bool, long)>::_M_invoke (ray/_raylet.so)
ray::core::CoreWorker::ExecuteTask (ray/_raylet.so)
std::_Function_handler<ray::Status(ray::TaskSpecification const&, std::optional<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, bool>, std::allocator<std::pair<ray::ObjectID, bool> > >*, google::protobuf::RepeatedPtrField<ray::rpc::ObjectReferenceCount>*, bool*, std::string*), std::_Bind<ray::Status (ray::core::CoreWorker(ray::core::CoreWorker*, std::_Placeholder<1>, std::_Placeholder<2>, std::_Placeholder<3>, std::_Placeholder<4>, std::_Placeholder<5>, std::_Placeholder<6>, std::_Placeholder<7>, std::_Placeholder<8>)::*)(ray::TaskSpecification const&, std::optional<std::unordered_map<std::string, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > >, std::hash<std::string>, std::equal_to<std::string>, std::allocator<std::pair<std::string const, std::vector<std::pair<long, double>, std::allocator<std::pair<long, double> > > > > > >, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> >, std::allocator<std::pair<ray::ObjectID, std::shared_ptr<ray::RayObject> > > >*, std::vector<std::pair<ray::ObjectID, bool>, std::allocator<std::pair<ray::ObjectID, bool> > >*, google::protobuf::RepeatedPtrField<ray::rpc::ObjectReferenceCount>*, bool*, std::string*)> >::_M_invoke (ray/_raylet.so)
ray::core::TaskReceiver::HandleTask(ray::rpc::PushTaskRequest const&, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(ray::TaskSpecification const&, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}::operator() const (ray/_raylet.so)
std::_Function_handler<void (ray::TaskSpecification const&, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>), ray::core::TaskReceiver::HandleTask(ray::rpc::PushTaskRequest const&, ray::rpc::PushTaskReply*, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)::{lambda(ray::TaskSpecification const&, std::function<void (ray::Status, std::function<void ()>, std::function<void ()>)>)#1}>::_M_invoke (ray/_raylet.so)
ray::core::InboundRequest::Accept (ray/_raylet.so)
ray::core::ActorSchedulingQueue::AcceptRequestOrRejectIfCanceled (ray/_raylet.so)
std::_Function_handler<void (), ray::core::ActorSchedulingQueue::ScheduleRequests()::{lambda()#2}>::_M_invoke (ray/_raylet.so)
boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete (ray/_raylet.so)
boost::asio::detail::scheduler::do_run_one (ray/_raylet.so)
boost::asio::detail::scheduler::run (ray/_raylet.so)
boost::asio::detail::posix_thread::func<boost::asio::thread_pool::thread_function>::run (ray/_raylet.so)
boost_asio_detail_posix_thread_function (ray/_raylet.so)
clone (libc-2.32.so)
I set RAY_CGRAPH_get_timeout to a very large value; otherwise the compiled-graph get times out before the hang can be observed.
The model is our private model; Qwen also runs into this issue, but it takes much longer to reproduce.
Versions / Dependencies
Ray 2.46.0 and 2.43.0 were both tested.
Reproduction script
import re
import time
import uuid
from typing import Dict, List, Optional, Any
import numpy as np
from ray.runtime_env import RuntimeEnv
from ray.util import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
import ray
import asyncio
from tqdm import tqdm
from transformers.models.auto.tokenization_auto import get_tokenizer_config
from vllm.config import VllmConfig
from vllm.v1.engine.async_llm import AsyncLLM
from vllm.v1.executor.abstract import Executor
from opencompass.models.base import BaseModel
from opencompass.utils import get_logger
from vllm import SamplingParams, AsyncEngineArgs
# Baseline kwargs applied when loading any model.
DEFAULT_MODEL_KWARGS = {'trust_remote_code': True}
class AsyncVLLMModel:
    """Ray-actor wrapper around vLLM's async engine.

    Prompts are submitted through ``add_request``; a background task drains
    each result generator and records the final output (or the raised
    exception) in ``result_map``, keyed by request id, where ``query_result``
    can pick it up.
    """

    def __init__(self, vllm_config: VllmConfig, executor_class):
        self.engine = AsyncLLM(vllm_config, executor_class, True)
        # request_id -> final engine output (or exception), filled in lazily.
        self.result_map = {}
        self.result_generator_q = asyncio.Queue()
        self.result_generator_task = None
        # Background consumers: one drains the request queue, one logs stats.
        asyncio.create_task(self.process_result())
        asyncio.create_task(self.__log())

    async def __log(self):
        """Emit engine statistics once per second, forever."""
        while True:
            await asyncio.sleep(1)
            await self.engine.do_log_stats()

    async def _process_result(self, request_id: str, generator):
        """Drain *generator*, then record its last item — or the exception
        it raised — under *request_id*."""
        outcome = None
        try:
            async for item in generator:
                outcome = item
        except Exception as exc:
            # Store the failure itself so the caller can observe it.
            outcome = exc
        self.result_map[request_id] = outcome

    async def process_result(self):
        """Forever: pop (request_id, generator) pairs off the queue and spawn
        a drain task for each."""
        while True:
            request_id, generator = await self.result_generator_q.get()
            asyncio.create_task(self._process_result(request_id, generator))

    async def add_request(self, input: str, sampling_params: SamplingParams) -> str:
        """Enqueue one prompt for generation and return its new request id."""
        request_id = str(uuid.uuid4())
        stream = self.engine.generate(input, sampling_params, request_id)
        self.result_generator_q.put_nowait((request_id, stream))
        return request_id

    def query_result(self, request_id: str):
        """Pop and return the finished result for *request_id*; ``None`` while
        the request is still in flight."""
        return self.result_map.pop(request_id, None)
class AsyncVLLMOnRay(BaseModel):
    """Model Wrapper for VLLM.

    Reserves a tp * pp GPU placement group, spawns a remote
    ``AsyncVLLMModel`` actor that owns the vLLM async engine, and exposes
    the synchronous OpenCompass ``BaseModel`` API (``generate``,
    ``get_ppl``, ...) on top of it.
    """

    def bin_trim(self, prompt: str, num_token: int) -> str:
        """Trim ``prompt`` to at most ``num_token`` tokens via binary search.

        ``self.mode`` selects the trimming strategy: 'front' keeps the tail,
        'rear' keeps the head, 'mid' keeps both ends.  Must only be called
        when ``self.mode`` is one of those three values; with any other mode
        the search body would leave ``cur_prompt`` undefined.
        """
        import jieba  # local import: only needed for Chinese segmentation
        token_len = self.get_token_len(prompt)
        if token_len <= num_token:
            return prompt
        # Chinese text has no spaces: segment with jieba and re-join with ''.
        pattern = re.compile(r'[\u4e00-\u9fa5]')
        if pattern.search(prompt):
            words = list(jieba.cut(prompt, cut_all=False))
            sep = ''
        else:
            words = prompt.split(' ')
            sep = ' '
        l, r = 1, len(words)
        while l + 2 < r:
            mid = (l + r) // 2
            if self.mode == 'front':
                cur_prompt = sep.join(words[-mid:])
            elif self.mode == 'mid':
                # NOTE(review): head and tail halves are concatenated without
                # a separator between them — confirm this is intended.
                cur_prompt = sep.join(words[:mid]) + sep.join(words[-mid:])
            elif self.mode == 'rear':
                cur_prompt = sep.join(words[:mid])
            if self.get_token_len(cur_prompt) <= num_token:
                l = mid  # noqa: E741
            else:
                r = mid
        if self.mode == 'front':
            prompt = sep.join(words[-l:])
        elif self.mode == 'mid':
            prompt = sep.join(words[:l]) + sep.join(words[-l:])
        elif self.mode == 'rear':
            prompt = sep.join(words[:l])
        return prompt

    def __init__(
        self,
        path: str,
        tokenizer_path: Optional[str] = None,
        abbr: str = None,
        max_seq_len: int = 2048,
        model_kwargs: dict = None,
        generation_kwargs: Optional[dict] = None,
        tokenizer_kwargs: Optional[Dict[str, Any]] = None,
        fixed_min_out_len=500,
        meta_template: Optional[Dict] = None,
        mode: str = 'none',
        use_fastchat_template: bool = False,
        stop_words: Optional[List[str]] = None,
    ):
        """Build the remote engine and tokenizer.

        Args:
            path: Model path; forwarded to vLLM as both model and tokenizer.
            model_kwargs: Extra ``AsyncEngineArgs`` attributes.  May contain a
                'pp_split' key, which is exported as VLLM_PP_LAYER_PARTITION.
            generation_kwargs: Defaults merged into every SamplingParams
                ('do_sample' is stripped, as vLLM does not accept it).
            fixed_min_out_len: Output-length budget assumed when trimming.
            mode: Prompt-trimming mode: 'none', 'front', 'mid' or 'rear'.
            stop_words: Extra stop strings, merged with those discovered in
                the model's generation config / tokenizer EOS.
        """
        super().__init__(path=path,
                         abbr=abbr,
                         max_seq_len=max_seq_len,
                         meta_template=meta_template)
        self.logger = get_logger()
        self._load_model(path, model_kwargs)
        # Copy before popping so a caller-supplied dict (or a shared default)
        # is never mutated.
        self.generation_kwargs = dict(generation_kwargs or {})
        self.generation_kwargs.pop('do_sample', None)
        self.tokenizer = self._load_tokenizer()
        self.fixed_min_out_len = fixed_min_out_len
        self.mode = mode
        self.use_fastchat_template = use_fastchat_template
        self.stop_words = list(
            set((stop_words or []) + self._get_potential_stop_words(path)))

    def _load_tokenizer(self):
        """Instantiate the tokenizer, honoring the custom Bailing tokenizer
        when the model's tokenizer config asks for it."""
        from transformers import AutoTokenizer
        tokenizer_config = get_tokenizer_config(self.path)
        tokenizer_class = tokenizer_config.get('tokenizer_class')
        if tokenizer_class == 'BailingTokenizer':
            from opencompass.utils.tokenzier.tokenization_bailing import BailingTokenizer
            self.logger.info("use bailing tokenizer")
            tokenizer = BailingTokenizer.from_pretrained(self.path)
        else:
            self.logger.info("use default tokenizer")
            tokenizer = AutoTokenizer.from_pretrained(self.path)
        return tokenizer

    def _load_model(self,
                    path: str,
                    add_model_kwargs: Optional[dict] = None,
                    num_retry: int = 3):
        """Build the engine config, reserve GPUs and spawn the engine actor.

        Args:
            path: Model path.
            add_model_kwargs: Overrides applied to ``AsyncEngineArgs`` after
                the defaults; a 'pp_split' entry is extracted and exported as
                VLLM_PP_LAYER_PARTITION instead of being set on the args.
            num_retry: Unused; kept for interface compatibility.
        """
        engine_args = AsyncEngineArgs()
        # BUGFIX: the original used a shared mutable default ({}) and popped
        # from the caller's dict; also, passing None (the __init__ default)
        # crashed on .pop.  Copy defensively instead.
        add_model_kwargs = dict(add_model_kwargs or {})
        # Renamed from DEFAULT_MODEL_KWARGS to stop shadowing the module
        # constant of the same name.
        default_engine_kwargs = {
            'served_model_name': 'auto',
            'trust_remote_code': True,
            'disable_custom_all_reduce': True,
            'distributed_executor_backend': 'ray',
            'model': path,
            'tokenizer': path,
            'max_num_seqs': 1024,
        }
        for k, v in default_engine_kwargs.items():
            setattr(engine_args, k, v)
        pp_split = add_model_kwargs.pop('pp_split', None)
        for k, v in add_model_kwargs.items():
            setattr(engine_args, k, v)
        self.logger.info("vllm engine args: {}".format(engine_args.__dict__))
        tp = engine_args.tensor_parallel_size
        pp = engine_args.pipeline_parallel_size
        # One bundle (1 CPU + 1 GPU) per worker in the tp x pp grid.
        pg_resources = [{"CPU": 1, "GPU": 1} for _ in range(tp * pp)]
        pg = placement_group(pg_resources)
        pg_group = ray.get(pg.ready())
        vllm_config = engine_args.create_engine_config()
        vllm_config.parallel_config.placement_group = pg_group
        scheduling_strategy = PlacementGroupSchedulingStrategy(
            placement_group=pg,
            placement_group_capture_child_tasks=True,
        )
        exec_class = Executor.get_class(vllm_config)
        env_var = {
            "VLLM_USE_V1": "1",
            "VLLM_USE_RAY_COMPILED_DAG_OVERLAP_COMM": "1",
            "NCCL_DEBUG": "INFO",
            "RAY_CGRAPH_get_timeout": "20000",
            "NCCL_COMM_BLOCKING": "1",
            # "RAY_CGRAPH_overlap_gpu_communication": "1",
            'NCCL_MAX_CTAS': "8",
            # 'RAY_CGRAPH_max_inflight_executions': "80"
        }
        if pp_split:
            env_var['VLLM_PP_LAYER_PARTITION'] = str(pp_split)
        self.model = ray.remote(AsyncVLLMModel).options(
            scheduling_strategy=scheduling_strategy,
            runtime_env=RuntimeEnv(
                env_vars=env_var
            )
        ).remote(
            vllm_config,
            exec_class
        )

    def _get_potential_stop_words(self, path: Optional[str]):
        """Collect candidate stop strings from the model's generation config
        (eos_token_id, single or list) and the tokenizer's EOS token."""
        from transformers import GenerationConfig
        potential_stop_words = []
        try:
            generation_config = GenerationConfig.from_pretrained(path)
        except Exception:
            # Narrowed from a bare except: the model may simply not ship a
            # generation_config.json.
            generation_config = None
        if generation_config and hasattr(generation_config, 'eos_token_id'):
            if isinstance(generation_config.eos_token_id, int):
                potential_stop_words.append(
                    self.tokenizer.decode(generation_config.eos_token_id))
            else:
                assert isinstance(generation_config.eos_token_id, list)
                for token_id in generation_config.eos_token_id:
                    potential_stop_words.append(self.tokenizer.decode(token_id))
        if self.tokenizer.eos_token is not None:
            potential_stop_words.append(self.tokenizer.eos_token)
        # Deduplicate and drop empty strings.
        potential_stop_words = list(set(potential_stop_words))
        potential_stop_words = [s for s in potential_stop_words if s]
        return potential_stop_words

    def generate(self, inputs: List[str], max_out_len: int,
                 stopping_criteria: List[str] = [], **kwargs) -> List[str]:
        """Generate one completion per input prompt."""
        return self._generate(inputs, max_out_len, stopping_criteria, **kwargs)

    def get_ppl(self,
                inputs: List[str],
                mask_length: Optional[List[int]] = None) -> List[float]:
        """Per-input mean cross-entropy loss (perplexity surrogate)."""
        return self._get_ppl(inputs, mask_length)

    async def infer(self, input, sampling_kwargs: SamplingParams):
        """Submit one prompt to the engine actor; returns its request id.

        When a trimming mode is configured, the prompt is first trimmed so
        prompt plus expected output fit within the context window.
        """
        # BUGFIX: the original condition used `or`, which is always true and
        # forced bin_trim even for mode 'none', where bin_trim would crash
        # with an undefined cur_prompt on long prompts.
        if self.mode != 'none' and self.mode is not None:
            context_window = self.max_seq_len
            max_out_len = sampling_kwargs.max_tokens
            if max_out_len != 1:
                # Assume a fixed output budget for trimming purposes.
                max_out_len = self.fixed_min_out_len
            input = self.bin_trim(input, context_window - max_out_len)
        request_id = await self.model.add_request.remote(input, sampling_kwargs)
        return request_id

    def query_result(self, request_id: str):
        """Poll the actor once per second until *request_id* has a result.

        NOTE(review): a failed request stores its exception object, which is
        returned here as-is and will later crash in _generate — consider
        re-raising instead.
        """
        while True:
            result = ray.get(self.model.query_result.remote(request_id))
            if result is not None:
                return result
            time.sleep(1)

    async def do_batch_infer(self, inputs: List[str],
                             sampling_kwargs: SamplingParams):
        """Submit every input, then collect results in submission order."""
        request_ids = [
            await self.infer(input, sampling_kwargs) for input in inputs
        ]
        result = []
        for request_id in tqdm(request_ids):
            res = self.query_result(request_id)
            result.append(res)
        return result

    def _generate(self,
                  inputs: List[str],
                  max_out_len: int,
                  stopping_criteria: List[str] = [],
                  **kwargs) -> List[str]:
        """Generate results given a list of inputs.

        Args:
            inputs (List[str]): A list of strings.
            max_out_len (int): The maximum length of the output.
            stopping_criteria (List[str]): Extra stop strings for this call.

        Returns:
            List[str]: A list of generated strings.
        """
        DEFAULT_GENERATION_KWARGS = {
            'temperature': 0,
            'max_tokens': max_out_len,
            'stop': list(set(self.stop_words + stopping_criteria))
        }
        # Precedence: per-call kwargs > instance generation_kwargs > defaults.
        sampling_kwargs = DEFAULT_GENERATION_KWARGS.copy()
        sampling_kwargs.update(self.generation_kwargs)
        sampling_kwargs.update(kwargs)
        sampling_kwargs = SamplingParams(**sampling_kwargs)
        self.logger.info("sampling_kwargs: {}".format(sampling_kwargs))
        outputs = asyncio.run(self.do_batch_infer(inputs, sampling_kwargs))
        prompt_list, output_strs = [], []
        for output in outputs:
            prompt = output.prompt
            generated_text = output.outputs[0].text
            prompt_list.append(prompt)
            output_strs.append(generated_text)
        return output_strs

    def _get_ppl(self,
                 inputs: List[str],
                 mask_length: Optional[List[int]] = None) -> List[float]:
        """Compute the mean negative log-likelihood of each input prompt.

        Args:
            inputs: Prompts to score.
            mask_length: Optional per-input suffix lengths; when given, only
                the last ``mask_length[i]`` token logprobs are summed.
                NOTE(review): the divisor stays the full prompt length even
                when a mask is applied — confirm this is intended.
        """
        batch_size = len(inputs)
        sampling_kwargs = SamplingParams(prompt_logprobs=0,
                                         max_tokens=1,
                                         **self.generation_kwargs)
        # forward
        outputs = asyncio.run(self.do_batch_infer(inputs, sampling_kwargs))
        # compute ppl
        ce_loss = []
        for i in range(batch_size):
            # Skip the first token: it has no preceding context, so no logprob.
            prompt_logprobs = outputs[i].prompt_logprobs[1:]
            prompt_token_ids = outputs[i].prompt_token_ids[1:]
            # Renamed the comprehension variable from `i` to `j` so it no
            # longer shadows the outer loop index.
            prompt_logprobs_list = [
                prompt_logprobs[j][prompt_token_ids[j]]
                for j in range(len(prompt_logprobs))
            ]
            prompt_logprobs_list = [lp.logprob for lp in prompt_logprobs_list]
            prompt_logprobs_list = np.array(prompt_logprobs_list)
            if mask_length is not None:
                prompt_logprobs_list = prompt_logprobs_list[-mask_length[i]:]
            loss = -prompt_logprobs_list.sum(axis=-1) / len(prompt_token_ids)
            ce_loss.append(loss)
        return np.array(ce_loss)

    def get_loglikelihood(self, inputs: List[str],
                          conts: List[str]) -> List[float]:
        """Log-likelihood of each continuation ``conts[i]`` given ``inputs[i]``."""
        mask_length = [
            self.get_token_len(c, add_special_tokens=False) for c in conts
        ]
        return -self.get_ppl(inputs, mask_length)

    def get_token_len(self,
                      prompt: str,
                      add_special_tokens: bool = True) -> int:
        """Get lengths of the tokenized strings.

        Args:
            prompt (str): Input string.

        Returns:
            int: Length of the input tokens
        """
        token_ids = self.tokenizer.encode(prompt,
                                          add_special_tokens=add_special_tokens)
        return len(token_ids)
if __name__ == '__main__':
    # Reproduction driver: tp8 + pp4 on a placeholder model path.
    path = '/some/model/path/'
    parallel_kwargs = {
        "tensor_parallel_size": 8,
        "pipeline_parallel_size": 4,
        "gpu_memory_utilization": 0.9,
    }
    model = AsyncVLLMOnRay(path=path, model_kwargs=parallel_kwargs)

    # Small smoke test first.
    res = model.generate(["你好", "你叫什么名字"], max_out_len=10)
    print(res)

    # Then a large batch of long prompts, which triggers the hang.
    test_input = ["".join([str(uuid.uuid4())] * 100) for _ in range(10000)]
    res = model.generate(test_input, max_out_len=1024)
    print(res)
Issue Severity
None
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
P1Issue that should be fixed within a few weeksIssue that should be fixed within a few weeksbugSomething that is supposed to be working; but isn'tSomething that is supposed to be working; but isn'tcompiled-graphscoreIssues that should be addressed in Ray CoreIssues that should be addressed in Ray Corellmstabilityvllm
Type
Projects
Status
Todo