Implement pass HighLevelGraphs through _graph_to_futures #4139

jrbourbeau merged 8 commits into dask:master from hlg_through_graph_to_futures
Conversation
If we can build in some flexibility that's nice, but there have also been releases where we have strict pinnings. It's ok for us to do that if necessary.
Thanks for writing this up. Two pieces of follow-on work seem evident.
Restarting CI here since dask/dask#6689 has been merged.
Looking into CI failures now. Interestingly, some tests (like …)
dask/dask#6747 hopefully fixes this issue.
Restarting CI here since dask/dask#6747 has been merged.
I cannot reproduce the CI errors (using the script below):

```python
import argparse
import time

import dask
import dask.dataframe as dd
import pandas as pd
from dask.dataframe.shuffle import shuffle
from dask.distributed import Client, LocalCluster, wait


def create_empty_df(npartitions):
    # Build a Dask DataFrame with `npartitions` empty single-column partitions
    d = dask.delayed(pd.DataFrame)
    return dd.from_delayed([d({"a": []}) for _ in range(npartitions)])


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("npartitions", type=int)
    parser.add_argument("backend", choices=["tasks", "dynamic-tasks"])
    parser.add_argument("--no-optimize", action="store_false")
    parser.add_argument("--max-branch", type=int, default=None)
    args = parser.parse_args()

    cluster = LocalCluster(n_workers=1, threads_per_worker=1)
    client = Client(cluster)

    df = create_empty_df(args.npartitions)
    t1 = time.time()
    df = shuffle(df, "a", shuffle=args.backend, max_branch=args.max_branch)
    df = df.persist(optimize_graph=args.no_optimize)
    t2 = time.time()
    print(f"persist: {t2 - t1}")
    wait(df)
    t3 = time.time()
    print(f"persist: {t2 - t1}, compute: {t3 - t2}")
    client.close()
```

@jrbourbeau, any thoughts?
Yeah, I think the …

This PR also relies on recent changes in Dask (e.g. …)
jrbourbeau left a comment:
Thanks @madsbk! This is in …
This PR implements #4119 by using the map methods introduced in dask/dask#6689
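For context, a HighLevelGraph is a layered wrapper around an ordinary task dict. A minimal hand-built sketch (the layer names, keys, and tasks here are made up purely for illustration):

```python
from dask.highlevelgraph import HighLevelGraph

# Two hand-written layers: "a" produces data, "b" depends on "a".
# Layer and key names are illustrative only.
inc = lambda x: x + 1
layer_a = {("a", i): i for i in range(3)}
layer_b = {("b", i): (inc, ("a", i)) for i in range(3)}

hlg = HighLevelGraph(
    layers={"a": layer_a, "b": layer_b},
    dependencies={"a": set(), "b": {"a"}},
)

# The layered structure is preserved for the scheduler...
assert set(hlg.layers) == {"a", "b"}
assert hlg.dependencies["b"] == {"a"}
# ...while the object still acts as a flat mapping of all six tasks.
assert len(dict(hlg)) == 6
```

Passing this layered form through `_graph_to_futures` (rather than flattening it client-side) is what lets the per-layer map methods from dask/dask#6689 be applied before materialization.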
[Question]
What is the policy when a Distributed PR depends on a new Dask feature? Should the PR include a fallback implementation for when the installed Dask version is older than Distributed? Or do we assume that Dask and Distributed are the same version?
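One common pattern for such a fallback is to gate the new code path on the installed Dask version. The sketch below is hypothetical: the version threshold and the two submit helpers are placeholders for illustration, not actual Distributed internals.

```python
from packaging.version import parse

import dask

# Hypothetical threshold -- not the actual release that added the feature.
DASK_SUPPORTS_HLG = parse(dask.__version__) >= parse("2.30.0")


def submit_highlevelgraph(graph):
    # Placeholder for a HighLevelGraph-aware submission path.
    return "hlg"


def submit_flat_graph(graph):
    # Placeholder for the legacy flat-dict submission path.
    return "flat"


def graph_to_futures(graph):
    # Dispatch on the Dask version detected at import time.
    if DASK_SUPPORTS_HLG:
        return submit_highlevelgraph(graph)
    return submit_flat_graph(dict(graph))
```

With a guard like this, Distributed keeps working against an older Dask, at the cost of maintaining the legacy path until the minimum supported Dask version catches up.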
[Notice]
CI will fail until dask/dask#6689 has been merged.