data_needed exclusively contains tasks in fetch state #6481
crusaderky merged 5 commits into dask:main
Conversation
Force-pushed from 63014ad to a032754
    # Ensure maxsize is respected
    l["d"] = 4
    assert len(l) == 3
    assert list(l.keys()) == ["c", "a", "d"]
Moved from test_utils.py
    await fut2
These tests don't make sense anymore. Also, one of them directly tampers with the state, which is a big no-no.
    @gen_cluster(client=True)
    async def test_gather_dep_exception_one_task(c, s, a, b):
        """Ensure an exception in a single task does not tear down an entire batch of gather_dep
This is misleading.
This test was testing resilience to an exception in the transitions of a single task after gather_dep, which should be dealt with through @fail_hard.
A legitimate exception in a single key of the bundle in gather_dep, namely a task that fails to unpickle, does make the whole gather_dep fail for all tasks. There's no code whatsoever to deal with this use case.
Force-pushed from 3ee77db to b6f5467
Force-pushed from b6f5467 to 74640b8
This PR also removes a leak condition where a TaskState object remains in data_needed_per_worker after it's been forgotten.

Ready for final review and merge @fjetter
    elif ts not in recommendations:
        ts.who_has.discard(worker)
        self.has_what[worker].discard(ts.key)
        self.data_needed_per_worker[worker].discard(ts)
How could it even be in here? Shouldn't it have been removed when the task transitioned to flight?
No, there could have been a transition to fetch in the meantime.
Force-pushed from 5e1c229 to 23e6698
All of @gjoseph92's comments have been addressed.
fjetter left a comment
I just peeked over it and would trust @gjoseph92 with a more thorough review. I'm looking forward to this
                return value
            heapq.heappop(self._heap)

    def pop(self) -> T:
While the Heap prefix of the class already implies some non-constant operation, I wouldn't mind documenting the actual complexity since this can differ for heaps. However, I think the stdlib doesn't document this properly either and I'm ok with skipping this.
peek() is O(1) if you treat the bit that calls heappop as delayed housekeeping - e.g. you account for it in discard().
I think the amortized time for both peek and discard is constant. I don't think we should dive into deep algorithm complexity analysis here, though :)
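The "delayed housekeeping" idea discussed above can be sketched as a heap-backed set with lazy deletion (an illustrative reimplementation with assumed semantics, not the exact class under review): discard only removes from the set and leaves a stale heap entry behind; peek/pop skip stale entries, so each heap entry is pushed and popped at most once over the structure's lifetime, which is what makes the cost amortized.

```python
import heapq

class LazyHeapSet:
    """Sketch of a heap-backed set with lazy deletion.

    discard() is O(1): it only touches the set, leaving a stale heap
    entry behind. peek()/pop() drop stale entries as they surface, so
    that cleanup cost is amortized over the earlier discards.
    """

    def __init__(self):
        self._data = set()
        self._heap = []  # (priority, insert_order, value); may hold stale entries
        self._counter = 0  # tiebreaker so values are never compared

    def add(self, priority, value):
        if value in self._data:
            return
        self._data.add(value)
        self._counter += 1
        heapq.heappush(self._heap, (priority, self._counter, value))

    def discard(self, value):
        self._data.discard(value)  # stale heap entry cleaned up lazily

    def peek(self):
        # Delayed housekeeping: pop entries whose value was discarded.
        while self._heap[0][2] not in self._data:
            heapq.heappop(self._heap)
        return self._heap[0][2]

    def pop(self):
        value = self.peek()
        self._data.remove(value)
        heapq.heappop(self._heap)
        return value
```

For example, after `add(1, "a")`, `add(2, "b")`, `discard("a")`, a call to `peek()` pops the stale `"a"` entry and returns `"b"`.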
            break
        tasks.pop()
        deps.add(ts.key)
        self.data_needed.remove(ts)
        if ts.state == "fetch":
            self.data_needed_per_worker[worker].remove(ts)
Suggested change:
    - if ts.state == "fetch":
    -     self.data_needed_per_worker[worker].remove(ts)
    + self.data_needed_per_worker[worker].discard(ts)
I'd much rather fail loudly if for any reason the assumption that fetch state and existence in data_needed are inextricably bound together fails
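The fail-loudly argument rests on a standard Python distinction (the `data_needed` set here is a stand-in for illustration, not the real worker state): `set.remove` raises `KeyError` when the element is absent, while `set.discard` is silent. Keeping `remove` encodes the invariant "a task in fetch state is always present in its per-worker collection", so a violation surfaces immediately instead of being papered over.

```python
# Illustrative stand-in for the per-worker collection of tasks to fetch.
data_needed = {"task-1"}

data_needed.remove("task-1")       # ok: the element was present
try:
    data_needed.remove("task-1")   # a second removal violates the invariant
except KeyError:
    print("invariant violation surfaced")  # remove fails loudly

data_needed.add("task-2")
data_needed.discard("task-2")
data_needed.discard("task-2")      # silent: no signal that anything was wrong
```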