Skip to content

Commit a58a2a7

Browse files
P2P rechunking (#9939)
1 parent d178233 commit a58a2a7

5 files changed

Lines changed: 52 additions & 16 deletions

File tree

dask/array/core.py

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2747,7 +2747,12 @@ def squeeze(self, axis=None):
27472747
return squeeze(self, axis)
27482748

27492749
def rechunk(
2750-
self, chunks="auto", threshold=None, block_size_limit=None, balance=False
2750+
self,
2751+
chunks="auto",
2752+
threshold=None,
2753+
block_size_limit=None,
2754+
balance=False,
2755+
algorithm=None,
27512756
):
27522757
"""Convert blocks in dask array x for new chunks.
27532758
@@ -2759,7 +2764,7 @@ def rechunk(
27592764
"""
27602765
from dask.array.rechunk import rechunk # avoid circular import
27612766

2762-
return rechunk(self, chunks, threshold, block_size_limit, balance)
2767+
return rechunk(self, chunks, threshold, block_size_limit, balance, algorithm)
27632768

27642769
@property
27652770
def real(self):

dask/array/rechunk.py

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,14 @@ def intersect_chunks(old_chunks, new_chunks):
225225
return cross
226226

227227

228-
def rechunk(x, chunks="auto", threshold=None, block_size_limit=None, balance=False):
228+
def rechunk(
229+
x,
230+
chunks="auto",
231+
threshold=None,
232+
block_size_limit=None,
233+
balance=False,
234+
algorithm=None,
235+
):
229236
"""
230237
Convert blocks in dask array x for new chunks.
231238
@@ -249,6 +256,9 @@ def rechunk(x, chunks="auto", threshold=None, block_size_limit=None, balance=Fal
249256
This means ``balance=True`` will remove any small leftover chunks, so
250257
using ``x.rechunk(chunks=len(x) // N, balance=True)``
251258
will almost certainly result in ``N`` chunks.
259+
algorithm: {'tasks', 'p2p'}, optional.
260+
Algorithm to use.
261+
252262
253263
Examples
254264
--------
@@ -314,13 +324,24 @@ def rechunk(x, chunks="auto", threshold=None, block_size_limit=None, balance=Fal
314324
if new != old and not math.isnan(old) and not math.isnan(new):
315325
raise ValueError("Provided chunks are not consistent with shape")
316326

317-
steps = plan_rechunk(
318-
x.chunks, chunks, x.dtype.itemsize, threshold, block_size_limit
319-
)
320-
for c in steps:
321-
x = _compute_rechunk(x, c)
327+
algorithm = algorithm or config.get("array.rechunk.algorithm")
322328

323-
return x
329+
if algorithm == "tasks":
330+
steps = plan_rechunk(
331+
x.chunks, chunks, x.dtype.itemsize, threshold, block_size_limit
332+
)
333+
for c in steps:
334+
x = _compute_rechunk(x, c)
335+
336+
return x
337+
338+
elif algorithm == "p2p":
339+
from distributed.shuffle import rechunk_p2p
340+
341+
return rechunk_p2p(x, chunks)
342+
343+
else:
344+
raise NotImplementedError(f"Unknown rechunking algorithm '{algorithm}'")
324345

325346

326347
def _number_of_blocks(chunks):
@@ -542,15 +563,13 @@ def plan_rechunk(
542563
if isinstance(block_size_limit, str):
543564
block_size_limit = parse_bytes(block_size_limit)
544565

545-
ndim = len(new_chunks)
546-
steps = []
547-
has_nans = [any(math.isnan(y) for y in x) for x in old_chunks]
566+
has_nans = (any(math.isnan(y) for y in x) for x in old_chunks)
548567

549-
if ndim <= 1 or not all(new_chunks) or any(has_nans):
568+
if len(new_chunks) <= 1 or not all(new_chunks) or any(has_nans):
550569
# Trivial array / unknown dim => no need / ability for an intermediate
551-
return steps + [new_chunks]
570+
return [new_chunks]
552571

553-
# Make it a number ef elements
572+
# Make it a number of elements
554573
block_size_limit /= itemsize
555574

556575
# Fix block_size_limit if too small for either old_chunks or new_chunks
@@ -565,6 +584,7 @@ def plan_rechunk(
565584

566585
current_chunks = old_chunks
567586
first_pass = True
587+
steps = []
568588

569589
while True:
570590
graph_size = estimate_graph_size(current_chunks, new_chunks)

dask/dask-schema.yaml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -97,6 +97,15 @@ properties:
9797
Backend to use for supported array-creation functions.
9898
Default is "numpy".
9999
100+
rechunk:
101+
type: object
102+
properties:
103+
algorithm:
104+
type: string
105+
description: |
106+
The algorithm to use for rechunking. Must be either "tasks" or "p2p";
107+
default is "tasks". Using "p2p" requires a distributed cluster.
108+
100109
svg:
101110
type: object
102111
properties:

dask/dask.yaml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ dataframe:
1717

1818
array:
1919
backend: "numpy" # Backend array library for input IO and data creation
20+
rechunk:
21+
algorithm: "tasks" # Rechunking algorithm to use
2022
svg:
2123
size: 120 # pixels
2224
slicing:

dask/dataframe/groupby.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ def _determine_shuffle(shuffle, split_out):
115115
# For more context, see
116116
# https://github.com/dask/dask/pull/9826/files#r1072395307
117117
# https://github.com/dask/distributed/issues/5502
118-
return shuffle or config.get("shuffle", None) or "tasks"
118+
return config.get("shuffle", None) or "tasks"
119119
else:
120120
return False
121121
return shuffle

0 commit comments

Comments
 (0)