|
22 | 22 | import dask |
23 | 23 | import dask.bag as db |
24 | 24 | from dask import compute, delayed, persist |
25 | | -from dask.base import get_scheduler |
| 25 | +from dask.base import compute_as_if_collection, get_scheduler |
26 | 26 | from dask.blockwise import Blockwise |
27 | 27 | from dask.delayed import Delayed |
28 | 28 | from dask.distributed import futures_of, wait |
@@ -148,6 +148,62 @@ def test_fused_blockwise_dataframe_merge(c, fuse): |
148 | 148 | ) |
149 | 149 |
|
150 | 150 |
|
@pytest.mark.parametrize(
    "computation",
    [
        None,
        "compute_as_if_collection",
        "dask.compute",
    ],
)
@pytest.mark.parametrize(
    "scheduler, use_distributed",
    [
        (None, True),
        # If scheduler is explicitly provided, this takes precedence
        ("sync", False),
    ],
)
def test_default_scheduler_on_worker(c, computation, use_distributed, scheduler):
    """Should a collection use its default scheduler or the distributed
    scheduler when being computed within a task?

    A small dask DataFrame is built and computed inside a function that is
    submitted through the client ``c``. ``computation`` selects the entry
    point used for the nested compute (``ddf.compute``, ``dask.compute`` or
    ``compute_as_if_collection``) and ``scheduler`` is forwarded to it.
    ``use_distributed`` states whether the nested compute is expected to be
    registered with the distributed scheduler as well.
    """

    pd = pytest.importorskip("pandas")
    dd = pytest.importorskip("dask.dataframe")

    def foo():
        size = 10
        df = pd.DataFrame({"x": range(size), "y": range(size)})
        ddf = dd.from_pandas(df, npartitions=2)
        if computation is None:
            ddf.compute(scheduler=scheduler)
        elif computation == "dask.compute":
            dask.compute(ddf, scheduler=scheduler)
        elif computation == "compute_as_if_collection":
            # Every key of the collection's graph is requested here, not
            # only the output keys.
            compute_as_if_collection(
                ddf.__class__, ddf.dask, list(ddf.dask), scheduler=scheduler
            )
        else:
            # Explicit raise instead of ``assert False`` so the guard is
            # not stripped under ``python -O``.
            raise AssertionError(f"Unexpected computation: {computation!r}")

        return True

    res = c.submit(foo)
    assert res.result() is True

    # Count how many submits/update-graph were received by the scheduler
    n_received = c.run_on_scheduler(
        lambda dask_scheduler: sum(
            len(comp.code) for comp in dask_scheduler.computations
        )
    )
    # The expected value is computed first on purpose: writing
    # ``assert n == 2 if use_distributed else 1`` parses as
    # ``(n == 2) if use_distributed else 1`` and therefore always passes
    # when ``use_distributed`` is False.
    expected = 2 if use_distributed else 1
    assert n_received == expected
| 205 | + |
| 206 | + |
151 | 207 | def test_futures_to_delayed_bag(c): |
152 | 208 | L = [1, 2, 3] |
153 | 209 |
|
|
0 commit comments