Prefer end-tasks with low numbers of dependencies when ordering#3588

Merged
mrocklin merged 17 commits into dask:master from TomAugspurger:dask-ml-order
Jun 27, 2018

Conversation

@TomAugspurger
Member

Starting at dask/dask-ml#206 (comment)

Just a test so far. Coming back to this later this afternoon.

('a', 3): 3}


def test_prefer_short_narrow():
Member

I recommend using the abcde fixture here for key names. It changes the order to ensure that we're not inadvertently dependent on the fact that 'a' < 'b', for example.
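A minimal sketch of what such a fixture could look like (the name abcde follows the comment above, but this shuffling implementation is a hypothetical illustration, not dask's actual test fixture):

```python
import random

def abcde(seed=0):
    # Return the five single-letter key prefixes in a shuffled but
    # reproducible order, so tests cannot silently rely on 'a' < 'b'.
    letters = list("abcde")
    random.Random(seed).shuffle(letters)
    return tuple(letters)

a, b, c, d, e = abcde()
```

Tests then build their graphs out of `(a, 0)`, `(b, 1)`, etc., so any accidental dependence on alphabetical key ordering shows up as a failure.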

@TomAugspurger
Member Author

Just leaving some notes to pick up later.

The smallest change, which breaks other tests, is

@@ -139,9 +150,13 @@ def order(dsk, dependencies=None):
         deps = [d for d in dependents[item]
                 if d not in result and not (d in seen and waiting[d])]
         if len(deps) < 1000:
-            deps = sorted(deps, key=dependents_key, reverse=True)
+            deps = sorted(deps, key=dependents_key, reverse=False)

With reverse=True, we put ('a', 1) (which represents the last data block of X) next on the stack / result list.

This fails

  • test_gh_3505
  • test_prefer_short_dependents
  • test_nearest_neighbor
  • test_string_ordering_dependents

Investigating why later.

@TomAugspurger
Member Author

Briefly, though, it seems like that reverse=True is there so that we pick tasks with shorter task chains.

With reverse=True (master)

[task graph image]

With reverse=False

[task graph image]

@mrocklin
Member

The objective behind that choice is to allow c to be collected early on. Otherwise it sticks around while we run the longer chain.

@jakirkham
Member

That makes sense. It seems where it gets thrown off is when we add a shared vertex. In these cases having a short chain doesn't matter as it will still be dependent on the longer chains to finish (suggesting the longest chain of a shared vertex should get highest priority). Is there a good way to include some logic regarding shared vertices and their dependents?

@TomAugspurger
Member Author

TomAugspurger commented Jun 12, 2018

I think a rule that "adding a common ancestor shouldn't change the ordering" is a good one. Still not sure how we would actually do that quickly though.

@mrocklin
Member

Still not sure how we would actually do that quickly though.

Near-optimal task scheduling in linear time is hard :)

@TomAugspurger
Member Author

TomAugspurger commented Jun 21, 2018

Picking this up again today (ignore the config / debugging code. it'll be removed).

I think the cleanest example is the following:

a, b, c, d, e = 'abcde'

def f(*args): pass

good = {
        (a, 0): 0,
        (b, 0): 0,
        (c, 0): 0,
        (c, 1): (f, (c, 0), (a, 0), (b, 0)),
        (a, 1): 1,
        (b, 1): 1,
        (c, 2): (f, (c, 1), (a, 1), (b, 1)),
}
 

bad = {
    'ab': 0,
    (a, 0): (f, 'ab', 0, 0),
    (b, 0): (f, 'ab', 0, 1),
    (c, 0): 0,
    (c, 1): (f, (c, 0), (a, 0), (b, 0)),
    (a, 1): (f, 'ab', 1, 0),
    (b, 1): (f, 'ab', 1, 1),
    (c, 2): (f, (c, 1), (a, 1), (b, 1)),
}

Diagrammatically, that's

    Good:

                    c2
                   / \ \
                  /   \ \
                c1     \ \
              / | \     \ \
             c  a0 b0   a1 b1

    Bad:

                    c2
                   / \ \
                  /   \ \
                c1     \ \
              / | \     \ \
             c  a0 b0   a1 b1
                   \ \   / /
                    \ \ / /
                      a-b

And visually, it's

good:

[task graph image]

bad:

[task graph image]

We would like to choose (c, 1) before (a, 1), since doing (c, 1) first lets us release (a, 0) and (b, 0). Then we don't have to have (a, 0) and (a, 1) in memory at the same time.
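One way to make "fewer blocks in memory at the same time" concrete is to simulate single-worker execution and count live results. The helper below is a hypothetical illustration (not part of dask), applied to the "bad" graph above:

```python
def peak_held(priority_order, dependencies):
    """Run tasks one at a time in priority_order and return the peak
    number of results held in memory.  A result is released once every
    task that consumes it has executed."""
    # count not-yet-executed consumers of each task's result
    consumers = {k: 0 for k in dependencies}
    for inputs in dependencies.values():
        for d in inputs:
            consumers[d] += 1
    held, peak = set(), 0
    for key in priority_order:
        held.add(key)
        peak = max(peak, len(held))
        for d in dependencies[key]:
            consumers[d] -= 1
            if consumers[d] == 0:
                held.discard(d)
    return peak

# the "bad" graph above, expressed as task -> inputs
deps = {'ab': [], 'a0': ['ab'], 'b0': ['ab'], 'a1': ['ab'], 'b1': ['ab'],
        'c0': [], 'c1': ['c0', 'a0', 'b0'], 'c2': ['c1', 'a1', 'b1']}

good = ['ab', 'a0', 'b0', 'c0', 'c1', 'a1', 'b1', 'c2']  # c1 before a1
bad = ['ab', 'a0', 'b0', 'a1', 'b1', 'c0', 'c1', 'c2']   # a1 before c1
```

Under this toy model, `peak_held(good, deps)` is 5 while `peak_held(bad, deps)` is 6: doing (c, 1) first releases (a, 0) and (b, 0) before (a, 1) and (b, 1) are created.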

cc @eriknw if this ordering issue interests you.

@mrocklin
Member

I took a look at this problem. Stepping through order it seems like it does correctly identify that c1 is a higher priority than a1/b1, then it goes to a0 or b0, and from there to a-b. Everything here so far is correct.

What happens next though is unfortunate. It looks up from a-b and sees both a0/b0 and a1/b1, and it decides that it would prefer what it perceives to be the path that will finish the soonest. Its proxy for "soonest" is the number of total dependents, so it chooses a1/b1, which in this case is wrong.

Number of total dependents is a poor (but cheap!) proxy for soonest-finishing-path. It's wrong because all paths are ending in the same place here, c2. In this case we would actually prefer to take the branch that has the most work between itself and the end point. I wonder if something like the following would be productive (I give this a 30% chance of being a good idea):

  1. For each node compute the maximum total_dependencies among all of its dependents. In this case all nodes would have the same value around 10. I'm calling this max_dependencies for now. Computing this would be similar to the current ndependents/ndependencies functions, except that they would probably also consume the total_dependents/total_dependencies as inputs.
  2. In the dependents_key function use something like max_dependents[key] - total_dependents[key] to select for nodes that are close to finished.
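A rough sketch of step 1 above (hypothetical; the names total_dependencies and max_dependencies follow the comment, and this is not dask's actual implementation):

```python
def compute_max_dependencies(dependents, total_dependencies):
    # For each node, propagate the largest total_dependencies value
    # reachable through its dependents, so every node knows the "size"
    # of the biggest end point it feeds into.
    cache = {}

    def visit(key):
        if key not in cache:
            deps = dependents[key]
            if deps:
                cache[key] = max(visit(d) for d in deps)
            else:
                # a sink: its own total_dependencies is the end-point size
                cache[key] = total_dependencies[key]
        return cache[key]

    return {key: visit(key) for key in dependents}
```

In the bad example all nodes feed into the same end point c2, so max_dependencies would be roughly uniform, and the step-2 difference max_dependencies[key] - total_dependencies[key] is smallest for keys closest to c2.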

Anyway, just thinking out loud, it's something to try.

@mrocklin
Member

Also, FWIW, for debugging I like putting the following just after the while stack: line

print(stack)
import pdb; pdb.set_trace()

I find that this is a good way to see how things progress, while being able to either easily step through the algorithm iteration by iteration (with c) or investigate what happens next as necessary.

@TomAugspurger
Member Author

Thanks.

My mind went to something like "number of dependents, not including the dependents of my sibling tasks", but it wasn't clear to me how "siblings" would be tracked in ndependents, or whether that even generalizes beyond this problem.

Your proposal sounds simpler. I can try it out.

Anyway, just thinking out loud, it's something to try.

I appreciate it.

@TomAugspurger
Member Author

FYI, for fun I was playing around with changing dask_ml._partial.fit to use the distributed scheduler and user-defined priorities. That seemed to help, but it'd be beneficial to fix it here.

@mrocklin
Member

mrocklin commented Jun 21, 2018 via email

@mrocklin
Member

@TomAugspurger if you have a chance can I ask you to try your workloads with this branch? If things work out then I'll modify docs and tune things a bit.

@TomAugspurger
Member Author

Seems to be ideal on the small example I showed on Friday.

[task graph image]

I'll try it out on a larger problem.

@TomAugspurger
Member Author

As a side note, it seems like the previous fits are no longer held in memory. I don't think that change was in dask-ml.

@mrocklin
Member

mrocklin commented Jun 26, 2018 via email

@mrocklin
Member

I've now rolled this into the pass with ndependents so, while it adds a bit of code complexity, it doesn't affect performance as much.

@mrocklin
Member

@TomAugspurger your review of the module-level docstring would be particularly welcome

"""
From https://github.com/dask/dask-ml/issues/206#issuecomment-395869929

Two cases, one where chunks of an array or indepented, and one where the
Member Author

I think I meant "are independent", not "or indepented"

From https://github.com/dask/dask-ml/issues/206#issuecomment-395869929

Two cases, one where chunks of an array or indepented, and one where the
chunks of an array of a shared source. We handled the independent one
Member Author

"of a" -> "have a"

dask/order.py Outdated
operate efficiently in linear time. We strongly recommend that readers look at
the docstrings of tests in dask/tests/test_order.py. These tests usually have
graph types laid out very carefully to show the kinds of situations that often
arise.
Member Author

", and the order we would like to be determined."

dask/order.py Outdated

1. **Small goals**: prefer tasks whose final dependents have few dependencies.

By final dependent we mean a task that depends on this task that is the end
Member Author

I don't quite follow this first sentence.

while current:
key = current.pop()
result[key] = 1 + sum(result[parent] for parent in dependents[key])
try:
Member Author

Is the try / except to handle the empty case? If so, min takes a default= argument. So

min_result[key] = min((min_result[parent] for parent in dependents[key]), default=total_dependencies[key])

Member

While more verbose, try-except is often performance optimal when the try case is much more common.

In [1]: d = {'x': 1}

In [2]: %timeit min([], default=d['x'])
378 ns ± 16.1 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [3]: %timeit min([1, 2], default=d['x'])
413 ns ± 1.26 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

In [4]: %timeit min([1, 2])
166 ns ± 0.642 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

In [6]: %%timeit
   ...: try:
   ...:     min([1, 2])
   ...: except TypeError:
   ...:     d['x']
   ...:     
178 ns ± 2.74 ns per loop (mean ± std. dev. of 7 runs, 10000000 loops each)

In [8]: %%timeit
   ...: try:
   ...:     min([])
   ...: except ValueError:
   ...:     d['x']
   ...:     
493 ns ± 4.52 ns per loop (mean ± std. dev. of 7 runs, 1000000 loops each)

This difference is small, but we do actually care about things at the microsecond level here.

Member Author

Huh, I'm surprised by the overhead added by the default argument. Good to know.

@mrocklin
Member

mrocklin commented Jun 26, 2018 via email

@TomAugspurger
Member Author

Playing with this larger example, 16GB, 200 blocks.

from distributed import Client
client = Client(processes=False)
client

import os
from dask_ml.datasets import make_classification
from dask_ml.wrappers import Incremental
from sklearn.linear_model import SGDClassifier
import dask.array as da

X, y = make_classification(n_samples=2_000_000, n_features=1000, chunks=10_000, random_state=0)

if not os.path.exists("Xy.zarr"):
    Xy = da.concatenate([X, y.reshape(-1, 1)], axis=1)
    Xy.to_zarr('Xy.zarr', overwrite=True)

Xy = da.from_zarr("Xy.zarr")
X = Xy[:, :-1]
y = Xy[:, -1]

clf = SGDClassifier(random_state=0)
inc = Incremental(clf, scoring='accuracy')

%time inc.fit(X, y, classes=[0, 1])

Status pane: https://streamable.com/upsan

We get some nice overlap between data IO and model training. We're loading the next blocks while we train on previous ones.

However, I think the scheduler decides to process all of the load-zarr tasks as soon as possible. Since we haven't gone through all the fits yet, not enough memory has been released, and we end up spilling to disk. Is this the desired behavior?

@mrocklin
Member

Yeah, that's what I meant to highlight in #3588 (comment) . It's a problem, but a separate problem. I've been thinking about it a bit recently and haven't come to any very strong conclusions.

@mrocklin
Member

I plan to merge this early tomorrow if there are no further comments.

@mrocklin mrocklin changed the title [WIP] Ordering for dask-ml use case Prefer end-tasks with low numbers of dependencies when ordering Jun 26, 2018
@jakirkham
Member

However, I think the scheduler decides to process all of the load-zarr tasks as soon as possible. Since we haven't gone through all the fits yet, not enough memory has been released, and we end up spilling to disk. Is this the desired behavior?

Should we move this to a new issue or is there one already?

dsk = dict(w.__dask_graph__())
o = order(dsk)
# dask.visualize(dsk, color='order', node_attr={'penwidth': '6'},
# filename='dask.pdf')
Member

Should this be dropped?

Member

I've added these lines the last few times I've played with order. I'm inclined to leave them in for myself or others. The image made by this test is particularly valuable to check correctness.

Member

Generally don't find commented code to be a good idea to keep around (especially in a nice library like Dask). It is seldom updated or used for that matter. Also it raises questions for later users as to its value (at which point it may end up being scrapped anyways). Not to mention it makes the code base a bit uglier. If the code is simple like this, it is easy for anyone to add should it be needed. Would recommend dropping it. However if you feel strongly about keeping it, would encourage including it as part of the test explicitly (uncommented) so it doesn't fall victim to the aforementioned issues. Given how easy this is to solve, would encourage picking one of these two options and addressing it before merging.

Member

As you like. Removed

@mrocklin
Member

I would like to merge this. Any objections?

@mrocklin
Member

Should we move this to a new issue or is there one already?

I did a quick look and didn't see anything. I wouldn't be surprised if it has come up a few times already though in various issues.

It's the sort of thing that, unfortunately, has a bus factor of 1. I've mostly been thinking about it on my own.

@mrocklin
Member

Merging in a few hours if there are no further comments

@jakirkham
Member

However, I think the scheduler decides to process all of the load-zarr tasks as soon as possible. Since we haven't gone through all the fits yet, not enough memory has been released, and we end up spilling to disk. Is this the desired behavior?

Should we move this to a new issue or is there one already?

I did a quick look and didn't see anything. I wouldn't be surprised if it has come up a few times already though in various issues.

It's the sort of thing that, unfortunately, has a bus factor of 1. I've mostly been thinking about it on my own.

That's fair. Have no strong feelings about this. So happy with whatever you decide here. Though an open issue might track relevance if nothing else. Who knows maybe there are users interested in working on this problem. 😉

@mrocklin
Member

Anything else to do here @jakirkham ?

@jakirkham
Member

Not from me. :) Feel free to merge it when ready.

@mrocklin mrocklin merged commit 6741495 into dask:master Jun 27, 2018
@TomAugspurger TomAugspurger deleted the dask-ml-order branch June 29, 2018 13:02
convexset added a commit to convexset/dask that referenced this pull request Jul 1, 2018
….com/convexset/dask into fix-tsqr-case-chunk-with-zero-height

* 'fix-tsqr-case-chunk-with-zero-height' of https://github.com/convexset/dask:
  fixed typo in documentation and improved clarity
  Implement .blocks accessor (dask#3689)
  Fix wrong names (dask#3695)
  Adds endpoint and retstep support for linspace (dask#3675)
  Add the @ operator to the delayed objects (dask#3691)
  Align auto chunks to provided chunks, rather than shape (dask#3679)
  Adds quotes to source pip install (dask#3678)
  Prefer end-tasks with low numbers of dependencies when ordering (dask#3588)
  Reimplement argtopk to release the GIL (dask#3610)
  Note `da.pad` can be used with `map_overlap` (dask#3672)
  Allow tasks back onto ordering stack if they have one dependency (dask#3652)
  Fix extra progressbar (dask#3669)
  Break apart uneven array-of-int slicing to separate chunks (dask#3648)
  fix for `dask.array.linalg.tsqr` fails tests (intermittently) with arrays of uncertain dimensions (dask#3662)
@estebanag
Contributor

estebanag commented Jul 4, 2018

I've been looking into the effects of the changes made to dask/order.py from release 0.16.1 up to this commit.

In particular, I've noticed that from release 0.17.2 onwards, the memory footprint can increase significantly in some cases (see this issue). My workaround is to take an older version of dask/order.py and replace it on my master branch.

I wonder whether the task prioritization strategy should be parametrized (after defining some common use cases) instead of trying to have a "one size fits all" strategy.

@mrocklin
Member

mrocklin commented Jul 4, 2018

I would be concerned that maintaining several ordering algorithms within the codebase would increase maintenance burden. I'm inclined to try to find one way that works well for everyone. However, I'd be open to making this more pluggable so that others could experiment with their own.

@jakirkham
Member

However, I think the scheduler decides to process all of the load-zarr tasks as soon as possible. Since we haven't gone through all the fits yet, not enough memory has been released, and we end up spilling to disk. Is this the desired behavior?

Wound up running into some problems recently due to this behavior and ended up back in this PR. 😄

So have gone ahead and raised issue ( #4310 ) to track it. AFAICT I'm seeing the same issue though may have a slightly simpler example of it.
