
Implement pass HighLevelGraphs through _graph_to_futures #4139

Merged
jrbourbeau merged 8 commits into dask:master from madsbk:hlg_through_graph_to_futures
Oct 20, 2020

Conversation

@madsbk (Contributor) commented Oct 1, 2020

This PR implements #4119 by using the map methods introduced in dask/dask#6689
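For readers unfamiliar with the feature: a HighLevelGraph stores a task graph as named layers plus the dependencies between layers, and passing it through `_graph_to_futures` avoids materializing every individual task on the client before submission. A dependency-free sketch of the idea, using plain dicts as stand-ins for the real `dask.highlevelgraph.HighLevelGraph` class (names and shapes here are illustrative, not this PR's code):

```python
# Simplified stand-in for a HighLevelGraph: layers of {key: task},
# plus which layers depend on which.

def inc(x):
    return x + 1

layers = {
    "ones": {("x", i): 1 for i in range(3)},
    "inc":  {("y", i): (inc, ("x", i)) for i in range(3)},
}
layer_deps = {"ones": set(), "inc": {"ones"}}

def flatten(layers):
    # Materializing every layer into one flat dict is what the old
    # code path effectively required; keeping layers intact lets the
    # client send a much more compact representation.
    flat = {}
    for tasks in layers.values():
        flat.update(tasks)
    return flat

flat = flatten(layers)
print(len(flat))  # 6 keys once materialized
```

The compact layered form only pays off if the scheduler-facing path can consume it directly, which is what routing HighLevelGraphs through `_graph_to_futures` enables.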

[Question]

What is the policy when a Distributed PR depends on a new Dask feature? Should the PR include a fallback implementation for when the installed Dask is older than Distributed? Or do we assume that Dask and Distributed are the same version?

Notice

CI will fail until dask/dask#6689 has been merged.

@mrocklin (Member) commented Oct 2, 2020

> What is the policy when a Distributed PR depends on a new Dask feature? Should the PR include a fallback implementation for when the installed Dask is older than Distributed? Or do we assume that Dask and Distributed are the same version?

If we can build in some flexibility that's nice, but there have also been releases where we have strict pinnings. It's ok for us to do that if necessary.
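The flexibility mentioned here usually takes the form of a version gate: use the new code path when the installed Dask is new enough, otherwise fall back. A hedged sketch of that pattern with a dependency-free version comparison (the function names and the `"2.30.0"` floor are illustrative, not what this PR or the release actually used):

```python
# Illustrative version-gate pattern for "new Dask feature, old Dask
# fallback". Not this PR's code; names and the floor version are
# assumptions for the sake of the example.

def version_tuple(v):
    # "2.30.0" -> (2, 30, 0); stops at the first non-numeric part,
    # so pre-release suffixes are ignored rather than crashing.
    parts = []
    for p in v.split("."):
        if p.isdigit():
            parts.append(int(p))
        else:
            break
    return tuple(parts)

def supports_hlg(dask_version, minimum="2.30.0"):
    # In real code dask_version would come from dask.__version__.
    return version_tuple(dask_version) >= version_tuple(minimum)

print(supports_hlg("2.30.0"))  # True
print(supports_hlg("2.25.1"))  # False
```

In practice a strict pin in the package metadata, as mentioned above, is the simpler alternative when a fallback path would be costly to maintain.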

@mrocklin (Member) commented Oct 2, 2020

Thanks for writing this up. Two pieces of follow-on work seem evident:

  1. We should think about computing dependencies on the scheduler
  2. We should think about computing order / priorities on the scheduler
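Both follow-on items are about moving work off the client. For context, a minimal sketch of what each computation involves on a flat task graph; `dask.core.get_dependencies` and `dask.order.order` do this for real, and the simplified versions below are illustrative only:

```python
# Illustrative only: simplified dependency extraction and priority
# assignment for a dict-of-tasks graph. The real implementations live
# in dask.core.get_dependencies and dask.order.order.

def inc(x):
    return x + 1

dsk = {
    "a": 1,
    "b": (inc, "a"),
    "c": (inc, "b"),
}

def get_deps(dsk, task):
    # Recursively collect graph keys referenced inside a task tuple.
    if isinstance(task, tuple):
        deps = set()
        for part in task[1:]:
            deps |= get_deps(dsk, part)
        return deps
    if task in dsk:  # a bare key reference
        return {task}
    return set()

deps = {k: get_deps(dsk, v) for k, v in dsk.items()}
print(deps)  # {'a': set(), 'b': {'a'}, 'c': {'b'}}

def priorities(deps):
    # Depth-first topological numbering: lower number = run earlier.
    order, seen = {}, set()
    def visit(k):
        if k in seen:
            return
        seen.add(k)
        for d in deps[k]:
            visit(d)
        order[k] = len(order)
    for k in deps:
        visit(k)
    return order

print(priorities(deps))  # {'a': 0, 'b': 1, 'c': 2}
```

Doing these on the scheduler rather than the client would remove a per-submission cost that grows with graph size.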

@jrbourbeau (Member)

Restarting CI here since dask/dask#6689 has been merged

@jrbourbeau (Member)

Looking into CI failures now. Interestingly, some tests (like distributed/tests/test_client.py::test_get_sync) don't fail when run individually but do fail when running the full test suite. Other tests like distributed/dashboard/tests/test_scheduler_bokeh.py::test_compute_per_key consistently fail individually with

E           RuntimeError: Cycle detected between the following keys:
E             ->
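The error above comes from dask's cycle check on the task graph. For context, a minimal sketch of how such a cycle can be detected in a key-to-dependencies mapping using depth-first search with three-color marking (illustrative, not dask's actual implementation):

```python
# Illustrative cycle detection over {key: [dependency keys]}.
# Not dask's implementation; a standard DFS back-edge check.

def find_cycle(deps):
    WHITE, GREY, BLACK = 0, 1, 2  # unvisited / in progress / done
    color = {k: WHITE for k in deps}

    def visit(k, path):
        color[k] = GREY
        for d in deps.get(k, ()):
            if color.get(d) == GREY:
                return path + [d]  # back edge: cycle found
            if color.get(d) == WHITE:
                found = visit(d, path + [d])
                if found:
                    return found
        color[k] = BLACK
        return None

    for k in deps:
        if color[k] == WHITE:
            cycle = visit(k, [k])
            if cycle:
                return cycle
    return None

print(find_cycle({"a": ["b"], "b": ["a"]}))  # ['a', 'b', 'a']
print(find_cycle({"a": ["b"], "b": []}))     # None
```

A spurious cycle report like the one above typically indicates that graph construction produced keys that reference each other, which is why the fix landed on the dask side (dask/dask#6747 below).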

@madsbk (Contributor, Author) commented Oct 19, 2020

dask/dask#6747 hopefully fixes this issue.

@jrbourbeau (Member)

Restarting CI here since dask/dask#6747 has been merged

@madsbk (Contributor, Author) commented Oct 20, 2020

I cannot reproduce the CI errors (test_broken_worker_during_computation) on my own machine, and I don't think they have anything to do with this PR.
The PR doesn't seem to introduce any significant overhead either when running the following benchmark:

from distributed import Client, LocalCluster
from dask.dataframe.shuffle import shuffle
from dask.distributed import wait
import dask.dataframe as dd
import dask
import time
import argparse
import pandas as pd


def create_empty_df(npartitions):
    # Build an empty single-column dask DataFrame with the requested
    # number of partitions.
    d = dask.delayed(pd.DataFrame)
    return dd.from_delayed([d({"a": []}) for _ in range(npartitions)])


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("npartitions", type=int)
    parser.add_argument("backend", choices=["tasks", "dynamic-tasks"])
    # --no-optimize disables graph optimization before submission
    parser.add_argument("--no-optimize", dest="optimize", action="store_false")
    parser.add_argument("--max-branch", type=int, default=None)
    args = parser.parse_args()

    cluster = LocalCluster(n_workers=1, threads_per_worker=1)
    client = Client(cluster)

    df = create_empty_df(args.npartitions)
    t1 = time.time()
    df = shuffle(df, "a", shuffle=args.backend, max_branch=args.max_branch)
    df = df.persist(optimize_graph=args.optimize)
    t2 = time.time()
    print(f"persist: {t2 - t1}")
    wait(df)
    t3 = time.time()
    print(f"persist: {t2 - t1}, compute: {t3 - t2}")

    client.close()

@jrbourbeau, any thoughts?

@jrbourbeau (Member)

Yeah, I think the test_broken_worker_during_computation failure is unrelated to the changes here, as I've seen it pop up in other PRs. I've opened up #4173 to track the failure.

This PR also relies on recent changes in Dask (e.g. HighLevelGraph.map_tasks), so I'm going to bump the minimum supported Dask version. Ping @dask/maintenance if there are any objections to bumping our Dask version here (this was the PR I brought up on our call earlier today).
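In practice, bumping the minimum supported version means tightening the pin in the package metadata. A sketch of what that looks like in a setup.py; the floor version shown is a placeholder, not the value chosen for this PR:

```python
# setup.py fragment (illustrative only; "2.X.0" is a placeholder,
# not the actual minimum chosen here)
setup(
    ...,
    install_requires=[
        "dask >= 2.X.0",  # needs HighLevelGraph.map_tasks and friends
        ...,
    ],
)
```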

@jrbourbeau (Member) left a review comment


Thanks @madsbk! This is in

@jrbourbeau jrbourbeau merged commit dd28f7a into dask:master Oct 20, 2020
@madsbk madsbk deleted the hlg_through_graph_to_futures branch October 21, 2020 07:50
pentschev added a commit to pentschev/distributed that referenced this pull request Oct 23, 2020
sonicxml added a commit to sonicxml/distributed that referenced this pull request Dec 3, 2020
