Prefer end-tasks with low numbers of dependencies when ordering#3588
mrocklin merged 17 commits into dask:master
Conversation
dask/tests/test_order.py
Outdated
```python
     ('a', 3): 3}


def test_prefer_short_narrow():
```
I recommend using the abcde fixture here for key names. It changes order to ensure that we're not entirely dependent on the fact that 'a' < 'b', for example.
Just leaving some notes to pick up later. The most minimal change, which breaks other tests, is

```diff
@@ -139,9 +150,13 @@ def order(dsk, dependencies=None):
             deps = [d for d in dependents[item]
                     if d not in result and not (d in seen and waiting[d])]
             if len(deps) < 1000:
-                deps = sorted(deps, key=dependents_key, reverse=True)
+                deps = sorted(deps, key=dependents_key, reverse=False)
```

This fails. Investigating why later.
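To illustrate just the sorting change in that diff (a generic sketch; `dependents_key` here is a stand-in scoring function, not dask's real key function): flipping `reverse` inverts which candidate ends up first in the sorted list, and therefore which task the algorithm visits first.

```python
# Stand-in for dask's dependents_key: score each candidate task.
deps = ['x', 'y', 'z']
dependents_key = {'x': 3, 'y': 1, 'z': 2}.get

# reverse=False visits low-scoring tasks first; reverse=True the opposite.
asc = sorted(deps, key=dependents_key, reverse=False)
desc = sorted(deps, key=dependents_key, reverse=True)

print(asc)   # ['y', 'z', 'x']
print(desc)  # ['x', 'z', 'y']
```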
The objective behind that choice is to allow c to be collected early on. Otherwise it sticks around while we run the longer chain.
That makes sense. It seems where it gets thrown off is when we add a shared vertex. In these cases having a short chain doesn't matter, as it will still be dependent on the longer chains to finish (suggesting the longest chain of a shared vertex should get highest priority). Is there a good way to include some logic regarding shared vertices and their dependents?
I think a rule that "adding a common ancestor shouldn't change the ordering" is a good one. Still not sure how we would actually do that quickly though.
Near-optimal task scheduling in linear time is hard :)
Force-pushed from cb279a6 to a61d0cb
Picking this up again today (ignore the config / debugging code; it'll be removed). I think the cleanest example is the following:

```python
a, b, c, d, e = 'abcde'

def f(*args):
    pass

good = {
    (a, 0): 0,
    (b, 0): 0,
    (c, 0): 0,
    (c, 1): (f, (c, 0), (a, 0), (b, 0)),
    (a, 1): 1,
    (b, 1): 1,
    (c, 2): (f, (c, 1), (a, 1), (b, 1)),
}

bad = {
    'ab': 0,
    (a, 0): (f, 'ab', 0, 0),
    (b, 0): (f, 'ab', 0, 1),
    (c, 0): 0,
    (c, 1): (f, (c, 0), (a, 0), (b, 0)),
    (a, 1): (f, 'ab', 1, 0),
    (b, 1): (f, 'ab', 1, 1),
    (c, 2): (f, (c, 1), (a, 1), (b, 1)),
}
```

Diagrammatically and visually (diagrams omitted):

good: (image omitted)

bad: (image omitted)

We would like to choose the `good` ordering.

cc @eriknw if this ordering issue interests you.
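As a stdlib-only sketch (an editorial illustration, not code from this PR; the helper name `get_dependents` is made up), here is how a dependents mapping can be derived from a dask-style graph like `good` above, where a task tuple `(f, arg, ...)` refers to other tasks by key:

```python
def f(*args):
    pass

a, b, c = 'abc'

# Same `good` graph as in the comment above (d and e are unused there).
good = {
    (a, 0): 0,
    (b, 0): 0,
    (c, 0): 0,
    (c, 1): (f, (c, 0), (a, 0), (b, 0)),
    (a, 1): 1,
    (b, 1): 1,
    (c, 2): (f, (c, 1), (a, 1), (b, 1)),
}

def get_dependents(dsk):
    """Map each key to the set of keys whose tasks consume it."""
    deps = {k: set() for k in dsk}
    for key, task in dsk.items():
        if isinstance(task, tuple) and task and callable(task[0]):
            for arg in task[1:]:
                if arg in deps:  # arg names another key in the graph
                    deps[arg].add(key)
    return deps

dependents = get_dependents(good)
assert dependents[(c, 0)] == {(c, 1)}  # c0 feeds c1
assert dependents[(c, 2)] == set()     # c2 is the terminal task
```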
I took a look at this problem. Stepping through order, it seems like it does correctly identify that c1 is a higher priority than a1, b1; then it goes to a0 or b0, and from there to ab. Everything here so far is correct. What happens next though is unfortunate. It looks up from ab and sees both a0/b0 and a1/b1, and it decides that it would prefer what it perceives to be the path that will finish the soonest. Its proxy for "soonest" is the number of total dependents, so it chooses a1/b1, which in this case is wrong. Number of total dependents is a poor (but cheap!) proxy for the soonest-finishing path. It's wrong because all paths end in the same place here, c2. In this case we would actually prefer to take the branch that has the most work between itself and the end point. I wonder if something like the following would be productive (I give this a 30% chance of being a good idea):
Anyway, just thinking out loud, it's something to try.
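For concreteness, here is a rough sketch (not dask's actual implementation) of the "number of total dependents" proxy described above: count, for each task, how many tasks transitively depend on it.

```python
def total_dependents(dependents):
    """dependents maps each task -> set of tasks that depend on it directly."""
    memo = {}

    def count(task):
        if task not in memo:
            # Collect the full set of transitive dependents, then count it,
            # so shared downstream tasks are not double-counted.
            seen = set()
            stack = [task]
            while stack:
                for d in dependents[stack.pop()]:
                    if d not in seen:
                        seen.add(d)
                        stack.append(d)
            memo[task] = len(seen)
        return memo[task]

    return {t: count(t) for t in dependents}

# Toy chain: y depends on x, z depends on y.
deps = {'x': {'y'}, 'y': {'z'}, 'z': set()}
print(total_dependents(deps))  # {'x': 2, 'y': 1, 'z': 0}
```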
Also, FWIW, for debugging I like putting the following just after the …

```python
print(stack)
import pdb; pdb.set_trace()
```

I find that this is a good way to see how things progress, while being able to either easily step through the algorithm iteration by iteration (with …).
Thanks. My mind went to something like "number of dependents, not including the dependents of my sibling tasks", but it wasn't clear to me how "siblings" would be tracked in … Your proposal sounds simpler. I can try it out.
I appreciate it.
FYI, for fun I was playing around with changing `dask_ml._partial.fit` to use the distributed scheduler and user-defined priorities. That seemed to help, but it'd be beneficial to fix it here.
Yeah, I'd prefer that we focus efforts on dask.order if possible, even if
it's a harder problem to improve.
@TomAugspurger if you have a chance, can I ask you to try your workloads with this branch? If things work out then I'll modify docs and tune things a bit.
As a side note, it seems like the previous fits are no longer held in memory. I don't think that change was in dask-ml.
Hrm, it does *generate* a lot of data though while it's waiting for the model fit to proceed. This is a separate problem though.
I've now rolled this into the pass with …
@TomAugspurger your review of the module-level docstring would be particularly welcome |
dask/tests/test_order.py
Outdated
| """ | ||
| From https://github.com/dask/dask-ml/issues/206#issuecomment-395869929 | ||
|
|
||
| Two cases, one where chunks of an array or indepented, and one where the |
I think I meant "are independent", not "or indepented"
dask/tests/test_order.py
Outdated
```python
From https://github.com/dask/dask-ml/issues/206#issuecomment-395869929

Two cases, one where chunks of an array or indepented, and one where the
chunks of an array of a shared source. We handled the independent one
```
"of a" -> "have a"
dask/order.py
Outdated
```
operate efficiently in linear time. We strongly recommend that readers look at
the docstrings of tests in dask/tests/test_order.py. These tests usually have
graph types laid out very carefully to show the kinds of situations that often
arise.
```
", and the order we would like to be determined`."
dask/order.py
Outdated
```
1.  **Small goals**: prefer tasks whose final dependents have few dependencies.

    By final dependent we mean a task that depends on this task that is the end
```
I don't quite follow this first sentence.
```python
    while current:
        key = current.pop()
        result[key] = 1 + sum(result[parent] for parent in dependents[key])
        try:
```

Is the try / except to handle the empty case? If so, `min` takes a `default=` argument. So

```python
min_result[key] = min((min_result[parent] for parent in dependents[key]),
                      default=total_dependencies[key])
```

(the generator expression needs its own parentheses when another argument is passed)
While more verbose, try-except is often performance optimal when the try case is much more common.

```python
In [1]: d = {'x': 1}

In [2]: %timeit min([], default=d['x'])
378 ns ± 16.1 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [3]: %timeit min([1, 2], default=d['x'])
413 ns ± 1.26 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [4]: %timeit min([1, 2])
166 ns ± 0.642 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

In [6]: %%timeit
   ...: try:
   ...:     min([1, 2])
   ...: except TypeError:
   ...:     d['x']
   ...:
178 ns ± 2.74 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

In [8]: %%timeit
   ...: try:
   ...:     min([])
   ...: except ValueError:
   ...:     d['x']
   ...:
493 ns ± 4.52 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)
```

This difference is small, but we do actually care about things at the microsecond level here.
Huh, I'm surprised by the overhead added by the default argument. Good to know.
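To make the two options concrete, a tiny illustrative comparison (the variable names mirror the discussion but are placeholders, not dask's actual code): both forms compute the same value, and the `try`/`except` form is the one favored above when the non-empty case dominates.

```python
dependents = {'a': ['b'], 'b': []}  # 'b' has no dependents
min_result = {'b': 5}
total_dependencies = {'a': 1, 'b': 1}

key = 'b'  # a task with no dependents: min() sees an empty sequence

# Option 1: default= (the generator must be parenthesized here)
v1 = min((min_result[p] for p in dependents[key]),
         default=total_dependencies[key])

# Option 2: try/except, cheaper when min() usually succeeds
try:
    v2 = min(min_result[p] for p in dependents[key])
except ValueError:  # min() of an empty sequence raises ValueError
    v2 = total_dependencies[key]

assert v1 == v2 == 1
```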
It's probably the dictionary access `d['x']`.
Playing with this larger example, 16 GB, 200 blocks.

```python
from distributed import Client
client = Client(processes=False)
client

import os
from dask_ml.datasets import make_classification
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier
import dask.array as da

X, y = make_classification(n_samples=2_000_000, n_features=1000,
                           chunks=10_000, random_state=0)

if not os.path.exists("Xy.zarr"):
    Xy = da.concatenate([X, y.reshape(-1, 1)], axis=1)
    Xy.to_zarr('Xy.zarr', overwrite=True)

Xy = da.from_zarr("Xy.zarr")
X = Xy[:, :-1]
y = Xy[:, -1]

clf = SGDClassifier(random_state=0)
inc = Incremental(clf, scoring='accuracy')
%time inc.fit(X, y, classes=[0, 1])
```

Status pane: https://streamable.com/upsan

We get some nice overlap between data IO and model training. We're loading the next blocks while we train on previous ones. However, I think the scheduler decides to process all the load zarrs as soon as possible. Because we haven't released enough yet, since we haven't gone through all the …
Yeah, that's what I meant to highlight in #3588 (comment). It's a problem, but a separate problem. I've been thinking about it a bit recently and haven't come to any very strong conclusions.
I plan to merge this early tomorrow if there are no further comments.
Should we move this to a new issue or is there one already?
dask/tests/test_order.py
Outdated
```python
dsk = dict(w.__dask_graph__())
o = order(dsk)
# dask.visualize(dsk, color='order', node_attr={'penwidth': '6'},
#                filename='dask.pdf')
```
I've added these lines the last few times I've played with order. I'm inclined to leave them in for myself or others. The image made by this test is particularly valuable to check correctness.
Generally I don't find commented code a good idea to keep around (especially in a nice library like Dask). It is seldom updated or, for that matter, used. Also it raises questions for later users as to its value (at which point it may end up being scrapped anyway). Not to mention it makes the code base a bit uglier. If the code is simple like this, it is easy for anyone to add should it be needed. Would recommend dropping it. However, if you feel strongly about keeping it, I would encourage including it as part of the test explicitly (uncommented) so it doesn't fall victim to the aforementioned issues. Given how easy this is to solve, would encourage picking one of these two options and addressing it before merging.
I would like to merge this. Any objections?
I took a quick look and didn't see anything. I wouldn't be surprised if it has come up a few times already though in various issues. It's the sort of thing that, unfortunately, has a bus factor of 1. I've mostly been thinking about it on my own.
Merging in a few hours if there are no further comments |
That's fair. I have no strong feelings about this, so happy with whatever you decide here. Though an open issue might track interest if nothing else. Who knows, maybe there are users interested in working on this problem. 😉
Anything else to do here, @jakirkham?
Not from me. :) Feel free to merge it when ready.
I've been looking into the effects of the changes made to dask/order.py from release 0.16.1 up to this commit. In particular, I've noticed that from release 0.17.2 onwards, the memory footprint can increase significantly in some cases (see this issue). My workaround is to take an older version of dask/order.py and replace it on my master branch. I wonder whether the task prioritization strategy should be parametrized (after defining some common use cases) instead of trying to have a "one size fits all" strategy.
I would be concerned that maintaining several ordering algorithms within the codebase would increase maintenance burden. I'm inclined to try to find one way that works well for everyone. However, I'd be open to making this more pluggable so that others could experiment with their own.
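A purely hypothetical sketch of what "more pluggable" could mean (none of these names are an existing dask API): let callers pass their own ordering function with the same contract as `dask.order.order`, i.e. graph in, priority mapping out.

```python
def default_order(dsk):
    # Trivial placeholder strategy: priorities follow insertion order.
    return {key: i for i, key in enumerate(dsk)}

def schedule(dsk, ordering=None):
    """Order `dsk` with a caller-supplied strategy, falling back to a default."""
    ordering = ordering or default_order
    return ordering(dsk)

# A custom strategy could, e.g., prefer reverse insertion order:
def reverse_order(dsk):
    n = len(dsk)
    return {key: n - 1 - i for i, key in enumerate(dsk)}

assert schedule({'a': 1, 'b': 2}) == {'a': 0, 'b': 1}
assert schedule({'a': 1, 'b': 2}, ordering=reverse_order) == {'a': 1, 'b': 0}
```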
Wound up running into some problems recently due to this behavior and ended up back in this PR. 😄 So have gone ahead and raised issue ( #4310 ) to track it. AFAICT I'm seeing the same issue, though I may have a slightly simpler example of it.





Starting at dask/dask-ml#206 (comment)
Just a test so far. Coming back to this later this afternoon.