
P2P for array rechunking#7534

Merged
fjetter merged 77 commits into dask:main from hendrikmakait:rechunking
Feb 24, 2023
Conversation

@hendrikmakait
Member

@hendrikmakait hendrikmakait commented Feb 9, 2023

Blocked by and sibling to dask/dask#9939

Partially addresses #7507

  • Tests added / passed
  • Passes pre-commit run --all-files

@github-actions
Contributor

github-actions bot commented Feb 9, 2023

Unit Test Results

See test report for an extended history of previous test failures. This is useful for diagnosing flaky tests.

24 files ±0   24 suites ±0   11h 25m 10s ⏱️ +39m 48s
3 486 tests +74   3 382 ✔️ +75   103 💤 ±0   1 ❌ −1
40 962 runs +805   39 121 ✔️ +806   1 840 💤 +1   1 ❌ −2

For more details on these failures, see this check.

Results for commit d554c62. ± Comparison against base commit f9306da.

♻️ This comment has been updated with latest results.

Member

@fjetter fjetter left a comment

There are a couple of nits but overall looks good.

out: dict[str, list[tuple[ArrayRechunkShardID, bytes]]] = defaultdict(list)
for id, nslice in self._slicing[input_partition]:
    out[self.worker_for[id.new_index]].append(
        (id, pickle.dumps((id.sub_index, data[nslice])))
    )

Member

nit: It's not entirely clear why the pickle.dumps includes the sub_index if the first tuple entry is already the full index. Is this intentional?


Member Author

I've extended the docstring above. Does that help?

Comment on lines +108 to +111
#: Index of the new chunk the shard belongs to
new_index: NIndex
#: Sub-index of the shard within the new chunk
sub_index: NIndex

Member

Can you elaborate what "new" index and "sub_index" is? Can this be expressed with a simple example?

I assume new_index corresponds to the "partition"


Member Author

I've tried clearing things up with additional docs and better (?) naming.
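To make the two indices concrete, here is a minimal sketch. The class name mirrors the `ArrayRechunkShardID` from the diff above, but the values and the simplified definition are hypothetical, for illustration only:

```python
from dataclasses import dataclass

NIndex = tuple[int, ...]  # n-dimensional chunk index


@dataclass(frozen=True)
class ShardID:
    """Simplified stand-in for the shard ID discussed above."""

    #: Index of the output ("new") chunk this shard belongs to
    new_index: NIndex
    #: Position of the shard within that output chunk
    sub_index: NIndex


# One input chunk may contribute several shards to a single output chunk:
# this shard is the (0, 1)-th piece of output chunk (2, 3).
shard = ShardID(new_index=(2, 3), sub_index=(0, 1))
print(shard)
```

The `sub_index` is what lets the receiving worker reassemble the output chunk from shards that arrive in arbitrary order, which is also why it travels inside the pickled payload.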

Comment on lines +372 to +373
with self.time("cpu"):
arr = convert_chunk(data, subdims)

Member

Suggested change:
-        with self.time("cpu"):
-            arr = convert_chunk(data, subdims)
+        arr = await self.offload(convert_chunk, data, subdims)

Pretty sure this should be offloaded; you even instrument it with CPU time, so this is likely nontrivial.


Member Author

Good catch, I just noticed we don't do that on the P2P shuffle either (which is probably why I haven't done so here).


Member Author


I'll improve offloading for both rechunking and shuffling in a follow-up PR.
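The offloading pattern the reviewer suggests can be sketched roughly like this, using a plain thread-pool executor; the `offload` helper below is an assumption for illustration, and distributed's actual implementation may differ:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor

# Dedicated executor so CPU-bound work never blocks the event loop.
_executor = ThreadPoolExecutor(max_workers=1)


async def offload(func, *args):
    """Run a CPU-bound callable off the event loop (sketch of the pattern)."""
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(_executor, func, *args)


async def main():
    # Hypothetical CPU-heavy work standing in for convert_chunk(data, subdims)
    return await offload(sum, range(1_000_000))


print(asyncio.run(main()))
```

The point is that the event loop stays responsive to network traffic while the conversion runs, which matters when shards keep arriving during a large rechunk.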

Comment on lines +304 to +307
def get_worker_for_hash_sharding(output_partition: NIndex, workers: list[str]) -> str:
"""Get address of target worker for this output partition using hash sharding"""
i = hash(output_partition) % len(workers)
return workers[i]

Member

Are output_partitions always symmetrical or can they vary significantly?

I assume this hash-mod mapping is a sufficiently uniform distribution but assuming the output chunks are asymmetric we can likely do better with a different mapping that creates a better output distribution.

Out of scope for this PR, just a question
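The hash-mod mapping quoted above can be exercised standalone to see how partitions spread across workers. The worker addresses below are made up:

```python
from itertools import product


def get_worker_for_hash_sharding(output_partition, workers):
    """Hash-mod assignment, as in the function quoted above."""
    return workers[hash(output_partition) % len(workers)]


workers = ["tcp://a:8786", "tcp://b:8786", "tcp://c:8786"]
counts = {w: 0 for w in workers}
for idx in product(range(10), range(10)):  # 100 output partition indices
    counts[get_worker_for_hash_sharding(idx, workers)] += 1
print(counts)  # roughly uniform for int-tuple indices
```

Note this balances the *number* of partitions per worker, not their *sizes*, which is exactly the asymmetry concern raised here.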


Member Author


Theoretically, output partitions can vary significantly, e.g., you could have a chunked array of ((1, 4), (64, 1024)), which would correspond to partitions of sizes [(1, 64), (1, 1024), (4, 64), (4, 1024)].

In practice, I'd assume that they are mostly homogeneous. Then again, I'm not an arrays person.
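The partition sizes in this example can be reproduced with plain itertools, no dask required (`partition_sizes` is just a throwaway helper for illustration):

```python
from itertools import product


def partition_sizes(chunks):
    """Expand dask-style chunk tuples into per-partition shapes."""
    return list(product(*chunks))


chunks = ((1, 4), (64, 1024))
print(partition_sizes(chunks))
# [(1, 64), (1, 1024), (4, 64), (4, 1024)]
```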

Co-authored-by: Florian Jetter <fjetter@users.noreply.github.com>
@fjetter
Member

fjetter commented Feb 24, 2023

@hendrikmakait I pushed another commit to fix the linter issues that were introduced by my earlier suggestion

Edit: Sorry, wasn't enough. I'll let you handle it :)

@hendrikmakait hendrikmakait marked this pull request as draft February 24, 2023 14:26
@hendrikmakait
Member Author

Moved to draft to avoid accidental merge before dask/dask#9939 has been merged and the requirements in this PR have been cleaned up.

@fjetter fjetter mentioned this pull request Feb 24, 2023
@hendrikmakait
Member Author

CI goes red because I pushed dask/dask#9939 too late and CI still expects the old keyword. Waiting for dask/dask#9939 to push the commit that points to dask@main again and turns CI green. I'll then move this PR to Ready for Review. Tests work locally.

@hendrikmakait hendrikmakait marked this pull request as ready for review February 24, 2023 18:06
@hendrikmakait
Member Author

hendrikmakait commented Feb 24, 2023

CI seems to keep running on an outdated commit on dask/dask. https://github.com/dask/distributed/actions/runs/4265042270/jobs/7424015784#step:11:66

The commit is from yesterday (dask/dask@970da68), does CI cache something here? If so, how to invalidate those caches?
