Conversation
fjetter
left a comment
There was a problem hiding this comment.
Apart from the inline comments this does not include any functional changes. If there are any review comments, I'll address them in a follow up PR unless it is something major since I intend to continue with this PR right away.
| @overload | ||
| async def _get_shuffle( | ||
| self, | ||
| shuffle_id: ShuffleId, | ||
| ) -> Shuffle: | ||
| ... | ||
|
|
||
| @overload | ||
| async def _get_shuffle( | ||
| self, | ||
| shuffle_id: ShuffleId, | ||
| empty: pd.DataFrame, | ||
| column: str, | ||
| npartitions: int, | ||
| ) -> Shuffle: | ||
| ... |
There was a problem hiding this comment.
This kind of functions must either be called with all or nothing, hence the overload
| self.worker = worker | ||
| self.output_workers = output_workers | ||
| self.executor = executor | ||
| self.executor = ThreadPoolExecutor(worker.state.nthreads) |
There was a problem hiding this comment.
This slipped in. I wanted to clean up some abstraction leackage between Shuffle and WorkerExtension. Will follow up with a couple of other changes. This is small enough that I didn't cherry pick it
|
Type annotations are not perfect, yet, but this is more or less the minimal thing to enable the stricted configs. We can refine later on |
995f74a to
79b06db
Compare
79b06db to
97d2293
Compare
jrbourbeau
left a comment
There was a problem hiding this comment.
Type annotations are not perfect, yet, but this is more or less the minimal thing to enable the stricted configs. We can refine later on
+1
cc @crusaderky -- maybe you'd find this PR interesting?
crusaderky
left a comment
There was a problem hiding this comment.
Looks mostly good to me. Some nitpicks below
Unit Test ResultsSee test report for an extended history of previous test failures. This is useful for diagnosing flaky tests. 15 files ±0 15 suites ±0 6h 1m 8s ⏱️ - 33m 19s For more details on these failures, see this check. Results for commit 6ae1bb4. ± Comparison against base commit 0983731. ♻️ This comment has been updated with latest results. |
distributed/shuffle/arrow.py
Outdated
|
|
||
|
|
||
| def dump_batch(batch, file: BinaryIO, schema=None) -> None: # type: ignore[no-untyped-def] | ||
| def dump_batch(batch: Any, file: BinaryIO, schema: pa.Schema) -> None: |
There was a problem hiding this comment.
I think the "tight" type annotation here is
if TYPE_CHECKING:
from _typeshed import ReadableBuffer
...
def dump_batch(batch: ReadableBuffer, ...)
There was a problem hiding this comment.
That does not work for me. mypy says, it wants just bytes. Good enough for me either way
distributed/shuffle/multi_comm.py
Outdated
|
|
||
| with self.lock: | ||
| address = max(self.sizes, key=self.sizes.get) | ||
| address = max(self.sizes, key=self.sizes.get) # type: ignore |
There was a problem hiding this comment.
| address = max(self.sizes, key=self.sizes.get) # type: ignore | |
| address = max(self.sizes, key=self.sizes.__getitem__) |
Note there's an extra bug here in that if self.sizes is empty, this raises ValueError.
There was a problem hiding this comment.
Thanks. A bit confuse but this code is only reached if it is non-empty. sizes and shards are coupled and this loop checks this in the beginning and continues until this is filled.
|
There is a shuffle related test failure in |
This is mostly a type annotation refactor of p2p shuffle. I made dealing of optional arguments more explicit which required me to fix a minor bug, see inline comments