[xray] Workers blocked in a `ray.get` release their resources (#1920)
atumanov merged 11 commits into ray-project:master

Conversation

Force-pushed from 29e8077 to 9192d9c (Compare)
    }
  }
  return true;
  return not oversubscribed;
can you do that? don't you need to do !oversubscribed?
Oh, I didn't even realize I did that...apparently you can: http://en.cppreference.com/w/cpp/keyword/not.
Can we change it to !oversubscribed? I think that will be a lot more familiar to people.
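For context on this exchange: C++ defines `not` as an alternative token for `!`, so both spellings are interchangeable. A minimal sketch (function names are illustrative, not Ray's):

```cpp
#include <cassert>

// `not` is a standard C++ alternative token for `!` (inherited from the
// ISO 646 alternative spellings); both forms compile to identical code.
// The reviewers prefer `!` simply because it is more familiar.
bool UnderSubscribedWithNot(bool oversubscribed) {
  return not oversubscribed;
}

bool UnderSubscribedWithBang(bool oversubscribed) {
  return !oversubscribed;
}
```

Both functions behave identically for every input; the choice is purely stylistic.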
src/ray/raylet/node_manager.cc
Outdated
const auto &task = tasks.front();
RAY_CHECK(
    cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Release(
        task.GetTaskSpecification().GetRequiredResources()));
The behavior in legacy Ray is to release only CPU resources. I think we probably want to preserve that behavior.
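The "release only CPU resources" behavior could be implemented by extracting just the CPU entry from the task's required resources before releasing. This is a hypothetical sketch: the label constant and map-based resource representation are stand-ins, not Ray's actual `ResourceSet` API.

```cpp
#include <string>
#include <unordered_map>

// Hypothetical: build a one-entry resource set containing only the CPU
// requirement, so that releasing it leaves other resources (e.g. GPU)
// held by the blocked worker, matching legacy Ray's behavior.
const std::string kCPUResourceLabel = "CPU";

std::unordered_map<std::string, double> CpuOnlySubset(
    const std::unordered_map<std::string, double> &required) {
  std::unordered_map<std::string, double> cpu_resources;
  auto it = required.find(kCPUResourceLabel);
  if (it != required.end()) {
    cpu_resources[kCPUResourceLabel] = it->second;
  }
  return cpu_resources;
}
```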
src/ray/raylet/node_manager.cc
Outdated
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
bool ok =
instead of ok, can we call this not_oversubscribed?
// TODO(atumanov): Return failure if attempting to perform vector
// subtraction with unknown labels.
resource_capacity_[resource_label] -= resource_capacity;
if (resource_capacity_.count(resource_label) < 0) {
Shouldn't this be resource_capacity_[resource_label] < 0?
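The reviewer is right: `std::unordered_map::count()` returns a `size_t` (0 or 1 for a non-multimap), which can never be negative, so the original condition is always false. The intended check is on the stored capacity value. A minimal sketch of the corrected logic, using a plain map as a stand-in for the real resource structure:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Subtract `amount` from the capacity under `label` and report whether
// the remaining capacity is still non-negative. The fix: test the
// stored value, not count(), which is an unsigned occurrence count.
bool SubtractCapacity(std::unordered_map<std::string, double> &capacity,
                      const std::string &label, double amount) {
  capacity[label] -= amount;
  return capacity[label] >= 0;  // correct: value check, not count() < 0
}
```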
resource_capacity_[resource_label] -= resource_capacity;
// TODO(atumanov): Return failure if attempting to perform vector
// subtraction with unknown labels.
resource_capacity_[resource_label] -= resource_capacity;
We need to handle the case where resource_capacity_.count(resource_label) == 0 (that is, the local scheduler is not aware of this resource). Or is that case impossible?
Hmm yeah, it seems Alexey left that as a TODO. I think we should just put a fatal check for that here for now...
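A sketch of the agreed-upon fatal check: fail loudly if asked to subtract a label the scheduler has never seen, rather than letting `operator[]` silently default-construct a zero entry. `RAY_CHECK` is approximated with `assert` here for illustration.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Crash (in debug builds) on an unknown resource label instead of
// silently inserting a zero entry. The membership check must come
// BEFORE operator[], since operator[] itself would insert the entry.
void SubtractKnownLabel(std::unordered_map<std::string, double> &capacity,
                        const std::string &label, double amount) {
  assert(capacity.count(label) == 1 && "unknown resource label");
  capacity[label] -= amount;
}
```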
Test PASSed.

Test FAILed.
src/ray/raylet/node_manager.cc
Outdated
// it acquired for its assigned task while it is blocked. The resources
// will be acquired again once the worker is unblocked.
std::shared_ptr<Worker> worker = worker_pool_.GetRegisteredWorker(client);
if (worker) {
You can combine these two if statements into one: if (worker && !worker->IsBlocked()). I personally would take it a step further and use the opposite logic with an early exit: if (!worker || worker->IsBlocked()) break;
I prefer not to use break; statements whenever possible, since it is usually less error-prone if you return in only one place.
I always prefer structuring my code with early termination logic. That way you don't have to scroll through code searching for any elses or any other code paths executed for all cases. Early termination logic is easier to follow. It also saves on indentation and is more assembly-like.
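The two styles being debated can be sketched side by side on a stand-in `Worker` type (not Ray's actual class). Both functions are behaviorally identical; the difference is purely in control-flow shape.

```cpp
#include <cassert>
#include <memory>

// Stand-in for illustration; Ray's Worker is more elaborate.
struct Worker {
  bool blocked = false;
  bool IsBlocked() const { return blocked; }
};

// Style 1: single combined condition, single exit point.
bool HandleClientCombined(const std::shared_ptr<Worker> &worker) {
  bool handled = false;
  if (worker && !worker->IsBlocked()) {
    handled = true;
  }
  return handled;
}

// Style 2: early exit on the negated condition, then the happy path
// proceeds un-indented.
bool HandleClientEarlyExit(const std::shared_ptr<Worker> &worker) {
  if (!worker || worker->IsBlocked()) {
    return false;
  }
  return true;
}
```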
src/ray/raylet/node_manager.cc
Outdated
// Get the CPU resources required by the running task.
const auto required_resources =
    task.GetTaskSpecification().GetRequiredResources();
double required_cpus;
Initialize required_cpus to zero. Valgrind will legitimately complain when you pass a pointer to uninitialized memory.
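Why the initialization matters: if the callee takes the variable's address and has any path that returns without writing through the pointer, the caller reads an indeterminate value and Valgrind flags a use of uninitialized memory. A hypothetical `GetResource`-shaped callee (not Ray's actual signature):

```cpp
#include <cassert>

// Hypothetical out-param callee: on the `!present` path it returns
// without ever writing *out, so an uninitialized caller variable would
// hold garbage there.
bool LookupResource(bool present, double value, double *out) {
  if (!present) {
    return false;  // *out deliberately not written on this path
  }
  *out = value;
  return true;
}

double SafeLookup(bool present, double value) {
  double required_cpus = 0;  // initialized: the failure path is benign
  LookupResource(present, value, &required_cpus);
  return required_cpus;
}
```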
src/ray/raylet/node_manager.cc
Outdated
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources =
You probably want a const ref here: const auto &required_resources.
I can't do that because GetRequiredResources() returns a ResourceSet and not a ResourceSet &, since the data structure is built on the fly.
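One nuance worth noting on this exchange: binding a const lvalue reference to a temporary returned by value is actually legal C++, because the temporary's lifetime is extended to match the reference. It just doesn't avoid constructing the `ResourceSet`, since it is built on the fly either way. A sketch with a stand-in type:

```cpp
#include <cassert>

// Stand-in for Ray's ResourceSet, for illustration only.
struct ResourceSet {
  double cpus;
};

// Returns by value, modeling GetRequiredResources() building its
// result on the fly.
ResourceSet GetRequiredResources() { return ResourceSet{4.0}; }

double ReadViaConstRef() {
  // Legal: the returned temporary's lifetime is extended to the end of
  // this scope by the const reference binding.
  const auto &required_resources = GetRequiredResources();
  return required_resources.cpus;
}
```

So either spelling works; the by-value `const auto` in the final code is equally fine and arguably clearer about ownership.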
auto tasks = local_queues_.RemoveTasks({worker->GetAssignedTaskId()});
const auto &task = tasks.front();
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
src/ray/raylet/node_manager.cc
Outdated
// Get the CPU resources required by the running task.
const auto required_resources = task.GetTaskSpecification().GetRequiredResources();
double required_cpus;
RAY_CHECK(required_resources.GetResource(kCPU_ResourceLabel, required_cpus));
Oh, that's a bug: it needs to be &required_cpus!
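The bug being flagged, sketched with a hypothetical pointer-out-param signature (Ray's actual `GetResource` signature may differ): the call must pass the variable's address so the callee can write the result back.

```cpp
#include <cassert>

// Hypothetical signature: result comes back through a double* out-param.
bool GetResourceAmount(double stored, double *out) {
  *out = stored;
  return true;
}

double RequiredCpus(double stored) {
  double required_cpus = 0;
  // Correct call: &required_cpus, so the callee writes into our variable.
  bool found = GetResourceAmount(stored, &required_cpus);
  assert(found);
  return required_cpus;
}
```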
Test PASSed.

Test FAILed.
ActorID actor_id_;
/// Whether the worker is blocked. Workers become blocked in a `ray.get`, if
/// they require a data dependency while executing a task.
bool blocked_;
ideally, we would make it enum WorkerState and transition the worker through its finite state diagram: STARTING -> REGISTERED->READY->EXECUTING->BLOCKED. Blocked state is just one possibility. That said, I think the only use case this state is currently useful for is tracking if it's blocked or not.
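The suggested state machine could look like the following sketch. The enum values come from the comment above; the transition rules (including EXECUTING <-> BLOCKED for `ray.get`) are my illustrative reading, not Ray's implementation.

```cpp
#include <cassert>

enum class WorkerState { STARTING, REGISTERED, READY, EXECUTING, BLOCKED };

// Toy FSM enforcing STARTING -> REGISTERED -> READY -> EXECUTING,
// with EXECUTING <-> BLOCKED for blocking ray.get calls.
struct WorkerFsm {
  WorkerState state = WorkerState::STARTING;

  bool Transition(WorkerState next) {
    switch (state) {
      case WorkerState::STARTING:
        if (next != WorkerState::REGISTERED) return false; break;
      case WorkerState::REGISTERED:
        if (next != WorkerState::READY) return false; break;
      case WorkerState::READY:
        if (next != WorkerState::EXECUTING) return false; break;
      case WorkerState::EXECUTING:
        if (next != WorkerState::BLOCKED) return false; break;
      case WorkerState::BLOCKED:
        if (next != WorkerState::EXECUTING) return false; break;
    }
    state = next;
    return true;
  }

  // The one query the current code actually needs.
  bool IsBlocked() const { return state == WorkerState::BLOCKED; }
};
```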
src/ray/raylet/node_manager.cc
Outdated
bool not_oversubscribed =
    cluster_resource_map_[gcs_client_->client_table().GetLocalClientId()].Acquire(
        ResourceSet(cpu_resources));
if (!not_oversubscribed) {
minor (ok to ignore) : would be good to get rid of the double negative, by using oversubscribed variable instead (and changing the logic to match).
src/ray/raylet/scheduling_queue.h
Outdated
void QueueRunningTasks(const std::vector<Task> &tasks);

/// Queue tasks in the blocked state. These are tasks that have been
/// dispatched to a worker but are blocked on a missing data dependency.
minor: this description sounds like a pending task (pending missing data dependencies). Maybe qualify the description by saying "but are blocked on a missing data dependency discovered at runtime"?
actor_id_(ActorID::nil()),
blocked_(false) {}

void Worker::MarkBlocked() { blocked_ = true; }
minor: ToggleBlocked(true/false) to save on methods. But we can clean it up if we decide to go with worker states later.
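The consolidation suggested here, sketched on a stand-in class: a single setter taking the new value replaces the MarkBlocked/MarkUnblocked pair.

```cpp
#include <cassert>

// Illustrative stand-in for Ray's Worker: one ToggleBlocked(bool)
// setter instead of two Mark* methods, as the reviewer suggests.
class Worker {
 public:
  void ToggleBlocked(bool blocked) { blocked_ = blocked; }
  bool IsBlocked() const { return blocked_; }

 private:
  bool blocked_ = false;
};
```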
Test PASSed.
Merged branch 'master' into this branch:
- Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
- [DataFrame] Adding read methods and tests (ray-project#1712)
- Allow task_table_update to fail when tasks are finished. (ray-project#1927)
- [rllib] Contribute DDPG to RLlib (ray-project#1877)
- [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
- Raylet task dispatch and throttling worker startup (ray-project#1912)
- [DataFrame] Eval fix (ray-project#1903)
- [tune] Polishing docs (ray-project#1846)
- [tune] [rllib] Automatically determine RLlib resources and add queueing mechanism for autoscaling (ray-project#1848)
- Preemptively push local arguments for actor tasks (ray-project#1901)
- [tune] Allow fetching pinned objects from trainable functions (ray-project#1895)
- Multithreading refactor for ObjectManager. (ray-project#1911)
- Add slice functionality (ray-project#1832)
- [DataFrame] Pass read_csv kwargs to _infer_column (ray-project#1894)
- Addresses missed comments from multichunk object transfer PR. (ray-project#1908)
- Allow numpy arrays to be passed by value into tasks (and inlined in the task spec). (ray-project#1816)
- [xray] Lineage cache requests notifications from the GCS about remote tasks (ray-project#1834)
- Fix UI issue for non-json-serializable task arguments. (ray-project#1892)
- Remove unnecessary calls to .hex() for object IDs. (ray-project#1910)
- Allow multiple raylets to be started on a single machine. (ray-project#1904)

Conflicts:
- python/ray/rllib/__init__.py
- python/ray/rllib/dqn/dqn.py
Merged branch 'master' into this branch:
- updates (ray-project#1958)
- Pin Cython in autoscaler development example. (ray-project#1951)
- Incorporate C++ Buffer management and Seal global threadpool fix from arrow (ray-project#1950)
- [XRay] Add consistency check for protocol between node_manager and local_scheduler_client (ray-project#1944)
- Remove smart_open install. (ray-project#1943)
- [DataFrame] Fully implement append, concat and join (ray-project#1932)
- [DataFrame] Fix for __getitem__ string indexing (ray-project#1939)
- [DataFrame] Implementing write methods (ray-project#1918)
- [rllib] arr[end] was excluded when end is not None (ray-project#1931)
- [DataFrame] Implementing API correct groupby with aggregation methods (ray-project#1914)
- Handle interrupts correctly for ASIO synchronous reads and writes. (ray-project#1929)
- [DataFrame] Adding read methods and tests (ray-project#1712)
- Allow task_table_update to fail when tasks are finished. (ray-project#1927)
- [rllib] Contribute DDPG to RLlib (ray-project#1877)
- [xray] Workers blocked in a `ray.get` release their resources (ray-project#1920)
- Raylet task dispatch and throttling worker startup (ray-project#1912)
- [DataFrame] Eval fix (ray-project#1903)
What do these changes do?

A worker that is assigned a task and blocks in a `ray.get` should release its resources so that another task can run. Without this, a task that recursively launches subtasks and waits on them cannot recurse deeper than the number of cores available.

Related issue number

This is similar to #286, but for the Raylet.
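The deadlock this PR fixes can be modeled with a toy CPU counter: each task in a recursion chain holds a core, blocks in `ray.get` waiting for its child, and only completes after the child does. Holding the core while blocked exhausts the pool at a depth equal to the core count; releasing it on block (and reacquiring on unblock) lets the chain run at any depth. This is a counting sketch, not Ray's scheduler:

```cpp
#include <cassert>

// Toy pool of CPU slots.
struct CpuPool {
  int available;
  bool Acquire() {
    if (available <= 0) return false;
    --available;
    return true;
  }
  void Release() { ++available; }
};

// Simulate a chain of `depth` tasks where each parent blocks in a
// ray.get waiting for its child. Returns true if the whole chain
// can run to completion.
bool CanRecurse(CpuPool &pool, int depth, bool release_on_block) {
  if (depth == 0) return true;
  if (!pool.Acquire()) return false;     // task starts executing on a core
  if (release_on_block) pool.Release();  // blocked in ray.get: free the core
  bool ok = CanRecurse(pool, depth - 1, release_on_block);  // child runs
  if (release_on_block) {
    bool reacquired = pool.Acquire();    // unblocked: take a core back
    ok = ok && reacquired;
  }
  pool.Release();                        // task finishes, core freed
  return ok;
}
```

With one core and no release-on-block, even depth 2 deadlocks; with release-on-block, arbitrary depth succeeds.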