
Resolve stale TaskStates having colliding hashes #7528

Open
gjoseph92 wants to merge 3 commits into dask:main from gjoseph92:taskstate-no-hash

@gjoseph92
Collaborator

Closes #7504, closes #7510

  • Tests added / passed
  • Passes pre-commit run --all-files

gjoseph92 requested a review from fjetter on February 9, 2023 03:11
gjoseph92 self-assigned this on Feb 9, 2023
# Even without this assertion, if the old `f-1` and new `f-1` had colliding hashes,
# the compute would fail when the old `f-1` tries to transition
# forgotten->processing.
assert ts1 not in s.queued
Member

This is the only place that actually raises an assertion but this is not user visible. Can you please write the test in a way that shows a wrong behavior without asserting on internals?

For most data structures it would actually be perfectly fine to verify that the popped task is still valid. The fact that this causes problems in the heap is due to its hash-based deduplication.
This test builds on many assumptions and I doubt it would survive a refactor.
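
A minimal sketch of what "hash-based deduplication" implies here. Both classes are hypothetical toys, not distributed's `TaskState`; the assumption is only that one of them hashes and compares by key while the other keeps Python's default identity semantics.

```python
class KeyHashedTask:
    """Toy task whose equality and hash depend only on its key (assumption)."""

    def __init__(self, key, state):
        self.key = key
        self.state = state

    def __eq__(self, other):
        return isinstance(other, KeyHashedTask) and other.key == self.key

    def __hash__(self):
        return hash(self.key)


class IdentityTask:
    """Toy task that keeps Python's default identity-based __eq__ and __hash__."""

    def __init__(self, key, state):
        self.key = key
        self.state = state


# Two distinct objects that share a key: one stale, one current.
stale = KeyHashedTask("f-1", "forgotten")
fresh = KeyHashedTask("f-1", "queued")

# A set that only contains the fresh task still reports the stale one as a member.
stale_looks_live = stale in {fresh}

# With identity semantics, the stale object is not mistaken for the fresh one.
stale_id = IdentityTask("f-1", "forgotten")
fresh_id = IdentityTask("f-1", "queued")
identity_stale_looks_live = stale_id in {fresh_id}
```

With key-based hashing, a membership check cannot tell the stale object from its replacement, so a "is the popped task still valid?" check passes when it should not.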

Collaborator Author

As noted in the comment, the test still fails even without this assertion:

Traceback (most recent call last):
  File "/Users/gabe/dev/distributed/distributed/utils.py", line 741, in wrapper
    return await func(*args, **kwargs)
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 4231, in add_worker
    await self.handle_worker(comm, address)
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 5470, in handle_worker
    await self.handle_stream(comm=comm, extra={"worker": worker})
  File "/Users/gabe/dev/distributed/distributed/core.py", line 904, in handle_stream
    handler(**merge(extra, msg))
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 5336, in handle_task_finished
    self.stimulus_queue_slots_maybe_opened(stimulus_id=stimulus_id)
  File "/Users/gabe/dev/distributed/distributed/scheduler.py", line 4617, in stimulus_queue_slots_maybe_opened
    assert qts.state == "queued", qts.state
AssertionError: forgotten

I added the assertion just to make it easier to read what the problem was. I could remove it though.

> This test builds on many assumptions and I doubt it would survive a refactor.

Agreed. This is a very specific test for #7504, which is only possible in rare cases. I'm not sure how to test for this specific case without writing something like this, though.

If we want a more general test for #7510, that could be as simple as

ts1 = TaskState("ts", None, "released")
ts2 = TaskState("ts", None, "released")
assert ts1 != ts2
assert hash(ts1) != hash(ts2)

Or something where we construct a HeapSet and manually add and remove tasks from it, to more directly simulate the underlying problem in #7504?
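
One way such a manual add-and-remove simulation could look, as a rough sketch. `ToyHeapSet` and `KeyHashedTask` are hypothetical stand-ins written for illustration; they are not distributed's `HeapSet` or `TaskState`, and the lazy-deletion design is an assumption about the general shape of the problem.

```python
import heapq
import itertools


class KeyHashedTask:
    """Toy task that hashes and compares by key only (assumption)."""

    def __init__(self, key, priority, state="queued"):
        self.key = key
        self.priority = priority
        self.state = state

    def __eq__(self, other):
        return isinstance(other, KeyHashedTask) and other.key == self.key

    def __hash__(self):
        return hash(self.key)


class ToyHeapSet:
    """Heap with set-based membership and lazy deletion (simplified, hypothetical)."""

    def __init__(self):
        self._members = set()
        self._heap = []
        self._counter = itertools.count()  # tie-breaker so tasks are never compared

    def add(self, task):
        if task in self._members:
            return
        self._members.add(task)
        heapq.heappush(self._heap, (task.priority, next(self._counter), task))

    def discard(self, task):
        # Lazy deletion: the heap entry stays behind until it is popped.
        self._members.discard(task)

    def pop(self):
        while self._heap:
            _, _, task = heapq.heappop(self._heap)
            if task in self._members:  # hash/eq-based validity check
                self._members.discard(task)
                return task
        raise KeyError("pop from an empty ToyHeapSet")


queue = ToyHeapSet()

old = KeyHashedTask("f-1", priority=1)
queue.add(old)
queue.discard(old)       # the task is dropped while still on the heap
old.state = "forgotten"

new = KeyHashedTask("f-1", priority=1)  # same key, equal priority
queue.add(new)

popped = queue.pop()     # the stale entry is reached first and passes the check
```

In this toy, `popped` is the stale `old` object rather than `new`, because the membership check matches on the shared key. Giving the task class identity-based `__eq__` and `__hash__` makes the stale entry fail the check and be skipped.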

Member

> Agreed. This is a very specific test for #7504, which is only possible in rare cases. I'm not sure how to test for this specific case without writing something like this, though.

We've had many of these rare situations when dealing with worker state issues, and we found ways to provoke them. We managed to write code that reproduced the issues using only the high-level API, and we added some low-level tests as well.
Why is this any different?

Isn't this something like:

fut = client.submit(something_thatll_be_queued_and_raises, key="foo")
assert s.tasks['foo'] in s.queued  # Test assumption
fut = client.submit(something_thatll_be_queued_that_works, key="foo")
await fut

Collaborator Author

> Isn't this something like:

No, that's a different case. foo is not forgotten while on the queue. The second client.submit will just re-use the existing foo TaskState object.

> Why is this any different?

It's not any different. Causing #7504 requires these specific things to occur:

  • A task is forgotten while on the queue, but not all queued tasks are forgotten
  • A new task is submitted with the same key and equal or greater priority, and that task is queued
  • The old TaskState object is not GC'd throughout this process
  • The task is transitioned to processing

The test I've written here only uses the high-level API to cause this sequence of events. Use of wait_for_state or holding a reference to a TaskState is added to ensure the test is deterministic. I could write something shorter that doesn't look at internals at all, but it would probably be flaky.
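
A small sketch of why holding a reference matters for determinism. It assumes, for illustration only, that a queue entry refers to its task through a weak reference; the `Task` class and `stale_entry_survives` helper are hypothetical.

```python
import gc
import weakref


class Task:
    """Toy task object; stands in for whatever the queue entry points at."""

    def __init__(self, key):
        self.key = key


def stale_entry_survives(hold_reference):
    """Return True if a weakly referenced task is still alive after being dropped."""
    task = Task("f-1")
    entry = weakref.ref(task)  # what a queue entry might store (assumption)
    keeper = task if hold_reference else None  # e.g. a test holding on to the object
    del task
    gc.collect()  # make collection explicit rather than relying on timing
    return entry() is not None


held = stale_entry_survives(hold_reference=True)
dropped = stale_entry_survives(hold_reference=False)
```

When something else keeps the old object alive, the stale entry can still be dereferenced; when nothing does, the entry goes dead. A test that does not pin the old object depends on when collection happens, which is one reason a shorter variant could be flaky.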

As I said, we can add more generic tests for #7510. Is that what you're looking for?

I'd also be perfectly happy with removing this test for #7504 if we think it's too specialized / hard to maintain.

Collaborator Author

I've added an explicit test for #7510, and made the test here shorter.

Mostly I just removed some comments and switched to using delayed, but it's still the same test. If you have an idea for a more concise way to cause that sequence of events, I'd welcome it—I haven't been able to come up with anything.

I also don't think it's worth that much of our time—if you're worried about the maintainability of the test, then let's just remove test_queued_resubmitted_key_collision and just keep test_TaskState_hash_eq_by_identity.

@github-actions
Contributor

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

    24 files   ±0       24 suites ±0     10h 43m 37s ⏱️ +18m 43s
 3 339 tests   +4    3 231 ✔️ ±0       103 💤 ±0    5 failed +4
39 366 runs    +48  37 491 ✔️ +43    1 869 💤 ±0    6 failed +5

For more details on these failures, see this check.

Results for commit 7d466ef. ± Comparison against base commit be3a62e.

2 participants