Skip to content

Commit 579b191

Browse files
authored
Reverse precedence between collection and distributed default (#9869)
1 parent eb5d9d0 commit 579b191

2 files changed

Lines changed: 62 additions & 5 deletions

File tree

dask/base.py

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1345,7 +1345,8 @@ def get_scheduler(get=None, scheduler=None, collections=None, cls=None):
13451345
13461346
1. Passing in scheduler= parameters
13471347
2. Passing these into global configuration
1348-
3. Using defaults of a dask collection
1348+
3. Using a dask.distributed default Client
1349+
4. Using defaults of a dask collection
13491350
13501351
This function centralizes the logic to determine the right scheduler to use
13511352
from those many options
@@ -1407,16 +1408,16 @@ def get_scheduler(get=None, scheduler=None, collections=None, cls=None):
14071408
if config.get("get", None):
14081409
raise ValueError(get_err_msg)
14091410

1410-
if cls is not None:
1411-
return cls.__dask_scheduler__
1412-
14131411
try:
14141412
from distributed import get_client
14151413

14161414
return get_client().get
14171415
except (ImportError, ValueError):
14181416
pass
14191417

1418+
if cls is not None:
1419+
return cls.__dask_scheduler__
1420+
14201421
if collections:
14211422
collections = [c for c in collections if c is not None]
14221423
if collections:

dask/tests/test_distributed.py

Lines changed: 57 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import dask
2323
import dask.bag as db
2424
from dask import compute, delayed, persist
25-
from dask.base import get_scheduler
25+
from dask.base import compute_as_if_collection, get_scheduler
2626
from dask.blockwise import Blockwise
2727
from dask.delayed import Delayed
2828
from dask.distributed import futures_of, wait
@@ -148,6 +148,62 @@ def test_fused_blockwise_dataframe_merge(c, fuse):
148148
)
149149

150150

@pytest.mark.parametrize(
    "computation",
    [
        None,
        "compute_as_if_collection",
        "dask.compute",
    ],
)
@pytest.mark.parametrize(
    "scheduler, use_distributed",
    [
        (None, True),
        # If scheduler is explicitly provided, this takes precedence
        ("sync", False),
    ],
)
def test_default_scheduler_on_worker(c, computation, use_distributed, scheduler):
    """Should a collection use its default scheduler or the distributed
    scheduler when being computed within a task?

    With ``scheduler=None`` the distributed client active on the worker is
    expected to win (two update-graph events reach the scheduler: the outer
    submit plus the nested compute). With an explicit ``scheduler="sync"``
    the explicit choice takes precedence and only the outer submit reaches
    the scheduler.
    """
    pd = pytest.importorskip("pandas")
    dd = pytest.importorskip("dask.dataframe")

    def foo():
        size = 10
        df = pd.DataFrame({"x": range(size), "y": range(size)})
        ddf = dd.from_pandas(df, npartitions=2)
        # Exercise each public compute entry point under test.
        if computation is None:
            ddf.compute(scheduler=scheduler)
        elif computation == "dask.compute":
            dask.compute(ddf, scheduler=scheduler)
        elif computation == "compute_as_if_collection":
            compute_as_if_collection(
                ddf.__class__, ddf.dask, list(ddf.dask), scheduler=scheduler
            )
        else:
            assert False

        return True

    res = c.submit(foo)
    assert res.result() is True
    # Count how many submits/update-graph were received by the scheduler.
    # BUG FIX: the original wrote ``assert X == 2 if use_distributed else 1``,
    # which parses as ``(X == 2) if use_distributed else 1`` — when
    # ``use_distributed`` is False it degenerates to ``assert 1`` and can
    # never fail. Parenthesize so the expected count is selected first.
    assert c.run_on_scheduler(
        lambda dask_scheduler: sum(
            len(comp.code) for comp in dask_scheduler.computations
        )
    ) == (2 if use_distributed else 1)

151207
def test_futures_to_delayed_bag(c):
152208
L = [1, 2, 3]
153209

0 commit comments

Comments
 (0)