[Ray V2 Tune + Train] Tuner is not aware of resources and oversubscribes leading to deadlocks #53921
What happened + What you expected to happen
Based on the documentation for HPO here (https://docs.ray.io/en/master/train/user-guides/hyperparameter-optimization.html), I implemented the HPO with a trainable function instead of an object like Ray's TorchTrainer.
As explained in the documentation this hides the resource usage from the Tuner, leading to it only showing the CPUs in use by the driver functions themselves, not the driver functions + the TorchTrainer trial resources.
Using the trainable function now leads to the tuner starting all trials with the defined resources unaware of potential oversubscription. In case of oversubscription (e.g. a user wanting more num_samples than would fit in parallel) all trials will stay in RUNNING state while trying to acquire resources forever:
```
(train_driver_fn pid=366383) Attempting to start training worker group of size 1 with the following resources: [{'CPU': 9, 'GPU': 1}] * 1
(train_driver_fn pid=366602) Retrying the launch of the training worker group. The previous launch attempt encountered the following failure:
(train_driver_fn pid=366602) The worker group startup timed out after 30.0 seconds waiting for 1 workers. Potential causes include: (1) temporary insufficient cluster resources while waiting for autoscaling (ignore this warning in this case), (2) infeasible resource request where the provided `ScalingConfig` cannot be satisfied), and (3) transient network issues. Set the RAY_TRAIN_WORKER_GROUP_START_TIMEOUT_S environment variable to increase the timeout.
(train_driver_fn pid=366602) Attempting to start training worker group of size 1 with the following resources: [{'CPU': 9, 'GPU': 1}] * 1
```
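To make the arithmetic concrete: on the node used below (48 CPUs, 4 GPUs), each trial actually consumes 1 CPU for its driver function plus the worker resources from the ScalingConfig, but the Tuner only accounts for the driver's 1 CPU. A back-of-the-envelope check in plain Python, with the per-worker numbers taken from the log above:

```python
# Resources actually consumed per trial: 1 CPU for the Tune driver
# function plus the worker group requested in the ScalingConfig
# (from the log: 1 worker x {CPU: 9, GPU: 1}).
NODE_CPUS, NODE_GPUS = 48, 4
DRIVER_CPUS = 1
NUM_WORKERS, WORKER_CPUS, WORKER_GPUS = 1, 9, 1

def trials_that_fit(node_cpus: int, node_gpus: int) -> int:
    """How many trials can truly run in parallel on the node."""
    per_trial_cpus = DRIVER_CPUS + NUM_WORKERS * WORKER_CPUS
    per_trial_gpus = NUM_WORKERS * WORKER_GPUS
    return min(node_cpus // per_trial_cpus, node_gpus // per_trial_gpus)

def trials_tuner_thinks_fit(node_cpus: int) -> int:
    """What the Tuner sees: each trial is just a 1-CPU driver function."""
    return node_cpus // DRIVER_CPUS

print(trials_that_fit(NODE_CPUS, NODE_GPUS))   # -> 4
print(trials_tuner_thinks_fit(NODE_CPUS))      # -> 48
# With num_samples > 4 the Tuner happily starts extra drivers; their
# worker groups then wait forever for CPUs/GPUs that never free up.
```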
It only works if the resources in the ScalingConfig and num_samples in the TuneConfig are set such that all samples can run in parallel.
Trying to avoid this kind of deadlock, I followed the suggestion to set max_concurrent_trials in the TuneConfig.
But this does not save it from oversubscription either: the first batch of max_concurrent_trials trials trains, but afterwards everything deadlocks again in the same way.
By now I feel like I have tried everything; setting with_resources does not help either (I assume because this creates another placement group inside which the TorchTrainer then creates its own placement group, which is of course not what we want).
How do you intend to propagate all the resource usage and management to the Tuner, so that it knows about each trial's full resources, only starts as many trials as resources allow, and keeps the rest in PENDING, as it did previously when passing the Trainer itself as the trainable instead of a driver function?
It seems like I integrated everything as described in the documentation (https://docs.ray.io/en/master/train/user-guides/hyperparameter-optimization.html), but it does not behave as intended. I checked the values of max_concurrent_trials; they are computed correctly. I can't find the error.
Code:

```python
def _execute_with_ray(
    self,
    train_dataset: Dataset,
    validation_dataset: Dataset | None = None,
    test_dataset: Dataset | None = None,
) -> Tuple[Dataset, Dataset, Dataset, Any]:
    """Launch training and, optionally, hyperparameter tuning with Ray.

    Args:
        train_dataset (Dataset): training dataset.
        validation_dataset (Dataset | None, optional): validation
            dataset. Defaults to None.
        test_dataset (Dataset | None, optional): test dataset.
            Defaults to None.

    Returns:
        Tuple[Dataset, Dataset, Dataset, Any]: training dataset,
            validation dataset, test dataset, trained model.
    """
    if self.from_checkpoint:
        if not self.ray_run_config:
            # If a checkpoint is provided, we need to create a RunConfig
            # to be used by Ray Tuner
            py_logger.warning(
                "from_checkpoint was passed, but ray_run_config is not set. "
                "Creating a new RunConfig with the checkpoint path."
            )
            self.ray_run_config = ray.tune.RunConfig()
        checkpoint_path = to_uri(self.from_checkpoint)
        # Create a new RunConfig with checkpoint path
        self.ray_run_config.name = Path(checkpoint_path).name
        self.ray_run_config.storage_path = str(Path(checkpoint_path).parent)
    elif self.ray_run_config:
        # Create Ray checkpoints dir if it does not exist yet
        ckpt_dir = Path(self.ray_run_config.storage_path)
        ckpt_dir.mkdir(parents=True, exist_ok=True)

    # Store large datasets in Ray's object store to avoid serialization issues
    train_dataset_ref = ray.put(train_dataset)
    validation_dataset_ref = (
        ray.put(validation_dataset) if validation_dataset is not None else None
    )
    test_dataset_ref = ray.put(test_dataset) if test_dataset is not None else None

    # Define a worker function to be run on each worker
    def train_fn_per_worker(train_loop_config: dict):
        """Retrieve datasets from references and start worker.

        Args:
            train_loop_config (dict): configuration for the training loop,
                passed by Ray Tuner.
        """
        train_dataset = ray.get(train_dataset_ref)
        val_dataset = (
            ray.get(validation_dataset_ref) if validation_dataset_ref is not None else None
        )
        test_dataset = ray.get(test_dataset_ref) if test_dataset_ref is not None else None
        # Call the original worker function with the datasets and config
        self._run_worker(
            train_loop_config,
            train_dataset=train_dataset,
            validation_dataset=val_dataset,
            test_dataset=test_dataset,
        )

    def train_driver_fn(config: dict):
        """Driver function for Ray Tune.

        This function is called by Ray Tuner to start the training process.

        Args:
            config (dict): configuration for the training loop,
                passed by Ray Tuner.
        """
        # Extract hyperparameters from config
        train_loop_config = config.get("train_loop_config", {})
        avail = ray.available_resources()
        print(f"trial: {avail}")
        run_config = ray.train.RunConfig(
            name=f"train-trial_id={ray.tune.get_context().get_trial_id()}",
            # Needed as of ray version 2.46, to propagate train.report back to the Tuner
            callbacks=[TuneReportCallback()],
            storage_path=self.ray_run_config.storage_path if self.ray_run_config else None,
        )
        trainer = RayTorchTrainer(
            train_loop_per_worker=train_fn_per_worker,
            train_loop_config=train_loop_config,
            scaling_config=self.ray_scaling_config,
            run_config=run_config,
            torch_config=self.ray_torch_config,
            dataset_config=self.ray_data_config,
        )
        trainer.fit()

    # Create the parameter space for hyperparameter tuning
    param_space = {"train_loop_config": search_space(self.ray_search_space)}

    # Get currently available resources
    available_resources = ray.available_resources()
    print("AVAIL RES: ", available_resources)
    print("RAY_V2", os.environ.get("RAY_TRAIN_V2_ENABLED"))
    available_cpus = available_resources["CPU"]
    available_gpus = available_resources["GPU"]
    num_w = self.ray_scaling_config.num_workers
    cpu_w = self.ray_scaling_config.num_cpus_per_worker
    gpu_w = self.ray_scaling_config.num_gpus_per_worker

    # Calculate the max number of concurrent trials
    # (+1 CPU per trial, as each trial gets a driver function running on 1 CPU)
    if gpu_w > 0.0 and cpu_w > 0.0:
        max_concurrent_trials = int(
            min(available_cpus // (num_w * cpu_w + 1), available_gpus // (num_w * gpu_w))
        )
    elif cpu_w > 0.0:
        max_concurrent_trials = int(available_cpus // (num_w * cpu_w + 1))
    elif gpu_w > 0.0:
        max_concurrent_trials = int(available_gpus // (num_w * gpu_w))
    else:
        max_concurrent_trials = 0
    print("MAX CONC TRIALS: ", max_concurrent_trials)
    if max_concurrent_trials == 0:
        raise ValueError("Not enough resources for one trial available.")
    self.ray_tune_config.max_concurrent_trials = max_concurrent_trials
    # self.ray_tune_config.reuse_actors = True
    print("TUNE_conf: ", self.ray_tune_config)

    # Create the tuner with the driver function
    tuner = ray.tune.Tuner(
        trainable=train_driver_fn,
        param_space=param_space,
        tune_config=self.ray_tune_config,
        run_config=self.ray_run_config,
    )
    if self.time_ray:
        self.tune_result_grid = self._time_and_log(
            lambda: tuner.fit(), "ray_fit_time_s", step=0
        )
    else:
        # Run the tuner and capture results
        self.tune_result_grid = tuner.fit()
    return train_dataset, validation_dataset, test_dataset, None
```

Versions / Dependencies
ray 2.46.0
torch 2.4.1+cu121
Run was on one node with 4 Nvidia GPUs and 48 CPUs (cores).
Reproduction script
The code is hard, if not impossible, to disentangle into a small script, but it can be found here:
https://github.com/interTwin-eu/itwinai/tree/temp-ray-error
Issue Severity
High: It blocks me from completing my task.
