Unit Test Results
See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.
24 files ±0   24 suites ±0   11h 25m 10s ⏱️ +39m 48s
For more details on these failures, see this check.
Results for commit d554c62. ± Comparison against base commit f9306da.
♻️ This comment has been updated with latest results.
fjetter
left a comment
There are a couple of nits but overall looks good.
```python
out: dict[str, list[tuple[ArrayRechunkShardID, bytes]]] = defaultdict(list)
for id, nslice in self._slicing[input_partition]:
    out[self.worker_for[id.new_index]].append(
        (id, pickle.dumps((id.sub_index, data[nslice])))
```
nit: It's not entirely clear why the pickle.dumps includes the sub_index if the first tuple entry is already the full index. Is this intentional?
I've extended the docstring above. Does that help?
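To illustrate the split being discussed, here is a toy sketch of the routing logic, with a simplified stand-in for `ArrayRechunkShardID` and made-up worker addresses (all names in this sketch are illustrative, not the actual implementation):

```python
import pickle
from collections import defaultdict
from dataclasses import dataclass

NIndex = tuple[int, ...]

# Simplified stand-in for the shard ID shown in the diff above
@dataclass(frozen=True)
class ArrayRechunkShardID:
    new_index: NIndex  # which output chunk the shard belongs to
    sub_index: NIndex  # position of the shard within that output chunk

# Hypothetical mapping from output-chunk index to worker address
worker_for = {(0, 0): "tcp://worker-a", (0, 1): "tcp://worker-b"}

out = defaultdict(list)
for id_, payload in [
    (ArrayRechunkShardID((0, 0), (0, 0)), b"shard-data-1"),
    (ArrayRechunkShardID((0, 1), (1, 0)), b"shard-data-2"),
]:
    # The full ID routes the shard to a worker; only sub_index travels in the
    # pickled payload, since the receiver already knows which output chunk
    # (new_index) the incoming batch belongs to.
    out[worker_for[id_.new_index]].append(
        (id_, pickle.dumps((id_.sub_index, payload)))
    )

sub_index, data = pickle.loads(out["tcp://worker-a"][0][1])
print(sub_index, data)  # (0, 0) b'shard-data-1'
```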
distributed/shuffle/_rechunk.py
Outdated
```python
#: Index of the new chunk the shard belongs to
new_index: NIndex
#: Sub-index of the shard within the new chunk
sub_index: NIndex
```
Can you elaborate on what "new" index and "sub_index" are? Can this be expressed with a simple example?
I assume new_index corresponds to the "partition"
I've tried clearing things up with additional docs and better (?) naming.
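As a hedged example of the distinction (the data and indices here are made up): rechunking one axis from `((2, 2),)` to `((4,),)` produces two shards that both land in output chunk `(0,)` (their `new_index`) but at different positions inside it (their `sub_index`):

```python
# Old chunking along one axis: ((2, 2),) -> new chunking: ((4,),)
old_chunks = {(0,): [0, 1], (1,): [2, 3]}

# Each shard carries two indices:
#   new_index: which output chunk it contributes to -> (0,) for both shards here
#   sub_index: where the shard sits *inside* that output chunk
shards = {
    ((0,), (0,)): old_chunks[(0,)],  # (new_index, sub_index): data
    ((0,), (1,)): old_chunks[(1,)],
}

# The receiver rebuilds output chunk (0,) by ordering its shards on sub_index
ordered = sorted((sub, data) for (new, sub), data in shards.items() if new == (0,))
new_chunk = [x for _, data in ordered for x in data]
print(new_chunk)  # [0, 1, 2, 3]
```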
```python
with self.time("cpu"):
    arr = convert_chunk(data, subdims)
```
Suggested change:
```diff
- with self.time("cpu"):
-     arr = convert_chunk(data, subdims)
+ arr = await self.offload(convert_chunk, data, subdims)
```
Pretty sure this should be offloaded. You even instrument it with CPU time, so this is likely nontrivial.
Good catch, I just noticed we don't do that on the P2P shuffle either (which is probably why I haven't done so here).
I'll improve offloading for both rechunking and shuffling in a follow-up PR.
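For context, the offloading pattern suggested above can be sketched with plain asyncio; `self.offload` in distributed wraps similar executor logic, and the `convert_chunk` stand-in below is purely illustrative:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

_executor = ThreadPoolExecutor(max_workers=1)

def convert_chunk(data, subdims):
    # Stand-in for the CPU-bound conversion; the real signature may differ.
    return (data, subdims)

async def receive(data, subdims):
    loop = asyncio.get_running_loop()
    # Run the CPU-bound work in a thread so the event loop stays responsive,
    # analogous to `arr = await self.offload(convert_chunk, data, subdims)`.
    return await loop.run_in_executor(_executor, convert_chunk, data, subdims)

print(asyncio.run(receive("chunk", (2, 2))))  # ('chunk', (2, 2))
```

Without this, a large chunk conversion blocks the worker's event loop for the whole duration measured under `time("cpu")`.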
```python
def get_worker_for_hash_sharding(output_partition: NIndex, workers: list[str]) -> str:
    """Get address of target worker for this output partition using hash sharding"""
    i = hash(output_partition) % len(workers)
    return workers[i]
```
Are output_partitions always symmetrical or can they vary significantly?
I assume this hash-mod mapping is a sufficiently uniform distribution but assuming the output chunks are asymmetric we can likely do better with a different mapping that creates a better output distribution.
Out of scope for this PR, just a question
Theoretically, output partitions can vary significantly, e.g., you could have a chunked array of ((1, 4), (64, 1024)), which would correspond to partitions of sizes [(1, 64), (1, 1024), (4, 64), (4, 1024)].
In practice, I'd assume that they are mostly homogeneous. Then again, I'm not an arrays person.
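The chunking example above can be expanded mechanically: output partition sizes are the cartesian product of per-axis chunk sizes, while the hash-mod mapping assigns partitions to workers uniformly by count, not by size (worker addresses here are placeholders):

```python
from itertools import product

# Chunking from the comment above: ((1, 4), (64, 1024))
chunks = ((1, 4), (64, 1024))

# Output partition sizes: cartesian product of per-axis chunk sizes
sizes = list(product(*chunks))
print(sizes)  # [(1, 64), (1, 1024), (4, 64), (4, 1024)]

# Hash-mod mapping: partitions are spread by index, blind to partition size
workers = ["tcp://a", "tcp://b"]
indices = list(product(*(range(len(c)) for c in chunks)))
assignment = {idx: workers[hash(idx) % len(workers)] for idx in indices}
```

This makes the asymmetry concrete: the `(4, 1024)` partition holds 64× the data of `(1, 64)`, yet both count equally in the mapping.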
Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
@hendrikmakait I pushed another commit to fix the linter issues that were introduced by my earlier suggestion.
Edit: Sorry, wasn't enough. I'll let you handle it :)
Moved to draft to avoid accidental merge before dask/dask#9939 has been merged and the requirements in this PR have been cleaned up.
CI goes red because I pushed dask/dask#9939 too late and CI still expects the old keyword. Waiting for dask/dask#9939 to push the commit that points to
CI seems to keep running on an outdated commit. The commit is from yesterday (dask/dask@970da68); does CI cache something here? If so, how do I invalidate those caches?
Blocked by and sibling to dask/dask#9939
Partially addresses #7507
`pre-commit run --all-files`