
Tree reductions for dask.array #839

Merged
jcrist merged 9 commits into dask:master from jcrist:tree_red on Dec 3, 2015

Conversation

@jcrist
Member

@jcrist jcrist commented Nov 19, 2015

This enables support for tree reductions, which should improve efficiency of using dask.array across multiple processes/machines, or when arrays are composed of a large number of chunks.

The idea is to set a maximum number of chunks to be gathered and combined (either overall, or by axis) when performing reductions - breaking the reduce step of map-reduce into a tree of smaller reductions.
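The map-reduce split described here can be sketched in plain Python (a toy illustration of the idea, not dask's graph-based implementation):

```python
from functools import reduce

def tree_reduce(chunks, combine, max_leaves):
    """Combine at most `max_leaves` partial results per step,
    repeating level by level until a single result remains."""
    results = list(chunks)
    while len(results) > 1:
        # Group the current level into batches of up to max_leaves,
        # then reduce each batch into one partial result.
        results = [
            reduce(combine, results[i:i + max_leaves])
            for i in range(0, len(results), max_leaves)
        ]
    return results[0]

tree_reduce(range(10), lambda a, b: a + b, 4)  # same answer as sum(range(10))
```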

At an API level, reductions expose a max_leaves kwarg, which defaults to the current behavior. It accepts either a dict of {axis: max_chunks}, or an integer, which is used to compute max_chunks for each dimension such that the total number of chunks gathered in each reduction is approximately max_leaves. For example, axis=0, max_leaves=16 -> max_leaves={0: 16}; axis=(0, 1), max_leaves=16 -> max_leaves={0: 4, 1: 4}.
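The integer-to-dict normalization described above could look roughly like this (a sketch; the exact rounding rule is an assumption on my part, but it reproduces the examples in the text):

```python
def normalize_max_leaves(max_leaves, axis):
    """Spread an integer max_leaves across the reduction axes so the
    product of the per-axis limits is approximately max_leaves."""
    if isinstance(max_leaves, dict):
        return max_leaves
    axes = (axis,) if isinstance(axis, int) else tuple(axis)
    # Take the n-th root so each reduction axis gets an equal share.
    per_axis = max(1, int(round(max_leaves ** (1.0 / len(axes)))))
    return {a: per_axis for a in axes}

normalize_max_leaves(16, (0, 1))  # -> {0: 4, 1: 4}
normalize_max_leaves(16, 0)       # -> {0: 16}
```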

Example:

import dask.array as da
import numpy as np

x = np.arange(1, 122).reshape((11, 11)).astype('f4')
a = da.from_array(x, chunks=(4, 4))
o = a.sum(axis=0, max_leaves=4)

(task graph visualization)

What would normally be a single-step reduction has been broken into a tree of depth 2.

Todo:

  • Add support for moment (and thus std, var, etc...)
  • Add support for arg* reductions
  • More tests

@shoyer
Member

shoyer commented Nov 19, 2015

Does using a tree merge cause performance degradation for the single node case? If not, we might want to default to that behavior.

@jcrist
Member Author

jcrist commented Nov 19, 2015

For a wide enough tree, no. It actually might improve performance, as the aggregate step can be done in parallel (probably negligible though). The trick is determining a sane default number.

@mrocklin
Member

These produce different results

In [11]: x.sum(max_leaves=4).visualize('dask.pdf')

In [12]: x.sum(max_leaves={0: 4}).visualize('dask.pdf')

@mrocklin
Member

I wouldn't expect much degradation in non-pathological cases. The worst thing we're doing is adding O(log_k n) more tasks.

Suggest renaming max_leaves. I think your intent with the name was something akin to max_children. There might even be a slicker name than that. I'm trying to think of a term like breadth that might be the converse of the depth of a tree. Perhaps "branching factor", if it weren't so verbose.
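As a rough check on the O(log_k n) claim, a small pure-Python sketch (the counting convention here is illustrative, not dask's actual bookkeeping):

```python
import math

def tree_stats(n_chunks, k):
    """Estimate depth and extra task count of a k-ary tree
    reduction over n_chunks leaves."""
    depth = max(1, math.ceil(math.log(n_chunks, k)))
    # Count reduction tasks level by level: each level combines
    # groups of up to k partial results until one remains.
    tasks = 0
    n = n_chunks
    while n > 1:
        n = math.ceil(n / k)
        tasks += n
    return depth, tasks

tree_stats(100, 4)  # -> (4, 35): 100 -> 25 -> 7 -> 2 -> 1
```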

Member

If you feel particularly underburdened we could move the partial call here into atop, allowing it to support keyword args.

Member Author

Are you saying that atop should handle the partial itself?

Member

Perhaps

@mrocklin
Member

Any thoughts on how hard the combine function for moments will be?

@mrocklin
Member

I haven't worked through the gritty details, but at first glance this looks pretty cool to me.

@jcrist
Member Author

jcrist commented Nov 20, 2015

These produce different results

What is the shape of x in that case? If it's not 1d, then I'd expect them to. I definitely need to clean up input verification though, as it occurs to me that I never verify that sorted(axis) == sorted(max_leaves). Need max block combination sizes for all reduction axes.

@mrocklin
Member

What is the shape of x in that case? If it's not 1d, then I'd expect them to. I definitely need to clean up input verification though, as it occurs to me that I never verify that sorted(axis) == sorted(max_leaves). Need max block combination sizes for all reduction axes.

x = da.arange(1000, chunks=100)

There is some input validation missing. This is also the motivation behind the "we should test the branching factor explicitly somewhere" comment

- Input cleanup
- More tests
- Fix integer `max_leaves` input bug
@jcrist
Member Author

jcrist commented Nov 20, 2015

I cleaned up the tests, and fixed the bug you pointed out above. Could always test more, but I think the coverage here is pretty good.

Any thoughts on how hard the combine function for moments will be?

It's proving harder than I would like - my tired brain isn't up for mathing it out right now :/. Should be doable though, just need to do some math.

Member

You could use dask.core.get_deps to get the dependencies dict and then use this dictionary in more sophisticated tests like the following:

dependencies, dependents = get_deps(x.sum(max_leaves={0: 2, 1: 3}).dask)
assert max(map(len, dependencies.values())) == 2 * 3

dependencies, dependents = get_deps(x.sum(axis=1, max_leaves={0: 2, 1: 3}).dask)
assert max(map(len, dependencies.values())) == 3

There are presumably several such interesting configurations that would be useful to verify now, rather than several months from now when someone screws with this code unknowingly.

Member

Ping

Member Author

Yeah, I saw that. Will fix.

@mrocklin
Member

@jcrist what is the status on this? I wouldn't mind using it to redo recent array experiments.

@jcrist
Member Author

jcrist commented Nov 24, 2015

Doesn't support arg_*, var, std, or moment, and could use some more tests. I'm mostly out this week, so I doubt this will be fixed until next week. Should be good to go for redoing the array experiments though, if you don't mind working off non-merged stuff.

@mrocklin
Member

I'm not sure if these should have the same name or not.

In [1]: import dask.array as da

In [2]: x = da.random.random((100, 100), chunks=(10, 10))

In [3]: x.mean(axis=0).name
Out[3]: 'reduce-460ea5b3313d36450410978acc2ace95'

In [4]: x.mean(axis=0, max_leaves=6).name
Out[4]: 'reduce-460ea5b3313d36450410978acc2ace95'

@mrocklin
Member

Using this branch I've found a speedup from 30s to 5s when doing a reduction of 2GB across a small network of 3 machines. This was only when the reduction required data sharing, and only because we happened to have scattered the data in a way that aligns with this (both defaults cause nice behavior). There was no speedup or slowdown when the reduction didn't require significant data transfer, though that case was fast anyway (1-2s).

@mrocklin
Member

Reasons to keep the keys the same:

  • They're mathematically the same result, just computed in different ways

Reasons to keep the keys separate and include max_leaves in tokenize:

  • The graphs are quite different, and so assumptions made about same keys having the same dependencies are incorrect.
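dask derives key names with its tokenize function; as a stand-alone illustration (a hashlib stand-in, not dask's actual code), including the branching factor among the hashed inputs is what disambiguates the keys:

```python
import hashlib

def reduction_key(prefix, *args):
    """Build a deterministic key from a reduction's parameters.
    Hashing max_leaves along with the other inputs gives
    structurally different graphs different key names."""
    token = hashlib.md5(repr(args).encode()).hexdigest()
    return "%s-%s" % (prefix, token)

k1 = reduction_key("reduce", "input-name", 0, None)  # default reduction
k2 = reduction_key("reduce", "input-name", 0, 6)     # max_leaves=6
# k1 != k2: the graphs differ, so the keys differ too
```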

@jcrist
Member Author

jcrist commented Dec 1, 2015

I made them have the same keys so that it would play well with caching. I could revert this, but it seemed like the correct behavior in my mind. Where do we make assumptions that identical key names have the same dependencies?

@mrocklin
Member

mrocklin commented Dec 1, 2015

Within the distributed scheduler.

Or, more generally, within any scheduler that supports graph updates.

@jcrist
Member Author

jcrist commented Dec 1, 2015

Ah, that makes sense. I don't have strong opinions on this, happy to go either way.

@mrocklin
Member

mrocklin commented Dec 1, 2015

My life will be easier if we include the branching factor within tokenize. It also follows the "when in doubt, disambiguate keys" principle.

@mrocklin
Member

mrocklin commented Dec 1, 2015

I'm still in favor of changing the name away from max_leaves. To me a leaf is strictly at the bottom of a tree. This is a restriction both on leaves and on interior nodes.

@jcrist
Member Author

jcrist commented Dec 1, 2015

Better name suggestion? Some ideas:

  • max_children
  • max_branches
  • branches
  • branch_factor
  • bfactor

@mrocklin
Member

mrocklin commented Dec 1, 2015

I like max_children and branch_factor

@mrocklin
Member

mrocklin commented Dec 1, 2015

Heuristics sound like they would be valuable. I think that we'll be better able to do this after some use. I could use @jcrist's help on some other things, so I'm still behind a default that is just-a-number for now.

What are your thoughts on the kwarg names above?

@shoyer
Member

shoyer commented Dec 1, 2015

I think max_children is better than branch_factor, which is more vague. Still, it feels evocative of the particular implementation (using trees) rather than the concept (splitting the reduction into sub-problems).

More options:

  • split_threshold
  • max_chunks
  • max_splits
  • reduce_limit
  • subproblem_size
  • max_subproblems

I think split_threshold is my favorite.

Previously `a.sum().name == a.sum(split_threshold=2).name`. This has
been removed, as distributed assumes keys with the same name have the
same dependencies.
@jcrist
Member Author

jcrist commented Dec 3, 2015

This could use another review. I changed the keyword to split_threshold, added it to set_options, and defaulted at 32. Tests were improved, and all reductions are now supported.

Member

This is a reassuring test. Thanks!

@mrocklin mrocklin changed the title [WIP] Tree reductions for dask.array Tree reductions for dask.array Dec 3, 2015
@mrocklin
Member

mrocklin commented Dec 3, 2015

This looks great to me.

@mrocklin
Member

mrocklin commented Dec 3, 2015

+1

jcrist added a commit that referenced this pull request Dec 3, 2015
Tree reductions for dask.array
@jcrist jcrist merged commit 2475078 into dask:master Dec 3, 2015
@jcrist jcrist deleted the tree_red branch December 3, 2015 22:26
@jcrist
Member Author

jcrist commented Dec 3, 2015

I expect this to need some use before we can determine a good default behavior/smart heuristic. I'm going to experiment with the ocean dataset to see how different configurations fare. If others have some workflows they could try this on, it would be much appreciated.

@mrocklin
Member

mrocklin commented Dec 4, 2015

I was playing with this with distributed with @stefanv . Some issues came up

  1. The split_threshold kwarg wasn't immediately clear to him (perhaps he can chime in here with his thoughts).
  2. We ran into an issue about reductions on flexible types while computing a mean. This might be a distributed or a dask.array issue. Some play testing is in order. Flexible type issues arise when you try to call a typical reduction on an array of struct dtype.

@jcrist
Member Author

jcrist commented Dec 4, 2015

There are no docs for this yet, so any suggestions on how to make this more intuitive to use would be much appreciated.

We ran into an issue about reductions on flexible types while computing a mean.

I assume you weren't trying to compute a reduction on a struct array (which doesn't work even in numpy), but instead hit a bug somewhere in dask/distributed that resulted in the reduction being computed on a struct array? If you remember what computation caused it, it would be nice to be able to reproduce.

@stefanv
Contributor

stefanv commented Dec 4, 2015

The reason split_threshold did not immediately make sense to me is because it contains these two concepts I needed to think about: split and threshold. And threshold is slightly confusing because it could imply "split after this condition" or "split until this condition", with potentially different meanings ("split after n chunks" or "split until you have n chunks").

So, while I don't propose this name, something like split_every_n_chunks would read very straightforwardly. Maybe there's a shorter version of that, like chunks_per_split, split_every, group_chunks, etc.

@shoyer
Member

shoyer commented Dec 4, 2015

I like Stefan's names!


@jcrist
Member Author

jcrist commented Dec 11, 2015

Sorry, this somehow slipped through. Of the new names, I like split_every best when used in a function call (i.e. a.mean(axis=1, split_every=32)). As a global/context config with set_options, I like split_every_n_chunks best, as it's more descriptive. Unsure what's best here. Either way, a better name would be nice.

@mrocklin
Member

I'd prefer using the same name in both places if possible.


@jcrist
Member Author

jcrist commented Dec 11, 2015

Yeah, I wasn't suggesting different names in different spots. Perhaps just go with the more verbose one, as it will probably be set contextually/globally?

@mrocklin
Member

I'd prefer split_every over split_every_n_chunks

@shoyer
Member

shoyer commented Dec 11, 2015

I think split_every is probably verbose enough. I suspect it would usually be poor practice to set this globally, given that it depends on the particular reduction being performed.


@jcrist
Member Author

jcrist commented Dec 11, 2015

Done. See #876.
