[Core] Improve scheduling observability and fix wrong resource deadlock report message.#19746

Merged
ericl merged 15 commits into ray-project:master from rkooo567:improve-scheduling-observability
Oct 28, 2021
Conversation

rkooo567 (Contributor) commented Oct 26, 2021

Why are these changes needed?

This PR is the replacement of #19720.

It is doing 3 things.

  1. Improve scheduling observability. Currently, it is hard to know why tasks are not scheduled when things are hanging. This PR adds a per-task "state" that indicates why a task is not scheduled. The states are trackable through cluster_task_manager->DebugStr().
  2. Add a threaded actor stress test. Currently it occasionally segfaults when max_concurrency == 10, so we will run it with a single-threaded actor. The issue is tracked here: [Bug] Threaded actor stress test invokes SIGSEGV #19748
  3. Fix [Core][Bug] ray nodes get into a bad state and actor can't be scheduled #19207. I could somehow repro this issue, and it seems the problem was that we print a resource deadlock warning when tasks/actors are not actually waiting for resources to become available. The first item (improved observability) allows us to correctly identify when to raise the error. Please check the code for more details (the approach could be controversial). Note that I am not 100% sure the issue I could repro is exactly the same issue.

This is an example of how the improved observability helps us fix issues.

For the incorrect actor scheduling error message in this particular test, the problem was that worker process startups were rate limited because we start more workers than the threshold at once:

(raylet) Infeasible queue length: 0
(raylet) Schedule queue length: 0
(raylet) Dispatch queue length: 15
(raylet) num_waiting_for_resource: 0
(raylet) num_waiting_for_plasma_memory: 0
(raylet) num_waiting_for_remote_node_resources: 0
(raylet) num_worker_not_started_by_job_config_not_exist: 0
(raylet) num_worker_not_started_by_registration_timeout: 0
(raylet) num_worker_not_started_by_process_rate_limit: 13
(raylet) num_worker_waiting_for_workers: 2
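The counters above can be produced by aggregating a per-task "unscheduled cause" over the pending work items. A minimal Python sketch of that aggregation (hypothetical, simplified names; the real implementation lives in the C++ ClusterTaskManager):

```python
from collections import Counter
from enum import Enum, auto


class UnscheduledCause(Enum):
    """Simplified stand-in for the per-task unscheduled-work cause."""
    WAITING_FOR_RESOURCE = auto()
    WAITING_FOR_PLASMA_MEMORY = auto()
    WAITING_FOR_REMOTE_NODE_RESOURCES = auto()
    WORKER_NOT_STARTED_RATE_LIMIT = auto()
    WAITING_FOR_WORKERS = auto()


def debug_str(pending_causes):
    """Aggregate per-task causes into counters like the raylet debug output."""
    counts = Counter(pending_causes)
    return "\n".join(
        f"num_{cause.name.lower()}: {counts[cause]}"
        for cause in UnscheduledCause
    )


# 13 tasks blocked on the worker-process rate limit, 2 waiting for workers,
# mirroring the log excerpt above.
causes = (
    [UnscheduledCause.WORKER_NOT_STARTED_RATE_LIMIT] * 13
    + [UnscheduledCause.WAITING_FOR_WORKERS] * 2
)
print(debug_str(causes))
```

Because the counters are derived from a single cause tag per task, a hang can be attributed to exactly one bucket (e.g. the rate limit here) instead of being lumped into a generic "pending" count.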

Related issue number

closes #19207 (90% confidence) #19427

Checks

  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

num_parents = 6
num_children = 6
death_probability = 0.95
# TODO(sang): Currently setting this to 10 creates segfault
Contributor Author:

This will be handled in the next PR


/// Override cancellation behaviour.
void OnCancellationInstead(const CancelTaskCallback &callback) {
on_cancellation_ = callback;
Contributor Author:

not used


std::string error_message_str = error_message.str();
RAY_LOG(WARNING) << error_message_str;
RAY_LOG(WARNING) << cluster_task_manager_->DebugStr();
Contributor Author:

This will help us figure out exactly why warnings are printed (while it is not printed to users).

Contributor Author:

we can eventually use the event here

Contributor:

What's the overhead of this? Does it make sense to use RAY_LOG_EVERY_MS...?

Contributor Author:

The function's own overhead is probably not too big, because we normally shouldn't have too many pending tasks at a time.

But RAY_LOG_EVERY_MS sounds reasonable just in case. My question is: what are the semantics? If I use

RAY_LOG_EVERY_MS(log, 1000)

does that mean this prints at most once per second?

Contributor:

that's correct.

Contributor Author:

Limited to once per 10 seconds
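The rate-limited logging discussed above can be sketched as a small time-based suppressor. This is a hypothetical Python analogue of what a macro like RAY_LOG_EVERY_MS does, not the actual Ray implementation:

```python
import time


class EveryMsLogger:
    """Emit a message at most once per `interval_ms`; drop the rest."""

    def __init__(self, interval_ms):
        self.interval_s = interval_ms / 1000.0
        self.last_emit = float("-inf")

    def log(self, message):
        now = time.monotonic()
        if now - self.last_emit >= self.interval_s:
            self.last_emit = now
            print(message)
            return True   # emitted
        return False      # suppressed within the interval


# Once per 10 seconds, matching the limit chosen in this thread.
logger = EveryMsLogger(interval_ms=10_000)
emitted = [logger.log("resource deadlock warning") for _ in range(5)]
```

Five back-to-back calls emit only the first message; the remaining four fall inside the 10-second window and are suppressed, which bounds the log volume regardless of how often the warning path is hit.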


// If the work is not in the waiting state, it will be scheduled soon or won't be
// scheduled. Consider as non-pending.
if (work.GetState() != internal::WorkStatus::WAITING) {
Contributor Author:

This is the RFC, so the approach is as follows:

We only count tasks/actors that are actually waiting for resources. We don't report pending tasks that are pending for other reasons (e.g., workers failed to start).
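The approach can be sketched as a filter over the work queue by state. A hedged Python sketch (hypothetical names, mirroring the internal::WorkStatus check in the diff):

```python
from enum import Enum, auto


class WorkStatus(Enum):
    WAITING = auto()             # queued, waiting for resources
    WAITING_FOR_WORKER = auto()  # resources granted, worker still starting
    CANCELLED = auto()


def count_resource_pending(statuses):
    """Count only tasks genuinely waiting for resources.

    Tasks waiting for a worker to start (or cancelled) are excluded, so a
    worker-startup delay no longer triggers a resource deadlock warning.
    """
    return sum(1 for s in statuses if s == WorkStatus.WAITING)


# e.g. 15 tasks stuck on rate-limited worker startup: not a resource deadlock.
queue = [WorkStatus.WAITING_FOR_WORKER] * 15
```

With this filter, the dispatch queue from the PR description (15 tasks, all blocked on worker startup) contributes nothing to the deadlock check, which is exactly why the spurious warning disappears.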

int, std::deque<std::shared_ptr<internal::Work>>>
&pair) {
const auto &work_queue = pair.second;
for (auto work_it = work_queue.begin(); work_it != work_queue.end();) {
Contributor Author:

This could add additional overhead to DebugStr(). Since we usually don't have long queues in the raylet, I assume it's unlikely to be an issue, but let me know if you think we need to hide it behind a debug flag.

Contributor Author:

In the long term, we can optimize it to update live

Contributor:

Do you think it makes sense to also add STATS here? It's really simple; some sample code here:

https://github.com/ray-project/ray/blob/master/src/ray/common/asio/instrumented_io_context.cc#L23

I think we can just update the stats when worker states change. Then even if we don't print the debug string, we still have some insight into this.

Contributor Author:

DebugString is always called periodically (I don't know if there's an option to disable that).

What if we just call this in the DebugStr() method?

@rkooo567 (Contributor Author):

If the high-level approach makes sense, I will also add a unit test for stats.

scv119 (Contributor) commented Oct 26, 2021

Love the direction this PR is going! @iycheng @wuisawesome also take a look?

// `TaskSpec` is determined at submission time.
message TaskExecutionSpec {
// The last time this task was received for scheduling.
double last_timestamp = 2;
Contributor:

I'd rather not make this backward-incompatible change.

Contributor Author:

I think for this path it is fine because: 1. it is not used anywhere; 2. for the task spec, backward compatibility doesn't matter IIUC (the only place it matters is the RPC used by the autoscaler, because that's the only path where different Ray versions communicate with each other). But if you still don't like it, I can just keep it.

<< "\n"
<< "Available resources on this node: " << available_resources
<< "In total there are " << pending_tasks << " pending tasks and "
<< "Available resources on this node: "
Contributor:

is it possible to print the reason why this worker can't be scheduled here as well?

Contributor Author:

I believe the log above explains it? Or do you want to include "workers not starting" warnings here? (That requires changing the semantics of this method, because it "warns on resource deadlock", which doesn't include worker startup failures, imo.)

Contributor Author:

Hmm, maybe we should warn for both: 1. resource deadlock, and 2. workers not starting properly.


WORKER_NOT_FOUND_RATE_LIMITED,
};

/// Work represents all the information needed to make a scheduling decision.
Contributor:

Now that we are here, does it make sense to add comments describing all the possible states of a "Work"?

Also, we might want to find a better name for "Work" (not in this PR).

Contributor Author (Oct 27, 2021):

Also, we might want to find a better name for "Work" (not in this PR)

+1

Now that we are here, does it make sense to add comments describing all the possible states of a "Work"?

isn't this already kind of described in the enum?

@rkooo567 rkooo567 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Oct 27, 2021
@rkooo567 rkooo567 removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Oct 27, 2021
@fishbone (Contributor):

My only concern is that without DEFINE_stats, it's not easy to observe for a cluster; basically you need to dump everything and check each raylet. Also, you will lose the time-series data, which might be useful for debugging.
But if you think it fits your use case, which I don't have much context on, I'm OK with this.



} else if (status == PopWorkerStatus::WorkerPendingRegistration) {
cause = internal::UnscheduledWorkCause::WORKER_NOT_FOUND_REGISTRATION_TIMEOUT;
} else {
RAY_LOG(FATAL) << "Unexpected state received for the empty pop worker.";
Contributor:

will this be too aggressive?

Contributor Author:

Imo, this is fine, because receiving other states without updating this code path is a regression (or a bug). (But let me know if you have an alternative!)
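The defensive pattern being discussed here — map every known status to a cause and fail loudly on anything else — can be sketched as follows. This is a hypothetical Python analogue of the C++ code; the status and cause names are taken from the snippets in this thread, while the mapping itself is illustrative:

```python
class PopWorkerStatus:
    """Stand-in for the C++ PopWorkerStatus values seen in the diff."""
    JobConfigMissing = "JobConfigMissing"
    WorkerPendingRegistration = "WorkerPendingRegistration"
    TooManyStartingWorkers = "TooManyStartingWorkers"  # assumed name


_CAUSE_BY_STATUS = {
    PopWorkerStatus.JobConfigMissing: "WORKER_NOT_FOUND_JOB_CONFIG_NOT_EXIST",
    PopWorkerStatus.WorkerPendingRegistration: "WORKER_NOT_FOUND_REGISTRATION_TIMEOUT",
    PopWorkerStatus.TooManyStartingWorkers: "WORKER_NOT_FOUND_RATE_LIMITED",
}


def cause_for(status):
    """Translate a pop-worker failure status into an unscheduled-work cause.

    An unknown status means the caller changed without updating this mapping,
    so crash immediately (the RAY_LOG(FATAL) above) instead of mis-reporting
    the scheduling state.
    """
    try:
        return _CAUSE_BY_STATUS[status]
    except KeyError:
        raise AssertionError(
            f"Unexpected state received for the empty pop worker: {status}")
```

Crashing on the unknown case is the point of the FATAL: a silently wrong cause would corrupt exactly the observability counters this PR introduces.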

@scv119 scv119 added the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Oct 27, 2021
@rkooo567 (Contributor Author):

My only concern is that without DEFINE_stats, it's not easy to observe for a cluster; basically you need to dump everything and check each raylet. Also, you will lose the time-series data, which might be useful for debugging.

@iycheng To be clear, I am totally on the same page. My opinion here is just that since we always dump the debug string, we can call the Record method within that function (not every time the state is changed).

@rkooo567 (Contributor Author):

The unit tests / integration tests will be added in the follow-up PR.

@rkooo567 rkooo567 assigned edoakes and unassigned ericl Oct 28, 2021
@rkooo567 (Contributor Author):

@edoakes this PR will correctly report the "resource deadlock"

@rkooo567 (Contributor Author):

@edoakes can you provide me the test script that I can verify this?

@rkooo567 (Contributor Author):

cc @iycheng

I will do the following things in the follow-up next week.

  • More tests (as discussed with @scv119)
  • DEFINE_STATS
  • A test that proves it doesn't print this error msg upon runtime env failures (if I can get the repro quickly by tomorrow, I will just add it here).

@rkooo567 rkooo567 removed the @author-action-required The PR author is responsible for the next step. Remove tag to send back to the reviewer. label Oct 28, 2021
@ericl (Contributor) left a comment:

Proto change LGTM

@ericl ericl merged commit 96fc875 into ray-project:master Oct 28, 2021
edoakes (Collaborator) commented Oct 28, 2021

@rkooo567 sorry a bit late here, but you can try:

ray.init()

@ray.remote(runtime_env={"pip": ["tensorflow", "torch"]})
def f():
    pass

# Check no warning printed.
ray.get(f.remote())

rkooo567 (Contributor Author) commented Oct 28, 2021

@edoakes

This is the state while running that particular test, and it doesn't print any deadlock message. This is the expected behavior, right? (Btw, this task seems to run for a pretty long time... is that normal?)

num_waiting_for_resource: 0
num_waiting_for_plasma_memory: 0
num_waiting_for_remote_node_resources: 0
num_worker_not_started_by_job_config_not_exist: 0
num_worker_not_started_by_registration_timeout: 0
num_worker_not_started_by_process_rate_limit: 0
num_worker_waiting_for_workers: 1
num_cancelled_tasks: 0

EDIT

It finished without printing the spurious resource deadlock error, but I am seeing this.

(raylet) Traceback (most recent call last):
(raylet)   File "/Users/sangbincho/work/ray/python/ray/workers/default_worker.py", line 8, in <module>
(raylet)     import ray
(raylet) ModuleNotFoundError: No module named 'ray'

is it expected? or is it due to my setup?

@architkulkarni (Contributor):

Ah @rkooo567 can you try running with RAY_RUNTIME_ENV_LOCAL_DEV_MODE=1? You need that flag if you're using pip or conda in the runtime env with Ray built from source.

@rkooo567 (Contributor Author):

Turns out the log was buried at the top of very long conda log messages.


Development

Successfully merging this pull request may close these issues.

[Core][Bug] ray nodes get into a bad state and actor can't be scheduled
