
[Ray V2 Tune + Train] Tuner is not aware of resources and oversubscribes leading to deadlocks #53921

@lineick


What happened + What you expected to happen

Based on the documentation for HPO here (https://docs.ray.io/en/master/train/user-guides/hyperparameter-optimization.html), I implemented the HPO with a trainable function instead of an object like Ray's TorchTrainer.

As explained in the documentation, this hides the resource usage from the Tuner: it only accounts for the CPUs used by the driver functions themselves, not for the driver functions plus the resources of the TorchTrainer worker groups they launch.

With a trainable function, the Tuner starts all trials with the defined resources, unaware of potential oversubscription. When oversubscription occurs (e.g. a user requesting more num_samples than fit in parallel), all trials stay in the RUNNING state forever, repeatedly trying to acquire resources:

(train_driver_fn pid=366383) Attempting to start training worker group of size 1 with the following resources: [{'CPU': 9, 'GPU': 1}] * 1
(train_driver_fn pid=366602) Retrying the launch of the training worker group. The previous launch attempt encountered the following failure:
(train_driver_fn pid=366602) The worker group startup timed out after 30.0 seconds waiting for 1 workers. Potential causes include: (1) temporary insufficient cluster resources while waiting for autoscaling (ignore this warning in t
his case), (2) infeasible resource request where the provided `ScalingConfig` cannot be satisfied), and (3) transient network issues. Set the RAY_TRAIN_WORKER_GROUP_START_TIMEOUT_S environment variable to increase the timeout.
(train_driver_fn pid=366602) Attempting to start training worker group of size 1 with the following resources: [{'CPU': 9, 'GPU': 1}] * 1
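The last line of the log names a timeout knob. For completeness, raising it looks like this (120 is an arbitrary example value); it only makes each launch attempt wait longer and does not resolve the oversubscription itself:

```shell
# Give each worker-group launch attempt 120s instead of the 30s default
# (example value). This lengthens the wait; it does not fix oversubscription.
export RAY_TRAIN_WORKER_GROUP_START_TIMEOUT_S=120
```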

It only works if the resources in the ScalingConfig and the num_samples in the TuneConfig are set such that all samples can run in parallel.

To avoid this kind of deadlock, I followed the suggestion to set max_concurrent_trials in the TuneConfig.
But this does not save it from oversubscription either: the first group of max_concurrent_trials trials trains, but afterwards everything deadlocks again in the same way:
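To make the accounting mismatch concrete, here is a back-of-the-envelope sketch in plain Python, using the numbers from this run (48 CPUs, 4 GPUs, one worker per trial at {'CPU': 9, 'GPU': 1}, plus a 1-CPU driver function per trial):

```python
# Resources on the node from this report: 48 CPUs, 4 GPUs.
total_cpus, total_gpus = 48, 4

# What Tune accounts for per trial: only the 1-CPU driver function.
tune_view_cpus_per_trial = 1

# What a trial actually consumes: 1-CPU driver + worker group of 1 x {CPU: 9, GPU: 1}.
actual_cpus_per_trial = 1 + 9
actual_gpus_per_trial = 1

# Tune believes this many trials fit at once:
tune_capacity = total_cpus // tune_view_cpus_per_trial          # 48
# But only this many trials can actually acquire a worker group:
actual_capacity = min(total_cpus // actual_cpus_per_trial,
                      total_gpus // actual_gpus_per_trial)      # 4

print(tune_capacity, actual_capacity)  # -> 48 4
```

Any trial beyond the fifth that Tune schedules still gets its 1-CPU driver placed, but the driver then blocks forever waiting for a worker group that can never be satisfied.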


By now I feel like I have tried everything; setting with_resources also does not help (I assume because this creates another placement group inside which the TorchTrainers then create their own placement group, which is of course not what we want).

How do you intend to propagate all the resource usage and management to the Tuner, so that it knows about each trial's full resource footprint, only creates as many trials as resources allow, and puts the others in PENDING, as was previously the case when passing the Trainer itself as a trainable instead of a driver function?

It seems like I integrated everything as done in the documentation (https://docs.ray.io/en/master/train/user-guides/hyperparameter-optimization.html), but it does not behave as intended. I checked the values of max_concurrent_trials; they are correctly computed. I can't find the error.

Code:

    def _execute_with_ray(
        self,
        train_dataset: Dataset,
        validation_dataset: Dataset | None = None,
        test_dataset: Dataset | None = None,
    ) -> Tuple[Dataset, Dataset, Dataset, Any]:
        """Launch training and, optionally, hyperparameter tuning with Ray
        Args:
            train_dataset (Dataset): training dataset.
            validation_dataset (Dataset | None, optional): validation
                dataset. Defaults to None.
            test_dataset (Dataset | None, optional): test dataset.
                Defaults to None.
        Returns:
            Tuple[Dataset, Dataset, Dataset, Any]: training dataset,
            validation dataset, test dataset, trained model.
        """

        if self.from_checkpoint:
            if not self.ray_run_config:
                # If a checkpoint is provided, we need to create a RunConfig
                # to be used by Ray Tuner
                py_logger.warning(
                    "from_checkpoint was passed, but ray_run_config is not set. "
                    "Creating a new RunConfig with the checkpoint path."
                )
                self.ray_run_config = ray.tune.RunConfig()

            checkpoint_path = to_uri(self.from_checkpoint)
            # Create a new RunConfig with checkpoint path
            self.ray_run_config.name = Path(checkpoint_path).name
            self.ray_run_config.storage_path = str(Path(checkpoint_path).parent)

        elif self.ray_run_config:
            # Create Ray checkpoints dir if it does not exist yet
            ckpt_dir = Path(self.ray_run_config.storage_path)
            ckpt_dir.mkdir(parents=True, exist_ok=True)

        # Store large datasets in Rays object store to avoid serialization issues
        train_dataset_ref = ray.put(train_dataset)
        validation_dataset_ref = (
            ray.put(validation_dataset) if validation_dataset is not None else None
        )
        test_dataset_ref = ray.put(test_dataset) if test_dataset is not None else None

        # Define a worker function to be run on each worker
        def train_fn_per_worker(train_loop_config: dict):
            """Retrieve datasets from references and start worker

            Args:
                train_loop_config (dict): configuration for the training loop,
                    passed by Ray Tuner.
            """

            train_dataset = ray.get(train_dataset_ref)
            val_dataset = (
                ray.get(validation_dataset_ref) if validation_dataset_ref is not None else None
            )
            test_dataset = ray.get(test_dataset_ref) if test_dataset_ref is not None else None

            # Call the original worker function with the datasets and config
            self._run_worker(
                train_loop_config,
                train_dataset=train_dataset,
                validation_dataset=val_dataset,
                test_dataset=test_dataset,
            )

        def train_driver_fn(config: dict):
            """Driver function for Ray tune
            This function is called by Ray Tuner to start the training process.

            Args:
                config (dict): configuration for the training loop,
                    passed by Ray Tuner.
            """
            # Extract hyperparameters from config
            train_loop_config = config.get("train_loop_config", {})
            avail = ray.available_resources()
            print(f"trial: {avail}")

            run_config = ray.train.RunConfig(
                name=f"train-trial_id={ray.tune.get_context().get_trial_id()}",
                # Needed as of ray version 2.46, to propagate train.report back to the Tuner
                callbacks=[TuneReportCallback()],
                storage_path=self.ray_run_config.storage_path if self.ray_run_config else None,
            )

            trainer = RayTorchTrainer(
                train_loop_per_worker=train_fn_per_worker,
                train_loop_config=train_loop_config,
                scaling_config=self.ray_scaling_config,
                run_config=run_config,
                torch_config=self.ray_torch_config,
                dataset_config=self.ray_data_config,
            )

            trainer.fit()

        # Create the parameter space for hyperparameter tuning
        param_space = {"train_loop_config": search_space(self.ray_search_space)}

        # get current avail resources
        available_resources = ray.available_resources()
        print("AVAIL RES: ", available_resources)
        print("RAY_V2", os.environ.get("RAY_TRAIN_V2_ENABLED"))

        available_cpus = available_resources["CPU"]
        available_gpus = available_resources["GPU"]

        num_w = self.ray_scaling_config.num_workers
        cpu_w = self.ray_scaling_config.num_cpus_per_worker
        gpu_w = self.ray_scaling_config.num_gpus_per_worker

        # calculate the max number of concurrent_trials
        # +1 for cpus as each trial gets a driver function running on 1 CPU
        if gpu_w > 0.0 and cpu_w > 0.0:
            max_concurrent_trials = int(
                    min(available_cpus // (num_w * cpu_w + 1), available_gpus // (num_w * gpu_w))
            )
        elif cpu_w > 0.0:
            max_concurrent_trials = int(available_cpus // (num_w * cpu_w + 1))
        elif gpu_w > 0.0:
            max_concurrent_trials = int(available_gpus // (num_w * gpu_w))
        else:
            max_concurrent_trials = 0

        print("MAX CONC TRIALS: ", max_concurrent_trials)
        if max_concurrent_trials == 0:
            raise ValueError("Not enough resources for one trial available.")

        self.ray_tune_config.max_concurrent_trials = max_concurrent_trials
        # self.ray_tune_config.reuse_actors = True

        print("TUNE_conf: ", self.ray_tune_config)

        # Create the tuner with the driver function
        tuner = ray.tune.Tuner(
            trainable=train_driver_fn,
            param_space=param_space,
            tune_config=self.ray_tune_config,
            run_config=self.ray_run_config,
        )

        if self.time_ray:
            self.tune_result_grid = self._time_and_log(
                lambda: tuner.fit(), "ray_fit_time_s", step=0
            )
        else:
            # Run the tuner and capture results
            self.tune_result_grid = tuner.fit()

        return train_dataset, validation_dataset, test_dataset, None
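For reference, the concurrency calculation from `_execute_with_ray` above can be pulled out into a standalone helper (hypothetical name, same logic, including the +1 CPU per trial for its driver function):

```python
def max_concurrent_trials(avail_cpus: float, avail_gpus: float,
                          num_w: int, cpu_w: float, gpu_w: float) -> int:
    """Max trials that fit, counting 1 extra CPU per trial for its driver."""
    if gpu_w > 0.0 and cpu_w > 0.0:
        return int(min(avail_cpus // (num_w * cpu_w + 1),
                       avail_gpus // (num_w * gpu_w)))
    if cpu_w > 0.0:
        return int(avail_cpus // (num_w * cpu_w + 1))
    if gpu_w > 0.0:
        return int(avail_gpus // (num_w * gpu_w))
    return 0

# With the node from this report (48 CPUs, 4 GPUs) and 1 worker x {CPU: 9, GPU: 1}:
print(max_concurrent_trials(48, 4, 1, 9, 1))  # -> 4
```

This confirms the value is computed sensibly; the deadlock is not in this arithmetic but in the Tuner's lack of awareness of the worker-group resources behind each driver.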

Versions / Dependencies

ray 2.46.0
torch 2.4.1+cu121

Run was on one node with 4 Nvidia GPUs and 48 CPUs (cores).

Reproduction script

The code is hard to disentangle into a small reproduction script, but you can find it here:

https://github.com/interTwin-eu/itwinai/tree/temp-ray-error

Issue Severity

High: It blocks me from completing my task.


    Labels

    bug: Something that is supposed to be working; but isn't
    question: Just a question :)
    stability
    triage: Needs triage (eg: priority, bug/not-bug, and owning component)
    tune: Tune-related issues
