@@ -225,7 +225,14 @@ def intersect_chunks(old_chunks, new_chunks):
225225 return cross
226226
227227
228- def rechunk (x , chunks = "auto" , threshold = None , block_size_limit = None , balance = False ):
228+ def rechunk (
229+ x ,
230+ chunks = "auto" ,
231+ threshold = None ,
232+ block_size_limit = None ,
233+ balance = False ,
234+ algorithm = None ,
235+ ):
229236 """
230237 Convert blocks in dask array x for new chunks.
231238
@@ -249,6 +256,9 @@ def rechunk(x, chunks="auto", threshold=None, block_size_limit=None, balance=Fal
249256 This means ``balance=True`` will remove any small leftover chunks, so
250257 using ``x.rechunk(chunks=len(x) // N, balance=True)``
251258 will almost certainly result in ``N`` chunks.
259+ algorithm: {'tasks', 'p2p'}, optional
260+ Rechunking algorithm to use. Defaults to the
261+ ``array.rechunk.algorithm`` config value when not given.
252262
253263 Examples
254264 --------
@@ -314,13 +324,24 @@ def rechunk(x, chunks="auto", threshold=None, block_size_limit=None, balance=Fal
314324 if new != old and not math .isnan (old ) and not math .isnan (new ):
315325 raise ValueError ("Provided chunks are not consistent with shape" )
316326
317- steps = plan_rechunk (
318- x .chunks , chunks , x .dtype .itemsize , threshold , block_size_limit
319- )
320- for c in steps :
321- x = _compute_rechunk (x , c )
327+ algorithm = algorithm or config .get ("array.rechunk.algorithm" )
322328
323- return x
329+ if algorithm == "tasks" :
330+ steps = plan_rechunk (
331+ x .chunks , chunks , x .dtype .itemsize , threshold , block_size_limit
332+ )
333+ for c in steps :
334+ x = _compute_rechunk (x , c )
335+
336+ return x
337+
338+ elif algorithm == "p2p" :
339+ from distributed .shuffle import rechunk_p2p
340+
341+ return rechunk_p2p (x , chunks )
342+
343+ else :
344+ raise NotImplementedError (f"Unknown rechunking algorithm '{ algorithm } '" )
324345
325346
326347def _number_of_blocks (chunks ):
@@ -542,15 +563,13 @@ def plan_rechunk(
542563 if isinstance (block_size_limit , str ):
543564 block_size_limit = parse_bytes (block_size_limit )
544565
545- ndim = len (new_chunks )
546- steps = []
547- has_nans = [any (math .isnan (y ) for y in x ) for x in old_chunks ]
566+ has_nans = (any (math .isnan (y ) for y in x ) for x in old_chunks )
548567
549- if ndim <= 1 or not all (new_chunks ) or any (has_nans ):
568+ if len ( new_chunks ) <= 1 or not all (new_chunks ) or any (has_nans ):
550569 # Trivial array / unknown dim => no need / ability for an intermediate
551- return steps + [new_chunks ]
570+ return [new_chunks ]
552571
553- # Make it a number ef elements
572+ # Make it a number of elements
554573 block_size_limit /= itemsize
555574
556575 # Fix block_size_limit if too small for either old_chunks or new_chunks
@@ -565,6 +584,7 @@ def plan_rechunk(
565584
566585 current_chunks = old_chunks
567586 first_pass = True
587+ steps = []
568588
569589 while True :
570590 graph_size = estimate_graph_size (current_chunks , new_chunks )
0 commit comments