Full traversal in dask.order#3066

Merged
mrocklin merged 9 commits into dask:master from mrocklin:order-up
Feb 1, 2018

Conversation

@mrocklin
Member

@mrocklin mrocklin commented Jan 14, 2018

  • Tests added / passed
  • Passes flake8 dask
  • Fully documented, including docs/source/changelog.rst for all changes
    and one of the docs/source/*-api.rst files for new API

@mrocklin
Member Author

So this test seems to conflict with the following situation:

    def test_avoid_upwards_branching(abcde):
        """
             a1
             |
             a2
             |
             a3    d1
            /  \  /
          b1    c1
          |     |
          b2    c2
                |
                c3
    
        Prefer b1 over c1 because it won't stick around waiting for d1 to complete
        """
        a, b, c, d, e = abcde
        dsk = {(a, 1): (f, (a, 2)),
               (a, 2): (f, (a, 3)),
               (a, 3): (f, (b, 1), (c, 1)),
               (b, 1): (f, (b, 2)),
               (c, 1): (f, (c, 2)),
               (c, 2): (f, (c, 3)),
               (d, 1): (f, (c, 1))}
    
    o = order(dsk)
    assert o[(b, 1)] < o[(c, 1)]  # "prefer b1 over c1", per the docstring

In this test we want to select b, but doing so requires that we start descending from a. The test in this PR asks that we start descending from short top-nodes, because they can be finished quickly, allowing their children to be released once a long-running co-parent chain starts running. The test in this comment asks the opposite: that we start from long top-nodes.

This brings up the question of not doing top-down depth first, but something that wanders around a bit more or swaps values during traversal.
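The tension between the two traversals can be seen with a toy sketch. This is not dask's actual ordering algorithm; the `dependencies` mapping and the `dfs_order` helper are illustrative stand-ins that number tasks in a plain depth-first post-order over the graph from the test above:

```python
# Graph from test_avoid_upwards_branching, written as a dependencies
# mapping (these names are illustrative, not dask internals).
dependencies = {
    ('a', 1): [('a', 2)],
    ('a', 2): [('a', 3)],
    ('a', 3): [('b', 1), ('c', 1)],
    ('b', 1): [('b', 2)],
    ('b', 2): [],
    ('c', 1): [('c', 2)],
    ('c', 2): [('c', 3)],
    ('c', 3): [],
    ('d', 1): [('c', 1)],
}

def dfs_order(roots):
    """Number tasks depth-first so that dependencies get lower numbers."""
    result = {}
    def visit(key):
        if key in result:
            return
        for dep in dependencies[key]:
            visit(dep)
        result[key] = len(result)
    for root in roots:
        visit(root)
    return result

# Descend from the long top node first: the whole a/b/c subtree runs,
# and (d, 1) is appended at the very end.
print(dfs_order([('a', 1), ('d', 1)]))
# Descend from the short top node first: the c chain runs first and
# (d, 1) finishes early, but the b chain is deferred.
print(dfs_order([('d', 1), ('a', 1)]))
```

Neither starting order is good for both tests, which is what motivates a traversal that wanders or swaps priorities rather than committing to one depth-first descent.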

@mrocklin mrocklin changed the title [WIP] Add failing test for short dependent chains Full traversal in dask.order Jan 16, 2018
@mrocklin
Member Author

OK, there is a solution here that solves a number of problems modestly well. It sacrifices perfection in a few cases for "good enough" behavior in more of them. The traversal is more complex now and may be more expensive; I'll have to do some benchmarking. Here are the visualizations for a couple of troublesome graphs.

[Images: ordering visualizations dask.png and gh3055.png, generated by the code below]

import matplotlib.pyplot as plt

import dask.array as da
n = 10
x  = da.random.normal(size=(n, 100), chunks=(1, 100))
y  = da.random.normal(size=(n,), chunks=(1,))
xy = (x * y[:, None]).cumsum(axis=0)
xx = (x[:, None, :] * x[:, :, None]).cumsum(axis=0)
beta = da.stack([da.linalg.solve(xx[i], xy[i]) for i in range(xx.shape[0])],
                axis=0)
ey = (x * beta).sum(axis=1)
ey.visualize('dask.png', color='order', node_attr=dict(penwidth='8'),
             optimize_graph=True, cmap=plt.cm.RdBu)


A, B = 10, 99
x = da.random.normal(size=(A, B), chunks=(1, None))
for _ in range(2):
    y = (x[:, None, :] * x[:, :, None]).cumsum(axis=0)
    x = x.cumsum(axis=0)
w = (y * x[:, None]).sum(axis=(1,2))

w.visualize('gh3055.png', color='order', node_attr=dict(penwidth='8'),
            cmap=plt.cm.RdBu)

@mrocklin mrocklin closed this Jan 16, 2018
@mrocklin mrocklin reopened this Jan 16, 2018
@mrocklin
Member Author

@eriknw and @jcrist might both find this PR of interest

@mrocklin
Member Author

Anecdotal timing information

%%time
n = 100
x  = da.random.normal(size=(n, 100), chunks=(1, 100))
y  = da.random.normal(size=(n,), chunks=(1,))
xy = (x * y[:, None]).cumsum(axis=0)
xx = (x[:, None, :] * x[:, :, None]).cumsum(axis=0)
beta = da.stack([da.linalg.solve(xx[i], xy[i]) for i in range(xx.shape[0])],
                axis=0)
ey = (x * beta).sum(axis=1)
CPU times: user 239 ms, sys: 4.21 ms, total: 243 ms
Wall time: 233 ms
%time dsk = dict(ey.__dask_graph__())
CPU times: user 74.1 ms, sys: 0 ns, total: 74.1 ms
Wall time: 73.2 ms

Master: 25ms

%timeit _ = order(dsk)
10 loops, best of 3: 25.9 ms per loop

This branch: 39ms

%timeit _ = order(dsk)
10 loops, best of 3: 39.3 ms per loop

This is non-trivial, but possibly still worth it.
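For reference, a micro-benchmark like the one above can be reproduced with the stdlib `timeit` module. The `build_chain` graph and the `naive_order` stand-in below are assumptions for illustration only; the actual measurement timed `dask.order.order` on the graph built earlier.

```python
import timeit

def build_chain(n):
    """Synthetic linear task graph: ('t', i) depends on ('t', i - 1)."""
    dsk = {('t', 0): (sum, [1, 2])}
    for i in range(1, n):
        dsk[('t', i)] = (sum, [('t', i - 1)])
    return dsk

def naive_order(dsk):
    """Stand-in for dask.order.order: number tasks bottom-up by index."""
    return {key: i for i, key in enumerate(sorted(dsk, key=lambda k: k[1]))}

dsk = build_chain(10_000)
# Best of 3 repeats, 10 calls each, reported as time per call.
best = min(timeit.repeat(lambda: naive_order(dsk), number=10, repeat=3))
print(f"{best / 10 * 1e3:.2f} ms per call")
```

Comparing best-of-N times per call, as `%timeit` does, smooths out scheduler noise when weighing a ~25 ms baseline against a ~39 ms candidate.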

@mrocklin mrocklin force-pushed the order-up branch 2 times, most recently from 5fc13fa to fcc0af0 Compare January 31, 2018 12:39
mrocklin added a commit to mrocklin/distributed that referenced this pull request Feb 1, 2018
@mrocklin
Member Author

mrocklin commented Feb 1, 2018

Merging this soon if there are no further comments. Alternatively @jcrist, you're probably the most likely to review this. Have you had a chance to look it over?

@jcrist
Member

jcrist commented Feb 1, 2018

Looking now.


@jcrist jcrist left a comment


Overall this looks good to me. I can't comment on whether the algorithm change is a benefit overall, but the new tests look good and the implementation seems clean.

ndependencies(dependencies, dependents)


@pytest.mark.xfail(reason="Can't please 'em all")
Member


😆

  o = order(dsk)
  L = [o[k] for k in w.__dask_keys__()]
- assert sorted(L) == L[::-1]
+ assert sorted(L) == L[::-1] or sorted(L) == L
Member


Is this non-determinism necessary?

Member Author


Removed

@mrocklin
Member Author

mrocklin commented Feb 1, 2018

@jcrist ok to merge on passed tests?

@jcrist
Copy link
Member

jcrist commented Feb 1, 2018

Fine by me.

@mrocklin mrocklin merged commit cceb6e2 into dask:master Feb 1, 2018
@mrocklin mrocklin deleted the order-up branch February 1, 2018 21:21
mrocklin added a commit to dask/distributed that referenced this pull request Feb 1, 2018
* Remove worker prioritization

We now trust the scheduler's priority entirely

* Keep previously xfailed test

This depends on dask/dask#3066 to pass

* use defaultdicts in GroupProgress
