
Type annotations for shuffle#7185

Merged: fjetter merged 8 commits into dask:main from fjetter:shuffle_extension_refactor on Oct 26, 2022



@fjetter fjetter commented Oct 25, 2022

This is mostly a type annotation refactor of the P2P shuffle. I made the handling of optional arguments more explicit, which required me to fix a minor bug; see the inline comments.


@fjetter fjetter left a comment


Apart from the inline comments, this does not include any functional changes. If there are any review comments, I'll address them in a follow-up PR unless it is something major, since I intend to continue building on this PR right away.

Comment on lines +347 to +366
@overload
async def _get_shuffle(
    self,
    shuffle_id: ShuffleId,
) -> Shuffle:
    ...

@overload
async def _get_shuffle(
    self,
    shuffle_id: ShuffleId,
    empty: pd.DataFrame,
    column: str,
    npartitions: int,
) -> Shuffle:
    ...

This kind of function must be called with either all of the optional arguments or none of them, hence the overloads.
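As a minimal sketch of the all-or-nothing pattern described above: two `@overload` signatures let a type checker reject calls that pass only some of the optional arguments, while the implementation still guards against that at runtime. The `ShuffleRegistry` class, its synchronous `get` method, and the tuple it stores are hypothetical stand-ins for illustration, not the code in this PR.

```python
from __future__ import annotations

from typing import overload


class ShuffleRegistry:
    """Hypothetical stand-in for the worker extension; not the dask code."""

    def __init__(self) -> None:
        self._shuffles: dict[str, tuple[str, int]] = {}

    # Overload 1: look up an existing entry, no creation arguments.
    @overload
    def get(self, shuffle_id: str) -> tuple[str, int]: ...

    # Overload 2: all creation arguments are supplied together.
    @overload
    def get(
        self, shuffle_id: str, column: str, npartitions: int
    ) -> tuple[str, int]: ...

    def get(
        self,
        shuffle_id: str,
        column: str | None = None,
        npartitions: int | None = None,
    ) -> tuple[str, int]:
        # Runtime guard mirroring what the overloads express statically.
        given = [arg is not None for arg in (column, npartitions)]
        if any(given) and not all(given):
            raise TypeError("pass both column and npartitions, or neither")
        if shuffle_id not in self._shuffles:
            if column is None or npartitions is None:
                raise KeyError(shuffle_id)
            self._shuffles[shuffle_id] = (column, npartitions)
        return self._shuffles[shuffle_id]
```

With these signatures, a call such as `registry.get("s1", "x")` matches neither overload, so a checker like mypy should flag it before the runtime guard is ever reached.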

  self.worker = worker
  self.output_workers = output_workers
- self.executor = executor
+ self.executor = ThreadPoolExecutor(worker.state.nthreads)

This slipped in. I wanted to clean up some abstraction leakage between Shuffle and WorkerExtension. I will follow up with a couple of other changes. This one is small enough that I didn't cherry-pick it out.
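One way to read the change above is that the object now creates and owns its thread pool instead of receiving one from its caller. The sketch below illustrates that ownership pattern only; `ShuffleSketch`, `offload`, and `close` are hypothetical names, and the real `Shuffle` class may manage its executor differently.

```python
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable


class ShuffleSketch:
    """Hypothetical stand-in; not the actual Shuffle class."""

    def __init__(self, nthreads: int) -> None:
        # The pool is created here rather than injected, so its lifetime
        # is tied to this object and callers need not know about it.
        self.executor = ThreadPoolExecutor(nthreads)

    def offload(self, func: Callable[..., Any], *args: Any) -> Future:
        # Run blocking work on the owned pool.
        return self.executor.submit(func, *args)

    def close(self) -> None:
        # The owner is also responsible for shutting the pool down.
        self.executor.shutdown(wait=True)
```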


fjetter commented Oct 25, 2022

Type annotations are not perfect yet, but this is more or less the minimum needed to enable the stricter configs. We can refine later on.

@fjetter fjetter force-pushed the shuffle_extension_refactor branch from 79b06db to 97d2293 Compare October 25, 2022 12:45

@jrbourbeau jrbourbeau left a comment


> Type annotations are not perfect yet, but this is more or less the minimum needed to enable the stricter configs. We can refine later on.

+1

cc @crusaderky -- maybe you'd find this PR interesting?


@crusaderky crusaderky left a comment


Looks mostly good to me. Some nitpicks below


github-actions bot commented Oct 25, 2022

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

Files: 15 (±0); suites: 15 (±0); duration: 6h 1m 8s (-33m 19s)
Tests: 3 156 (±0): 3 063 passed (-1), 87 skipped (+1), 6 failed (±0)
Runs: 23 349 (±0): 22 409 passed (-3), 919 skipped (+3), 21 failed (±0)

For more details on these failures, see this check.

Results for commit 6ae1bb4. ± Comparison against base commit 0983731.

♻️ This comment has been updated with latest results.


@wence- wence- left a comment


Minor



- def dump_batch(batch, file: BinaryIO, schema=None) -> None:  # type: ignore[no-untyped-def]
+ def dump_batch(batch: Any, file: BinaryIO, schema: pa.Schema) -> None:

I think the "tight" type annotation here is

if TYPE_CHECKING:
    from _typeshed import ReadableBuffer
...

def dump_batch(batch: ReadableBuffer, ...)


That does not work for me; mypy says it wants just bytes. Good enough for me either way.


Ah :(.
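For reference, the `TYPE_CHECKING` import pattern suggested in this thread can be sketched as follows. `write_payload` is a hypothetical helper, not `dump_batch`, and the `bytes(...)` conversion is only an assumption about one way to satisfy a checker that expects `bytes` for `BinaryIO.write`; note that it copies the data.

```python
from __future__ import annotations

import io
from typing import TYPE_CHECKING, BinaryIO

if TYPE_CHECKING:
    # _typeshed is a stub-only package: it is visible to type checkers
    # but cannot be imported at runtime, hence the guard.
    from _typeshed import ReadableBuffer


def write_payload(payload: ReadableBuffer, file: BinaryIO) -> None:
    """Hypothetical helper: write any buffer-like object to a binary file."""
    # Converting to bytes (a copy) keeps the write call typed as bytes.
    file.write(bytes(payload))


buffer = io.BytesIO()
write_payload(memoryview(b"abc"), buffer)
write_payload(bytearray(b"def"), buffer)
```

The postponed evaluation enabled by `from __future__ import annotations` is what allows `ReadableBuffer` to appear in the signature even though the name does not exist at runtime.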


  with self.lock:
-     address = max(self.sizes, key=self.sizes.get)
+     address = max(self.sizes, key=self.sizes.get)  # type: ignore

Suggested change
- address = max(self.sizes, key=self.sizes.get)  # type: ignore
+ address = max(self.sizes, key=self.sizes.__getitem__)

Note there's an extra bug here in that if self.sizes is empty, this raises ValueError.
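Both points in this suggestion can be illustrated with a small standalone sketch. `largest_key` is a hypothetical helper, not part of the PR: `dict.get` is typed as returning an optional value, which a checker cannot order, whereas `__getitem__` returns the value type directly; and `max` accepts a `default` for the empty case.

```python
from __future__ import annotations


def largest_key(sizes: dict[str, int]) -> str | None:
    """Return the key with the largest value, or None if `sizes` is empty."""
    # key=sizes.get would be typed as returning int | None, which is not
    # orderable for a type checker; sizes.__getitem__ returns int.
    # default=None avoids the ValueError that max() raises on empty input.
    return max(sizes, key=sizes.__getitem__, default=None)


sizes = {"tcp://a": 10, "tcp://b": 30}
busiest = largest_key(sizes)
nothing = largest_key({})
```

Whether a `default` is actually needed depends on the caller guaranteeing a non-empty mapping, which is the point raised in the reply below.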


Thanks. It's a bit confusing, but this code is only reached if self.sizes is non-empty. sizes and shards are coupled, and this loop checks that at the beginning and continues until it is filled.


fjetter commented Oct 26, 2022

There is a shuffle-related test failure in test_bad_disk. I can reproduce it locally, but only at a rate of about 0.1%.
I can also reproduce it on main, so I'll ignore it for now and fix it in a later iteration.

@fjetter fjetter merged commit a11dd02 into dask:main Oct 26, 2022
@fjetter fjetter deleted the shuffle_extension_refactor branch October 26, 2022 13:38
@fjetter fjetter mentioned this pull request Oct 26, 2022