[Core] Improve scheduling observability and fix wrong resource deadlock report message. #19746
Conversation
num_parents = 6
num_children = 6
death_probability = 0.95
# TODO(sang): Currently setting this to 10 creates segfault
This will be handled in the next PR
src/ray/common/task/task.h
/// Override cancellation behaviour.
void OnCancellationInstead(const CancelTaskCallback &callback) {
  on_cancellation_ = callback;
src/ray/raylet/node_manager.cc
std::string error_message_str = error_message.str();
RAY_LOG(WARNING) << error_message_str;
RAY_LOG(WARNING) << cluster_task_manager_->DebugStr();
This will help us figure out exactly why warnings are printed (while they are not printed to users).
we can eventually use the event here
What's the overhead of this? Does it make sense to use RAY_LOG_EVERY_MS...?
The function's own overhead is probably not too big, because we normally shouldn't have too many pending tasks at a time.
But RAY_LOG_EVERY_MS sounds not bad just in case. My question is about the semantics. If I use
RAY_LOG_EVERY_MS(log, 1000)
does that mean we can print this at most once per second?
Limited to once per 10 seconds
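The once-per-interval semantics discussed above can be sketched as follows. This is a hypothetical standalone helper, not Ray's actual RAY_LOG_EVERY_MS implementation (which is macro-based); it only illustrates the rate-limiting rule: a message is emitted only if at least `interval_ms` milliseconds have passed since the last emitted message.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <vector>

// Hypothetical throttled logger illustrating the once-per-interval rule.
class ThrottledLogger {
 public:
  explicit ThrottledLogger(int64_t interval_ms) : interval_ms_(interval_ms) {}

  // Emits (records) the message only if the rate limit allows it.
  // `now_ms` is injected so the behavior is deterministic and testable.
  bool Log(const std::string &message, int64_t now_ms) {
    if (last_emit_ms_ >= 0 && now_ms - last_emit_ms_ < interval_ms_) {
      return false;  // Suppressed: emitted too recently.
    }
    last_emit_ms_ = now_ms;
    emitted_.push_back(message);
    return true;
  }

  const std::vector<std::string> &emitted() const { return emitted_; }

 private:
  int64_t interval_ms_;
  int64_t last_emit_ms_ = -1;  // -1 means nothing emitted yet.
  std::vector<std::string> emitted_;
};
```

So with an interval of 1000 ms the log line appears at most once per second, regardless of how often the call site is reached.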
// If the work is not in the waiting state, it will be scheduled soon or won't be
// scheduled. Consider as non-pending.
if (work.GetState() != internal::WorkStatus::WAITING) {
This is the RFC, so the approach is like this:
We only count tasks/actors that are actually waiting for resources. We don't report tasks as pending if they are pending for other reasons (e.g., workers failed to start).
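The counting rule above can be sketched like this. The enum values and queue shape are simplified stand-ins for Ray's `internal::Work` structures, not the actual types: only items in the WAITING state are counted as pending; items held back for any other reason are skipped.

```cpp
#include <cassert>
#include <deque>
#include <map>

// Simplified stand-in for internal::WorkStatus.
enum class WorkStatus { WAITING, WAITING_FOR_WORKER, CANCELLED };

// Count only the work items that are actually waiting for resources.
int CountPendingWork(const std::map<int, std::deque<WorkStatus>> &queues) {
  int pending = 0;
  for (const auto &pair : queues) {
    for (const auto &state : pair.second) {
      if (state == WorkStatus::WAITING) {
        ++pending;  // Waiting for resources: counts as pending.
      }
      // Other states (e.g. worker startup in flight) are not counted.
    }
  }
  return pending;
}
```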
int, std::deque<std::shared_ptr<internal::Work>>>
    &pair) {
const auto &work_queue = pair.second;
for (auto work_it = work_queue.begin(); work_it != work_queue.end();) {
This could add additional overhead to DebugStr(). Since we usually don't have long queues in the raylet, I assume it won't likely be an issue, but if you think we need to hide it behind a debug flag, lmk.
In the long term, we can optimize it to keep the counts updated live.
Do you think it makes sense to also add STATS here? It's really simple; there is some sample code here:
https://github.com/ray-project/ray/blob/master/src/ray/common/asio/instrumented_io_context.cc#L23
I think when worker states change, we probably can just update it there. So even if we don't print the debug string, we can still have some insight into this.
DebugString is always called periodically (I don't know if there's an option to not do that).
What about just calling this in the DebugStr() method?
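The idea above (piggyback the metric update on the periodic debug dump rather than on every state change) might look like the sketch below. The `Gauge` type here is a hypothetical stand-in, not Ray's actual STATS API.

```cpp
#include <cassert>
#include <string>

// Hypothetical gauge; Ray's real stats helpers differ.
struct Gauge {
  double value = 0;
  void Record(double v) { value = v; }
};

Gauge pending_tasks_gauge;

// DebugStr() is invoked periodically anyway, so recording the metric here
// avoids instrumenting every individual state transition.
std::string DebugStr(int pending_tasks) {
  pending_tasks_gauge.Record(pending_tasks);
  return "num pending tasks: " + std::to_string(pending_tasks);
}
```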
If the high-level approach makes sense, I will also add a unit test for the stats.
Love the direction this PR is going! @iycheng @wuisawesome can you also take a look?
// `TaskSpec` is determined at submission time.
message TaskExecutionSpec {
  // The last time this task was received for scheduling.
  double last_timestamp = 2;
I'd rather not make this backward-incompatible change.
I think for this path it is fine because 1. it is not used anywhere, and 2. for the task spec, backward compatibility doesn't matter IIUC (the only place it matters is the RPC used by the autoscaler, because that's the only path where different Ray versions communicate with each other). But if you still don't like it, I can just keep it.
<< "\n"
<< "Available resources on this node: " << available_resources
<< "In total there are " << pending_tasks << " pending tasks and "
<< "Available resources on this node: "
Is it possible to also print the reason why this worker can't be scheduled here?
I believe the log above explains it? Or do you want to include the worker-startup warnings here? (That requires changing the semantics of this method, because it "warns resource deadlock", which doesn't include worker startup failure imo.)
Hmm, maybe we should warn for both: 1. resource deadlock and 2. workers not starting properly.
  WORKER_NOT_FOUND_RATE_LIMITED,
};

/// Work represents all the information needed to make a scheduling decision.
Now that we are here, does it make sense to add comments describing all possible states for the "Work"?
Also, we might want to find a better name for "Work" (not in this PR).
> Also, we might want to find a better name for "Work" (not in this PR).

+1

> Now that we are here, does it make sense to add comments describing all possible states for the "Work"?

Isn't this already kind of described in the enum?
My only concern is that without DEFINE_stats, it's not easy to observe for a cluster: basically you need to dump everything and check each raylet. You will also lose the time-series data, which might be useful for debugging.
src/ray/raylet/node_manager.cc
std::string error_message_str = error_message.str();
RAY_LOG(WARNING) << error_message_str;
RAY_LOG(WARNING) << cluster_task_manager_->DebugStr();
} else if (status == PopWorkerStatus::WorkerPendingRegistration) {
  cause = internal::UnscheduledWorkCause::WORKER_NOT_FOUND_REGISTRATION_TIMEOUT;
} else {
  RAY_LOG(FATAL) << "Unexpected state received for the empty pop worker.";
Will this be too aggressive?
Imo, this is fine because receiving other states without updating this code path is a regression (or a bug). (But let me know if you have an alternative!)
@iycheng To be clear, I am totally on the same page. My opinion here is just that since we always dump the debug string, we can call the Record method within that function (not every time the state is changed).
The unit tests / integration tests will be added in the follow-up PR.
@edoakes this PR will correctly report the "resource deadlock".
@edoakes can you provide me the test script so that I can verify this?
cc @iycheng I will do the following things in the follow-up next week.
@rkooo567 sorry a bit late here, but you can try:

```python
ray.init()

@ray.remote(runtime_env={"pip": ["tensorflow", "torch"]})
def f():
    pass

# Check no warning printed.
ray.get(f.remote())
```
This is the state while running that particular test, and it doesn't print any deadlock message. This is the expected behavior, right? (Btw, this task seems to run for a pretty long time... is that normal?) EDIT: It finished without printing a spurious resource deadlock error, but I am seeing this. Is it expected, or is it due to my setup?
Ah @rkooo567 can you try running with
Turns out the log was buried at the top of very long conda log messages.
Why are these changes needed?
This PR is the replacement of #19720.
It does 3 things.
cluster_task_manager->DebugStr(). This is an example of how the better observability helps us fix issues.
For the incorrect actor scheduling error message in this particular test, the problem was that the process startups were rate limited because we start more workers than the threshold at once.
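The startup rate limiting described above can be sketched as follows. Names like `max_starting_concurrency` are hypothetical, not Ray's actual configuration knobs: when more workers are requested at once than the concurrency threshold allows, the excess requests are deferred rather than launched, so the affected tasks sit pending without any resource deadlock.

```cpp
#include <cassert>

struct StartResult {
  int started;   // Workers launched now.
  int deferred;  // Workers held back until a slot frees up.
};

// Hypothetical sketch: launch up to the remaining startup capacity,
// defer the rest to a later scheduling round.
StartResult StartWorkers(int requested, int already_starting,
                         int max_starting_concurrency) {
  int capacity = max_starting_concurrency - already_starting;
  if (capacity < 0) capacity = 0;
  int started = requested < capacity ? requested : capacity;
  return {started, requested - started};
}
```

Under this model, the deferred workers are the ones that previously triggered the spurious "resource deadlock" warning even though no resources were actually deadlocked.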
Related issue number
closes #19207 (90% confidence) #19427
Checks
scripts/format.sh to lint the changes in this PR.