Conversation
|
So this test seems to conflict with the following situation:

```python
def test_avoid_upwards_branching(abcde):
    """
      a1
      |
      a2
      |
      a3    d1
     /  \  /
    b1    c1
    |     |
    b2    c2
          |
          c3

    Prefer b1 over c1 because it won't stick around waiting for d1 to complete
    """
    a, b, c, d, e = abcde
    dsk = {(a, 1): (f, (a, 2)),
           (a, 2): (f, (a, 3)),
           (a, 3): (f, (b, 1), (c, 1)),
           (b, 1): (f, (b, 2)),
           (c, 1): (f, (c, 2)),
           (c, 2): (f, (c, 3)),
           (d, 1): (f, (c, 1))}

    o = order(dsk)
```

In this test we want to select b1 before c1. This brings up the question of not doing a top-down depth-first traversal, but instead something that wanders around a bit more, or that swaps values during traversal.
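The "top-down depth-first" traversal mentioned above can be sketched in plain Python. This toy prioritizer is an illustration only, not dask.order's actual algorithm; the dependency dict and key names are hypothetical stand-ins for the test graph:

```python
# Toy depth-first prioritizer: walk from the output key toward its
# dependencies, assigning increasing priority numbers as we go.
# This is a simplified illustration, not dask.order's real algorithm.
def dfs_order(dependencies, start):
    result = {}
    stack = [start]
    seen = set()
    while stack:
        key = stack.pop()
        if key in seen:
            continue
        seen.add(key)
        result[key] = len(result)
        # The last dependency pushed is visited first.
        stack.extend(dependencies.get(key, ()))
    return result

# The graph from the test above, written as {key: list_of_dependencies}
deps = {
    'a1': ['a2'], 'a2': ['a3'], 'a3': ['b1', 'c1'],
    'b1': ['b2'], 'c1': ['c2'], 'c2': ['c3'],
    'd1': ['c1'],
}
o = dfs_order(deps, 'a1')
# A plain depth-first walk dives down one branch (here c1's, since it was
# pushed last) before ever touching b1 -- the behavior the test questions.
```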
|
OK, there is a solution here that solves a number of problems modestly well. It sacrifices perfection in a few cases for "good enough" in more. The traversal is more complex now and may be more expensive; I'll have to do some benchmarking. Here are the visualizations for a couple of troublesome graphs:

```python
import matplotlib.pyplot as plt
import dask.array as da

n = 10
x = da.random.normal(size=(n, 100), chunks=(1, 100))
y = da.random.normal(size=(n,), chunks=(1,))
xy = (x * y[:, None]).cumsum(axis=0)
xx = (x[:, None, :] * x[:, :, None]).cumsum(axis=0)
beta = da.stack([da.linalg.solve(xx[i], xy[i]) for i in range(xx.shape[0])],
                axis=0)
ey = (x * beta).sum(axis=1)
ey.visualize('dask.png', color='order', node_attr=dict(penwidth='8'),
             optimize_graph=True, cmap=plt.cm.RdBu)
```

```python
A, B = 10, 99
x = da.random.normal(size=(A, B), chunks=(1, None))
for _ in range(2):
    y = (x[:, None, :] * x[:, :, None]).cumsum(axis=0)
    x = x.cumsum(axis=0)
w = (y * x[:, None]).sum(axis=(1, 2))
w.visualize('gh3055.png', color='order', node_attr=dict(penwidth='8'),
            cmap=plt.cm.RdBu)
```
|
Anecdotal timing information:

```python
%%time
n = 100
x = da.random.normal(size=(n, 100), chunks=(1, 100))
y = da.random.normal(size=(n,), chunks=(1,))
xy = (x * y[:, None]).cumsum(axis=0)
xx = (x[:, None, :] * x[:, :, None]).cumsum(axis=0)
beta = da.stack([da.linalg.solve(xx[i], xy[i]) for i in range(xx.shape[0])],
                axis=0)
ey = (x * beta).sum(axis=1)
```

```
CPU times: user 239 ms, sys: 4.21 ms, total: 243 ms
Wall time: 233 ms
```

```python
%time dsk = dict(ey.__dask_graph__())
```

```
CPU times: user 74.1 ms, sys: 0 ns, total: 74.1 ms
Wall time: 73.2 ms
```

Master (25 ms):

```python
%timeit _ = order(dsk)
```

```
10 loops, best of 3: 25.9 ms per loop
```

This branch (39 ms):

```python
%timeit _ = order(dsk)
```

```
10 loops, best of 3: 39.3 ms per loop
```

This is non-trivial, but possibly still worth it.
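The `%timeit` numbers above come from IPython; the same measurement can be made outside a notebook with the stdlib `timeit` module. A sketch of the pattern, using a hypothetical stand-in function (substitute `dask.order.order` and the real `dict(ey.__dask_graph__())` when actually benchmarking):

```python
import timeit

def order_stub(dsk):
    # Stand-in for dask.order.order: maps each key to a priority number.
    return {key: i for i, key in enumerate(dsk)}

# A toy graph dict standing in for dict(ey.__dask_graph__())
dsk = {('x', i): (sum, [i, i + 1]) for i in range(1000)}

# number=10 mirrors "10 loops"; take the best of 3 repeats, like %timeit
per_call = min(timeit.repeat(lambda: order_stub(dsk), number=10, repeat=3)) / 10
print(f"{per_call * 1000:.3f} ms per call")
```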
Force-pushed from 5fc13fa to fcc0af0.
This depends on dask/dask#3066 to pass
|
Merging this soon if there are no further comments. Alternatively @jcrist, you're probably the most likely to review this. Have you had a chance to look it over? |
|
Looking now. |
jcrist
left a comment
Overall this looks good to me. I can't comment on whether the algorithm change is a benefit overall, but the new tests look good and the implementation seems clean.
```python
ndependencies(dependencies, dependents)
```

```python
@pytest.mark.xfail(reason="Can't please 'em all")
```

dask/tests/test_order.py (Outdated):

```diff
  o = order(dsk)
  L = [o[k] for k in w.__dask_keys__()]
- assert sorted(L) == L[::-1]
+ assert sorted(L) == L[::-1] or sorted(L) == L
```
Is this non-determinism necessary?
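For context, the assertion under discussion checks that the priorities along `w`'s output keys come out monotone. A small illustration with hypothetical priority lists:

```python
# sorted(L) == L[::-1] holds exactly when L is sorted in decreasing order;
# the relaxed form also accepts increasing order. Values are illustrative.
L_desc = [5, 3, 1]
L_asc = [1, 3, 5]

assert sorted(L_desc) == L_desc[::-1]                          # decreasing: passes
assert sorted(L_asc) == L_asc[::-1] or sorted(L_asc) == L_asc  # relaxed form: passes
assert not (sorted(L_asc) == L_asc[::-1])                      # strict form rejects increasing
```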
|
@jcrist ok to merge on passed tests? |
|
Fine by me. |
* Remove worker prioritization. We now trust the scheduler's priority entirely.
* Keep previously xfailed test. This depends on dask/dask#3066 to pass.
* Use defaultdicts in GroupProgress.


- [ ] Passes `flake8 dask`
- [ ] `docs/source/changelog.rst` for all changes and one of the `docs/source/*-api.rst` files for new API