Ensure steal requests from same-IP but distinct workers are rejected #6585
Conversation
Unit Test Results — See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

15 files ±0  15 suites ±0  7h 29m 35s ⏱️ +12m 45s

For more details on these failures and errors, see this check. Results for commit b20ada6. ± Comparison against base commit e6cc40a.

♻️ This comment has been updated with latest results.

I missed the comment about the worker closing again in #6263 (comment)
distributed/stealing.py (outdated)

```python
def remove_worker(self, scheduler: Scheduler, worker: str):
    del self.stealable[worker]
```
FWIW I thought about modifying / tracking removed workers and matching them up with in_flight. That led me towards a "remember cancellation" mechanism which was way too complicated.
What about something like (with appropriate KeyError handling)

```python
levels = self.stealable.pop(worker)
for level_i, level_tasks in enumerate(levels):
    for ts in level_tasks:
        self.stealable_all[level_i].remove(ts)
        self.key_stealable.pop(ts, None)
        self.in_flight.pop(ts)  # I don't know??

# Maybe??:
self.in_flight_occupancy.pop(worker)  # this may cause KeyErrors in other places, need to handle
if not self.in_flight:
    self.in_flight_occupancy.clear()
    self._in_flight_event.set()
```

I'm just copying from
distributed/stealing.py lines 193 to 206 in cb88e3b
and mixing in
distributed/stealing.py lines 166 to 177 in cb88e3b
(but I'm not sure if the second part is necessary, because it would happen anyway when remove_worker transitions any tasks processing on the worker to released.)
I don't know if this is necessary or not.
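For reference, a KeyError-tolerant variant of that sketch could look like the following (hypothetical; attribute names are taken from the snippet above, and whether the in-flight part belongs here is exactly what this thread is unsure about):

```python
def remove_worker(self, scheduler: Scheduler, worker: str) -> None:
    # Drop the per-worker stealable levels; tolerate workers we never tracked.
    levels = self.stealable.pop(worker, None)
    if levels is not None:
        for level_i, level_tasks in enumerate(levels):
            for ts in level_tasks:
                self.stealable_all[level_i].discard(ts)
                self.key_stealable.pop(ts, None)
                self.in_flight.pop(ts, None)

    # Maybe: reconcile in-flight bookkeeping for the removed worker.
    self.in_flight_occupancy.pop(worker, None)
    if not self.in_flight:
        self.in_flight_occupancy.clear()
        self._in_flight_event.set()
```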
I tried something very similar and ended up deleting it again because of complexity.
Okay. I'm a little hesitant about not clearing out the state properly, but I guess it's ok.
See also #6600. I would like to clarify the discussion on that ticket before we implement any more complex logic for work stealing.
There are failing tests

I wonder why the FutureWarning didn't fail for me locally. Is there anything I need to enable for this to cause a failure?
gjoseph92 left a comment
It's probably less important, but could the metrics get thrown off here by stale workers?
distributed/stealing.py lines 170 to 177 in cb88e3b
distributed/tests/test_steal.py (outdated)

```python
while len_before == len(s.events["stealing"]):
    await asyncio.sleep(0.1)

assert victim_ts.processing_on != wsB
```
What should it be processing on? Because wsA confirmed the steal request, so wsA has released the task. Stealing called Scheduler.reschedule which transitioned it to released:
distributed/scheduler.py lines 6561 to 6579 in b59a322
So I think I'd expect something like assert victim_ts.state == "released". So if reschedule goes through a full transitions cycle, you'd actually expect it to go processing->released->waiting->processing and be assigned to a worker (probably wsB2 since it's idle).
I guess I'd expect something like assert victim_ts.processing_on in (wsA, wsB2) and assert victim_ts.state == "processing". We want to be sure we don't forget about this task, since that would cause a deadlock too.
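Spelled out in the polling style of the test above, that expectation might read as follows (a sketch reusing the names from this thread, simplified in that it assumes the task doesn't finish before we observe it):

```python
# Wait until the rescheduled task is processing again, then check that it
# landed on a live worker (wsA kept it, or the idle wsB2 picked it up).
while victim_ts.state != "processing":
    await asyncio.sleep(0.01)
assert victim_ts.processing_on in (wsA, wsB2)
```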
I'm asserting that it's not the stale WorkerState. Everything else is beyond the scope of this unit test. We'll reschedule the task and the scheduler can make a decision. I don't care as long as I get my result, i.e. the futures complete.
I prefer not asserting too much here. From a high level, I don't even care at this point that we're going through full rescheduling.
As long as
I removed
If any other cosmetic changes pop up, feel free to push on this branch so we can merge this asap |
gjoseph92 left a comment
I don't have permission to push to your branch. Could you add:
```diff
diff --git a/distributed/stealing.py b/distributed/stealing.py
index 2c78704c..30b99175 100644
--- a/distributed/stealing.py
+++ b/distributed/stealing.py
@@ -332,12 +332,12 @@ class WorkStealing(SchedulerPlugin):
             state in _WORKER_STATE_UNDEFINED
             # If our steal information is somehow stale we need to reschedule
             or state in _WORKER_STATE_CONFIRM
-            and thief != self.scheduler.workers.get(thief.address)
+            and thief is not self.scheduler.workers.get(thief.address)
         ):
             self.log(
                 (
                     "reschedule",
-                    thief.address not in self.scheduler.workers,
+                    thief is not self.scheduler.workers.get(thief.address),
                     *_log_msg,
                 )
             )
```
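The `!=` → `is not` change matters because of the equality semantics discussed further down: if `__eq__` treats two `WorkerState` objects as equal whenever they hash alike (e.g. by address), a stale reference compares equal to its replacement and the staleness check silently passes. A contrived illustration (hypothetical `WS` class, not the real `WorkerState`):

```python
class WS:
    def __init__(self, address: str):
        self.address = address

    def __hash__(self) -> int:
        return hash(self.address)

    def __eq__(self, other: object) -> bool:
        # Equality by hash, mirroring the pattern criticized below
        return isinstance(other, WS) and hash(other) == hash(self)


stale = WS("tcp://10.0.0.1:1234")  # reference kept by the stealing extension
fresh = WS("tcp://10.0.0.1:1234")  # new worker on the same IP and port

assert stale == fresh      # address-based equality hides the staleness
assert stale is not fresh  # identity comparison detects it
```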
It seems you have some tests that are actually failing,
```python
def __eq__(self, other: object) -> bool:
    if not isinstance(other, WorkerState):
        return False
    return hash(self) == hash(other)
```
#6593 instead makes it explicit, and explains the reasoning behind the decision. I think we should agree on one or the other and be consistent.
```python
def __hash__(self) -> int:
    # TODO explain
    return id(self)

def __eq__(self, other: object) -> bool:
    # TODO explain
    return other is self
```
#6593 adds to worker_state_machine.WorkerState.validate_state a check that you can never have two instances of a TaskState with the same key in any of its sets. I think scheduler.SchedulerState.validate_state should have the same logic.
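A minimal sketch of such a check (hypothetical helper; the actual validate_state logic in #6593 may differ):

```python
def assert_unique_task_state_per_key(task_state_sets) -> None:
    # Any two TaskState objects sharing a key must be the same instance.
    seen: dict[str, int] = {}
    for ts_set in task_state_sets:
        for ts in ts_set:
            first = seen.setdefault(ts.key, id(ts))
            assert first == id(ts), f"duplicate TaskState for key {ts.key!r}"
```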
I'm not entirely convinced that this is necessary. There are many collections and this particular problem was not even a collection on the scheduler itself but rather an extension.
Specifically, without https://github.com/dask/distributed/pull/6585/files#r899011537 this condition would not even be true, and I don't think we should implement the full remove_worker cleanup on the stealing extension.
The failing test_feed is actually interesting since it performs a cloudpickle roundtrip and asserts that WorkerStates are round-trippable, i.e. they compare equal after the roundtrip. If we go for a compare-by-Python-ID approach, this clearly no longer works. I would even go as far as to say that if we compare by Python ID we should not allow any serialization of the object. By extension, the same would be true for TaskState objects. Right now, we're not actually relying on this being serializable. The only place where we're serializing this right now is when we're raising a
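The tension is easy to reproduce in isolation (a contrived stand-in class, not the real WorkerState):

```python
import cloudpickle


class IdentityEq:
    # Compares by identity, like the approach discussed in this thread.
    def __eq__(self, other: object) -> bool:
        return other is self

    def __hash__(self) -> int:
        return id(self)


obj = IdentityEq()
roundtripped = cloudpickle.loads(cloudpickle.dumps(obj))
assert roundtripped != obj  # a round-trip yields a new instance, never "equal"
```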
(force-pushed from 5ba8296 to ba31256)
I opted for a different approach and am now using the Worker.id / Server.id attribute to uniquely identify the WorkerState object. This is based on a type 4 UUID, so I have sufficient confidence in its uniqueness. The scheduler is verifying that addresses are not overwritten, so the
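For context, a sketch of roughly how such an id is generated (an assumption based on the "type 4 UUID" description above; the actual attribute is set up in distributed.core.Server):

```python
import uuid

# Each server generates a random (version 4) UUID once at startup, so the id
# survives address reuse: a new worker on the same IP/port gets a new id.
server_id = f"Worker-{uuid.uuid4()}"
```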
I kinda dislike the UUID. There simply never should be multiple

```diff
diff --git a/distributed/tests/test_scheduler.py b/distributed/tests/test_scheduler.py
index 82d40fd6..75bcd297 100644
--- a/distributed/tests/test_scheduler.py
+++ b/distributed/tests/test_scheduler.py
@@ -423,14 +423,14 @@ async def test_scheduler_init_pulls_blocked_handlers_from_config(s):
 @gen_cluster()
 async def test_feed(s, a, b):
     def func(scheduler):
-        return dumps(dict(scheduler.workers))
+        return dumps(list(scheduler.workers))

     comm = await connect(s.address)
     await comm.write({"op": "feed", "function": dumps(func), "interval": 0.01})

     for i in range(5):
         response = await comm.read()
-        expected = dict(s.workers)
+        expected = list(s.workers)
         assert cloudpickle.loads(response) == expected

     await comm.close()
```
(force-pushed from 818d207 to fa090b6)
I consider the UUID useful beyond this deadlock fix. It is the only entity that can uniquely identify a server across the entire cluster. It can be logged, it can be shared, and it is meaningful outside of the scheduler process. This is also how we're identifying clients inside and outside of the scheduler. Adding the

I would like us to focus on fixing this issue now and not escalate into whether or not certain objects are serializable or round-trippable. This question popped up a couple of times recently and there doesn't appear to be proper consensus, e.g.
I don't want this conversation to block the fix of this issue. We can still come back and remove
I dislike the UUID for TaskStates. It's a fairly expensive function (3µs on my beefed-up host), which would make it unsavory for

I think I would be happy to just state "thou cannot round-trip a TaskState from scheduler to worker and back". On the other hand, I think that a full pickle dump of the
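For what it's worth, that per-call cost is easy to ballpark (a micro-benchmark sketch; numbers vary by host):

```python
import timeit
import uuid

n = 100_000
secs = timeit.timeit(uuid.uuid4, number=n)
print(f"{secs / n * 1e6:.2f} µs per uuid4()")  # a few µs is typical
```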
The current code is prone to hash collisions. Change

```python
def __eq__(self, other: object) -> bool:
    if not isinstance(other, WorkerState):
        return False
    return hash(self) == hash(other)
```

to

```python
def __eq__(self, other: object) -> bool:
    return isinstance(other, WorkerState) and other.server_id == self.server_id
```

If you put this one change in, I'm happy to sign off the PR.
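Presumably __hash__ then follows server_id as well, so that equal objects keep equal hashes (a sketch, not necessarily the exact code this PR merged):

```python
def __hash__(self) -> int:
    # Must agree with __eq__: equal server_id implies equal hash.
    return hash(self.server_id)
```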
Yes. This is a hard no for TaskStates. I'm simply using an already-generated UUID in this case. I would like to keep this ID as part of the WorkerState even if we decide to forbid any kind of serialization and we start comparing by IDs.
Done. While I'm not overly concerned about hash collisions, it is indeed not a great idea to reuse hash for equality since, AFAIK, dicts use equality to resolve hash collisions...
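That detail is easy to verify: dicts fall back to __eq__ once hashes collide, so an __eq__ that itself compares hashes will collapse distinct keys (contrived example):

```python
class Colliding:
    def __init__(self, name: str):
        self.name = name

    def __hash__(self) -> int:
        return 1  # force every instance into the same hash bucket

    def __eq__(self, other: object) -> bool:
        # equality by hash, as in the replaced code
        return isinstance(other, Colliding) and hash(other) == hash(self)


d = {Colliding("a"): 1}
d[Colliding("b")] = 2  # collides, compares "equal", and overwrites "a"
assert len(d) == 1
```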
Closes #6356
Closes #6198
Closes #6263
#6356 describes a situation where a worker may die and a new worker with the same IP would connect to the scheduler. This could mess up our stealing logic since the WorkerState objects we're referencing there would reference the wrong worker, i.e. state between the scheduler and stealing extension would drift.
With the test in this draft PR I could prove that this is indeed the case. However, so far nothing bad happens. Upon task completion, the scheduler would issue an "Unexpected worker completed task" message and send a cancel-compute event to the worker. The worker would ignore this event since the task is in state memory.

Before I fix the state drift I want to poke at this further since this is supposedly responsible for a couple of deadlocks. I would like to confirm that this is the only trigger for these deadlocks and there is nothing else going on.
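Schematically, the drift boils down to two WorkerState objects for one address (a self-contained sketch with a plain dict standing in for Scheduler.workers):

```python
class WorkerState:
    def __init__(self, address: str):
        self.address = address


workers: dict[str, WorkerState] = {}
addr = "tcp://10.0.0.1:1234"

workers[addr] = WorkerState(addr)
stale_ref = workers[addr]  # e.g. held by the stealing extension

# The worker dies and a new one reconnects from the same IP/port:
del workers[addr]
workers[addr] = WorkerState(addr)

assert stale_ref is not workers[addr]  # the extension's reference is now stale
```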
This could definitely explain how a dead worker is shown on the dashboard (#6198), even if the deadlock was unrelated.
FWIW I was always suspicious why this "Unexpected worker completed task" message was necessary and struggled to reproduce it. This finally sheds some light onto it, and I actually hope that we can get rid of this message and therefore the cancel-compute event entirely.