Skip to content

[Prototype] Task Generator for Shuffle#6173

Closed
madsbk wants to merge 3 commits intodask:masterfrom
madsbk:task_generator
Closed

[Prototype] Task Generator for Shuffle#6173
madsbk wants to merge 3 commits intodask:masterfrom
madsbk:task_generator

Conversation

@madsbk
Copy link
Contributor

@madsbk madsbk commented May 4, 2020

Warning, this is a prototype and very hacky!

This PR, together with dask/distributed#3765, address the scheduler overhead #6163 by replacing rearrange_by_column_tasks() with a task generator.

Instead of creating n**2 tasks or n log n tasks when staging on the client upfront, this PR makes it possible to delay tasks creations until later.

To enable the task generator set USE_TASK_GENERATOR=1

Preliminary Results

Shuffle with 536554 tasks (no staging) now takes ~6 seconds instead of ~60 seconds on my machine.

from distributed import Client, LocalCluster
from dask.datasets import timeseries
from dask.dataframe.shuffle import shuffle
from dask.distributed import wait
import time

if __name__ == "__main__":
    cluster = LocalCluster(nthreads=1)
    client = Client(cluster)

    ddf_d = timeseries(start='2000-01-01', end='2002-01-01', partition_freq='1d')
    ddf_d_2 = shuffle(ddf_d, "id", shuffle="tasks", max_branch=10**8)  # disable staging

    t1 = time.time()
    ddf_d_2 = ddf_d_2.persist(optimize_graph=False)
    t2 = time.time()
    print("persist returned: ", t2-t1)
    wait(ddf_d_2)
    t3 = time.time()
    print("wait returned: ", t3-t2)
    client.close()

@TomAugspurger
Copy link
Member

Would you be able to give a high-level summary of what this is doing? An element of your task graph is this All2All class, which when "materialized" represents the actual task graph to do the shuffle? Are you familiar with Blockwise (

dask/dask/blockwise.py

Lines 125 to 133 in 666b53a

class Blockwise(Mapping):
""" Tensor Operation
This is a lazily constructed mapping for tensor operation graphs.
This defines a dictionary using an operation and an indexing pattern.
It is built for many operations like elementwise, transpose, tensordot, and
so on. We choose to keep these as symbolic mappings rather than raw
dictionaries because we are able to fuse them during optimization,
sometimes resulting in much lower overhead.
) and SubgraphCallable? On the surface, these sound somewhat similar, but I might misunderstand.

However I think one notable difference is that Blockwise graphs are materialized as plain dicts before being sent to the scheduler. Your changes in dask/distributed#3765 would delay the materialization until the graph reaches the scheduler?

@madsbk madsbk force-pushed the task_generator branch from 3021287 to 6f12e85 Compare May 4, 2020 17:28
@madsbk
Copy link
Contributor Author

madsbk commented May 5, 2020

Thanks @TomAugspurger, I did look at Blockwise and SubgraphCallable but I will study them in more detail; you are right they look similar. I think we can generalize all of them into to a delayed task generation API.
I will collect my thoughts and write an issue.

@madsbk madsbk force-pushed the task_generator branch from 91bace1 to 93804b3 Compare May 5, 2020 08:47
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants