Keeping pipelines full #10225
Conversation
Discussed offline about refactoring this class to squash all of the fields that are currently related to SchedulingKey into one hashmap. The value should be a new struct that includes:
- these new counts
- the workers that are currently leased to us with that scheduling key (worker_to_lease_entry_)
- whether there is a pending lease request for that key (pending_lease_requests_)
- the tasks queued with that key (task_queues_)
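For illustration, a rough sketch of what such a consolidated entry could look like, based on the fields listed above (the names are illustrative and the types come from the existing direct_task_transport.h; the struct that actually lands in this PR appears in the later diffs):

// Sketch only: one entry per SchedulingKey, replacing the separate per-key maps.
struct SchedulingKeyEntry {
  // Pending worker lease request to the raylet (was pending_lease_requests_).
  std::pair<std::shared_ptr<WorkerLeaseInterface>, TaskID> pending_lease_request =
      std::make_pair(nullptr, TaskID::Nil());
  // Tasks queued for execution with this scheduling key (was task_queues_).
  std::deque<TaskSpecification> task_queue;
  // Workers currently leased to us for this key (previously only reachable
  // through worker_to_lease_entry_).
  absl::flat_hash_set<rpc::WorkerAddress> active_workers;
  // The new counts, e.g. the total number of tasks in flight to this key's workers.
  uint32_t total_tasks_in_flight = 0;
};

// The per-key maps then collapse into a single map:
// absl::flat_hash_map<SchedulingKey, SchedulingKeyEntry> scheduling_key_entries_;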
I'll take another look once the refactoring and unit tests are done!
Force-pushed from 02125fc to b847614.
it->second.push_back(task_spec);
auto &it = scheduling_key_entries_[scheduling_key];
it.task_queue_.push_back(task_spec);
RAY_CHECK(it.task_queue_.size() >= 1);
This check doesn't seem necessary.
// If pipelining is enabled, check whether we really need a new worker or whether we have
// enough room in an existing worker's pipeline to send the new tasks
if (max_tasks_in_flight_per_worker_ > 1) {
I don't think we need this check because it should be okay to run the following code even when max_tasks_in_flight = 1.
// enough room in an existing worker's pipeline to send the new tasks
if (max_tasks_in_flight_per_worker_ > 1) {
  if (scheduling_key_entry.tot_tasks_in_flight <
      scheduling_key_entry.active_workers_.size() * max_tasks_in_flight_per_worker_) {
I'm not sure about putting this logic in RequestNewWorkerIfNeeded. It seems a bit strange to be submitting tasks in this method (before this change, the logic only had to do with requesting new workers, not with task submission). It also seems a bit brittle because OnWorkerIdle, called below, calls back into RequestNewWorkerIfNeeded.
Instead, how about we only use the check to see whether we should request a new worker or not? Then, we should also move this new logic to submit tasks to already active workers to when the task is first queued in SubmitTask.
The reason why I added this part (calling OnWorkerIdle from within RequestNewWorkerIfNeeded) was to avoid introducing new latency due to the fact that OnWorkerIdle works in a pull (rather than push) fashion. OnWorkerIdle is normally only called when (1) we get a new worker, or (2) when we get a response from a worker (after the worker has completed the execution of a task). At that point, we pull tasks from the queue and submit them.
Now, consider a scenario with a set of active workers with non-full pipelines, where we have just added a few more tasks to the owner's queue. In this situation, even if some of the pipelines are not full, before we can submit the new tasks to an active worker we have to wait until that worker has responded back to the owner (because OnWorkerIdle is not called until then). If we submit to the owner's queue a number of tasks that is larger than the grand total of "spots" available in the pipelines to the existing workers, the system will fill those pipelines (and realize that it needs to request an additional worker) only after it has received a response from every single one of the existing workers with non-full pipelines. As a result, the total number of active workers will stay lower for a longer period of time, and the overall execution time will suffer.
Yes, I understand that we should call OnWorkerIdle even while it is still executing other tasks. My comment was more about where we should call it. I think it is better to call it directly in SubmitTask right after we've added tasks to the queue. The reasons are that: a) dispatching tasks doesn't match the current semantics of the method, which is supposed to only request a new worker, and b) it prevents the bad recursive structure where RequestNewWorkerIfNeeded calls OnWorkerIdle, which calls RequestNewWorkerIfNeeded, etc.
Oh I see! I think I misread your initial comment! Sorry about that. This makes sense. So I guess we would just move the code block
if (scheduling_key_entry.tot_tasks_in_flight <
    scheduling_key_entry.active_workers_.size() * max_tasks_in_flight_per_worker_) {
  // The pipelines to the current workers are not full yet, so we don't need more
  // workers.
  // Find a worker with a number of tasks in flight that is less than the maximum
  // value (max_tasks_in_flight_per_worker_) and call OnWorkerIdle to send tasks to
  // that worker
  for (auto active_worker_addr : scheduling_key_entry.active_workers_) {
    RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) !=
              worker_to_lease_entry_.end());
    auto &lease_entry = worker_to_lease_entry_[active_worker_addr];
    if (lease_entry.tasks_in_flight_ < max_tasks_in_flight_per_worker_) {
      OnWorkerIdle(active_worker_addr, scheduling_key, false,
                   lease_entry.assigned_resources_);
      break;
    }
  }
  return;
}
(without the return statement, of course) to the SubmitTask function. RequestNewWorkerIfNeeded would then only keep the following if statement?
if (scheduling_key_entry.tot_tasks_in_flight <
    scheduling_key_entry.active_workers_.size() * max_tasks_in_flight_per_worker_) {
  // The pipelines to the current workers are not full yet, so we don't need more
  // workers.
  return;
}
Sounds good! Let me do that right now.
if (lease_entry.tasks_in_flight_ < max_tasks_in_flight_per_worker_) {
  OnWorkerIdle(active_worker_addr, scheduling_key, false,
               lease_entry.assigned_resources_);
  break;
I'm not sure if we want to break here. Couldn't there be multiple idle workers that could get filled by new tasks? But I guess this depends on where we decide to put this logic (see above comment).
I see! So I guess we would remove the break statement if we put this in SubmitTask, right?
Hmm I need to think about that. It seems like we could structure the code so that we're always guaranteed that only one worker needs to be filled up during SubmitTask, but I'm not sure.
auto lease_client = std::move(pending_lease_request.first);
const auto task_id = pending_lease_request.second;
pending_lease_request = std::make_pair(nullptr, TaskID::Nil());
RAY_CHECK(lease_client);
I don't think this check is necessary (std::move should guarantee this already).
        .emplace(scheduling_key, std::make_pair(lease_client, task_id))
        .second);
pending_lease_request = std::make_pair(lease_client, task_id);
RAY_CHECK(pending_lease_request.first);
I don't think this check is necessary. The previous check was just to make sure that there wasn't already a pending lease request for the same scheduling key (arguably also not necessary).
cancel_retry_timer_->async_wait(boost::bind(
    &CoreWorkerDirectTaskSubmitter::CancelTask, this, task_spec, force_kill));
}
} else if (status.ok() && reply.attempt_succeeded()) {
I don't think this is necessary because we should still get back the callback for PushNormalTask.
std::deque<TaskSpecification> task_queue_ = std::deque<TaskSpecification>();
// Keep track of the active workers, so that we can quickly check if one of them has
// room for more tasks in flight
absl::flat_hash_set<rpc::WorkerAddress> active_workers_ =
Consider making this a hashmap from rpc::WorkerAddress -> LeaseEntry instead of keeping a separate hashmap.
Doesn't worker_to_lease_entry_ already have this mapping? Do you mean that I should just move worker_to_lease_entry_ into the SchedulingKeyEntry struct, so that each SchedulingKey will be paired to its own worker_to_lease_entry_ hashmap?
Hmm actually I think I'm just wrong about this one haha. Ignore that!
Yes! I'm not sure which would be cleaner. I'll leave that up to you.
@stephanie-wang Ok, I just pushed the updated code :)
Co-authored-by: fangfengbin <869218239a@zju.edu.cn>
stephanie-wang left a comment:
A few more comments :)
task_queues_.emplace(scheduling_key, std::deque<TaskSpecification>()).first;
auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key];
scheduling_key_entry.task_queue_.push_back(task_spec);
if (scheduling_key_entry.tot_tasks_in_flight <
Maybe we should make this a while loop? And yeah, I think it's not correct to break after only one worker. Could you add this case to the unit test (i.e. check that we don't request a new worker if there are multiple workers that could be filled from the owner's queue). Thanks!
Hmm okay now that I'm thinking about this again, I think it is okay to have the break statement and only dispatch to one worker! Sorry about that :)
Could you add a comment explaining why it's okay to break after one worker?
Sounds good, let me add that right now!
task_queues_.erase(queue_entry);
RAY_LOG(DEBUG) << "Task queue empty, canceling lease request";
if (current_queue.empty()) {
  RAY_LOG(INFO) << "Task queue empty, canceling lease request";
Shouldn't we still attempt to delete the entry here? It'd be good to add a method to the new SchedulingKeyEntry struct to check whether it's safe to delete the entry (i.e. if the task queue is empty, no pending request, etc).
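For illustration, a minimal sketch of such a method, assuming the SchedulingKeyEntry fields from this PR (the exact conditions and signature are settled later in this thread):

// Sketch: the entry can be erased only once nothing for this key is outstanding.
bool CanDelete() const {
  return !pending_lease_request.first && task_queue.empty() &&
         active_workers.empty() && total_tasks_in_flight == 0;
}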
I just added the new method! However, I am not sure about attempting to delete in this place in the code, because there would still be some worker in the active_workers set, so calling the new method would tell us that it's not yet safe to delete the entry.
Ahh gotcha, makes sense!
// The pipelines to the current workers are not full yet, so we don't need more
// workers.

// Find a worker with a number of tasks in flight that is less than the maximum
Could you remove this comment?
  return Status::OK();
}
client = maybe_client.value();
client_addr = rpc_client->second.ToProto();
I don't think we need this variable anymore.
absl::flat_hash_set<rpc::WorkerAddress> active_workers_ =
    absl::flat_hash_set<rpc::WorkerAddress>();
// Keep track of how many tasks with this SchedulingKey are in flight, in total
uint32_t tot_tasks_in_flight = 0;
Suggested change: rename tot_tasks_in_flight to total_tasks_in_flight.
We try to use complete words for variable naming in most cases.
    pending_lease_requests_ GUARDED_BY(mu_);
struct SchedulingKeyEntry {
  // Keep track of pending worker lease requests to the raylet.
  std::pair<std::shared_ptr<WorkerLeaseInterface>, TaskID> pending_lease_request_ =
Could you remove the tail underscore at the end of these field names? Usually we do this only for private class members (see Google style guide). Realizing now that the above LeaseEntry struct doesn't follow this convention, oops :)
Should I also remove the underscores from the field names in the LeaseEntry struct? I guess it's better late than never, right?
That would be great, thanks! :)
Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
Ok, I just pushed the updated code!
stephanie-wang left a comment:
Thanks, this is looking very close! I just had some questions about making sure we delete the SchedulingKeyEntry properly. It would also be good to add unit tests for that, to make sure we're not leaking any memory.
scheduling_key_entry.task_queue.push_back(task_spec);
if (scheduling_key_entry.total_tasks_in_flight <
    scheduling_key_entry.active_workers.size() *
        max_tasks_in_flight_per_worker_) {
Suggest wrapping this in a const method of the SchedulingKeyEntry (e.g., HasAvailableWorkers()) to make it more readable!
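A possible shape for the suggested helper, sketched under the assumption that the max-tasks value is passed in as an argument (it lives on CoreWorkerDirectTaskSubmitter, as discussed further down); the name comes from the comment above:

// Sketch: true if at least one active worker's pipeline still has room.
bool HasAvailableWorkers(uint32_t max_tasks_in_flight_per_worker) const {
  return total_tasks_in_flight <
         active_workers.size() * max_tasks_in_flight_per_worker;
}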
I was wondering, what do you mean by a const method? Also, I had to define the method outside of the SchedulingKeyEntry struct to be able to access the max_tasks_in_flight_per_worker_ member of CoreWorkerDirectTaskSubmitter. Otherwise, the compiler reported an error that I did not know how to fix. Do you know if there is a way to access max_tasks_in_flight_per_worker_ from a method within the SchedulingKeyEntry struct?
Ah I just mean to mark that the method does not modify the SchedulingKeyEntry, like this:
CanDelete() const { ... }
Yes, it shouldn't be able to access it because it's a private member of CoreWorkerDirectTaskSubmitter. You can fix it by allowing CanDelete to take the max tasks as an argument.
Sounds good! I'll add the const keyword, and allow CanDelete() (and the other two methods I added to check the pipeline fullness) to take max_tasks_in_flight_per_worker_ as an argument, so that I can place the function inside the struct.
RAY_CHECK(worker_to_lease_entry_.find(active_worker_addr) !=
          worker_to_lease_entry_.end());
auto &lease_entry = worker_to_lease_entry_[active_worker_addr];
if (lease_entry.tasks_in_flight < max_tasks_in_flight_per_worker_) {
Suggest wrapping this in a const method of the WorkerLeaseEntry to make it more readable!
auto &task_queue = scheduling_key_entry.task_queue;
if (task_queue.empty()) {
  // We don't have any of this type of task to run.
Shouldn't we check if it's okay to delete the SchedulingKeyEntry here too?
absl::MutexLock lock(&mu_);
worker_to_lease_entry_.erase(addr);
auto &scheduling_key_entry = scheduling_key_entries_[scheduling_key];
scheduling_key_entry.active_workers.erase(addr);
Do we need to check if we should delete the SchedulingKeyEntry here?
if (scheduled_tasks->second.empty()) {
  task_queues_.erase(scheduling_key);
if (scheduled_tasks.empty()) {
  CancelWorkerLeaseIfNeeded(scheduling_key);
Do we need to check whether we should delete the SchedulingKeyEntry here?
Yes! However, I think that the check should be placed inside the callback function in CancelWorkerLeaseIfNeeded, because the callback can call CancelWorkerLeaseIfNeeded as well, and CancelWorkerLeaseIfNeeded needs to access the SchedulingKeyEntry.
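Roughly, the idea is something like the following inside the lease-cancellation reply callback (a sketch only, using the CanDelete helper discussed earlier):

// Sketch: after the callback has run (it may re-enter CancelWorkerLeaseIfNeeded),
// erase the SchedulingKeyEntry if nothing is left for this key.
auto &entry = scheduling_key_entries_[scheduling_key];
if (entry.CanDelete()) {
  scheduling_key_entries_.erase(scheduling_key);
}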
Co-authored-by: Stephanie Wang <swang@cs.berkeley.edu>
I was wondering what type of unit test you had in mind to check that we are not leaking memory by forgetting to delete some entries in the scheduling_key_entries_ map.
Yes, you can either add a public function to check the size or you can make the unit test class a friend.
if (scheduling_key_entries_.size() != 0) {
  RAY_LOG(INFO) << "size: " << scheduling_key_entries_.size();
}
return scheduling_key_entries_.size() == 0;
You can also use scheduling_key_entries_.empty() here!
Good call! Just changed this. I also removed the RAY_LOG(INFO) line
stephanie-wang left a comment:
Looks great! I'll merge once Travis finishes.
Hey @gabrieleoliaro, looks like there is a build error. Could you fix it? https://api.travis-ci.com/v3/job/378975005/log.txt
@gabrieleoliaro, I'm not sure if that last commit will fix the issue. That annotation means that the compiler should check that the mutex is held whenever that method is called. But the method is public and the mutex is private, so I don't think it will work. You can fix the error by acquiring the lock inside the method!
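A sketch of the suggested fix, assuming the size-check helper shown in the earlier diff (the method name here is made up for illustration): drop the thread-safety annotation and take the lock inside the public method instead.

// Sketch: mu_ is private, so rather than annotating the public method,
// acquire the lock in its body before touching scheduling_key_entries_.
bool CheckNoSchedulingKeyEntries() {
  absl::MutexLock lock(&mu_);
  return scheduling_key_entries_.empty();
}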
Just pushed!
Sorry, this was just a first commit. I was not done yet :)
Hi @gabrieleoliaro, the Java CI job failure (https://travis-ci.com/github/ray-project/ray/jobs/379015992) is related to this PR, please help take a look, thanks.
Really? That seems unlikely; the test is failing on master too: https://travis-ci.com/github/ray-project/ray/jobs/379177286
Sorry, it should be irrelevant.
Why are these changes needed?
These changes are needed to avoid over-requesting new workers when using pipelining to submit tasks from owners to their workers. When a new task is submitted to an owner, the code first tries to send the task to an existing worker if the number of tasks in flight to that worker is less than the maximum allowed by the pipelining settings. If all pipelines to all workers are full, then it requests a new worker.
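A simplified sketch of that submission path, using the field names from this PR's diffs (locking, lease cancellation, and error handling are omitted, and the function structure here is illustrative rather than the exact final code):

// Sketch: queue the task, then either fill an existing worker's pipeline or
// request a new worker if every pipeline is already full.
void SubmitTaskSketch(const TaskSpecification &task_spec,
                      const SchedulingKey &scheduling_key) {
  auto &entry = scheduling_key_entries_[scheduling_key];
  entry.task_queue.push_back(task_spec);
  if (entry.total_tasks_in_flight <
      entry.active_workers.size() * max_tasks_in_flight_per_worker_) {
    // Some existing worker still has pipeline room: dispatch to the first one found.
    for (const auto &addr : entry.active_workers) {
      auto &lease_entry = worker_to_lease_entry_[addr];
      if (lease_entry.tasks_in_flight < max_tasks_in_flight_per_worker_) {
        OnWorkerIdle(addr, scheduling_key, false, lease_entry.assigned_resources);
        break;
      }
    }
  } else {
    // All pipelines to all active workers are full: ask the raylet for a new worker.
    RequestNewWorkerIfNeeded(scheduling_key);
  }
}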
Related issue number
Checks
I've run scripts/format.sh to lint the changes in this PR.