Conversation
|
Does using a tree merge cause performance degradation for the single node case? If not, we might want to default to that behavior. |
|
For a wide enough tree, no. It actually might improve performance, as the aggregate step can be done in parallel (probably negligible though). Trick is determining what is a sane default number. |
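As a hedged illustration of why the single-node case shouldn't suffer (written against a released dask, where this option shipped under the name `split_every` rather than the `max_leaves` keyword discussed below): when there are no more chunks along the reduced axes than the branching factor, the "tree" collapses to the same single aggregate step as the flat reduction, so both build graphs of the same size.

```python
import dask.array as da

# Only 4 chunks along the reduced axis. With a branching factor at least
# that wide, the tree has a single aggregate level, just like the flat
# reduction, so the two graphs contain the same number of tasks.
x = da.ones((40,), chunks=(10,))
flat = x.sum()                  # default behavior
tree = x.sum(split_every=8)     # branching factor wider than the chunk count

assert flat.compute() == tree.compute() == 40.0
assert len(dict(flat.__dask_graph__())) == len(dict(tree.__dask_graph__()))
```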
|
These produce different results:

In [11]: x.sum(max_leaves=4).visualize('dask.pdf')
In [12]: x.sum(max_leaves={0: 4}).visualize('dask.pdf') |
|
I wouldn't expect much degradation in non-pathological cases. The worst thing we're doing is adding
Suggest renaming |
If you feel particularly underburdened we could move the partial call here into atop, allowing it to support keyword args.
Are you saying that atop should handle the partial itself?
|
Any thoughts on how hard the |
|
I haven't worked through the gritty details, but at first glance this looks pretty cool to me. |
What is the shape of x in that case? If it's not 1d, then I'd expect them to. I definitely need to clean up input verification though, as it occurs to me that I never verify that |
There is some input validation missing. This is also the motivation behind the "we should test the branching factor explicitly somewhere" comment |
- Input cleanup
- More tests
- Fix integer `max_leaves` input bug
|
I cleaned up the tests, and fixed the bug you pointed out above. Could always test more, but I think the coverage here is pretty good.
It's proving harder than I would like - my tired brain isn't up for mathing it out right now :/. Should be doable though, just need to do some math. |
dask/array/tests/test_reductions.py
You could use dask.core.get_deps to get the dependencies dict and then use this dictionary in more sophisticated tests like the following:

dependencies, dependents = get_deps(x.sum(max_leaves={0: 2, 1: 3}).dask)
assert max(map(len, dependencies.values())) == 2 * 3
dependencies, dependents = get_deps(x.sum(axis=1, max_leaves={0: 2, 1: 3}).dask)
assert max(map(len, dependencies.values())) == 3

There are presumably several such interesting configurations that would be useful to verify now, rather than several months from now when someone screws with this code unknowingly.
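For anyone revisiting this against a current dask (a hedged sketch: the keyword ended up as `split_every`, and `.dask` is now a HighLevelGraph that must be materialized), the suggested check might look like:

```python
import dask.array as da
from dask.core import get_deps

x = da.random.random((100, 100), chunks=(10, 10))

# Materialize the graph and inspect per-task fan-in: no task should
# gather more than 2 * 3 = 6 inputs with this per-axis branching factor.
dsk = dict(x.sum(split_every={0: 2, 1: 3}).__dask_graph__())
dependencies, dependents = get_deps(dsk)
assert max(map(len, dependencies.values())) <= 2 * 3
```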
Yeah, I saw that. Will fix.
|
@jcrist what is the status on this? I wouldn't mind using it to redo recent array experiments. |
|
Doesn't support |
|
I'm not sure if these should have the same name or not.

In [1]: import dask.array as da
In [2]: x = da.random.random((100, 100), chunks=(10, 10))
In [3]: x.mean(axis=0).name
Out[3]: 'reduce-460ea5b3313d36450410978acc2ace95'
In [4]: x.mean(axis=0, max_leaves=6).name
Out[4]: 'reduce-460ea5b3313d36450410978acc2ace95' |
|
Using this branch I've found a speedup from 30s to 5s when doing a reduction of 2GB across a small network of 3 machines. This is only when the reduction required data sharing and was only because we happened to have scattered the data in a way that aligns with this (both defaults cause nice behavior.) There was no speedup or slowdown when the reduction didn't require significant data transfer, though that case was fast anyway (1-2s). |
|
Reasons to keep the keys the same:
Reasons to keep the keys separate and include
|
|
I made them have the same keys so that it would play well with caching. I could revert this, but it seemed like the correct behavior in my mind. Where do we make assumptions that identical key names have the same dependencies? |
|
Within the distributed scheduler. Or, more generally, within any scheduler that supports graph updates. |
|
Ah, that makes sense. I don't have strong opinions on this, happy to go either way. |
|
My life will be easier if we include the branching factor within tokenize. It also follows the "when in doubt, disambiguate keys" principle. |
|
I'm still in favor of changing the name away from |
|
Better name suggestion? Some ideas:
|
|
I like |
|
Heuristics sound like they would be valuable. I think that we'll be better able to do this after some use. I could use @jcrist's help on some other things, so I'm still behind a default that is just-a-number for now. What are your thoughts on the kwarg names above? |
|
I think More options: I think |
Previously `a.sum().name == a.sum(split_threshold=2).name`. This has been removed, as distributed assumes keys with the same name have the same dependencies.
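A quick check of the behavior this note describes, sketched against a released dask (where the keyword is `split_every`): different branching factors now yield different key names and different graph sizes, while computing the same result.

```python
import dask.array as da

x = da.ones((100, 100), chunks=(10, 10))
narrow = x.sum(split_every=2)    # deep tree, more intermediate tasks
wide = x.sum(split_every=16)     # shallow tree

# Distinct branching factors produce distinct key names...
assert narrow.name != wide.name
# ...and the deeper tree has strictly more tasks, but the same result.
assert len(dict(narrow.__dask_graph__())) > len(dict(wide.__dask_graph__()))
assert narrow.compute() == wide.compute() == 10000.0
```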
|
This could use another review. I changed the keyword to |
This is a reassuring test. Thanks!
|
This looks great to me. |
|
+1 |
|
I expect this to need some use before we can determine a good default behavior/smart heuristic. I'm going to experiment with the ocean dataset to see how different configurations fare. If others have some workflows they could try this on, it would be much appreciated. |
|
I was playing with this with distributed with @stefanv . Some issues came up
|
|
There are no docs for this yet, so any suggestions on how to make this more intuitive to use would be much appreciated.
I assume you weren't trying to compute a reduction on a struct array (which doesn't work even in numpy), but instead a bug somewhere in dask/distributed that resulted in the reduction being computed on a struct array? If you remember what computation caused it, it would be nice to be able to reproduce. |
|
The reason So, while I don't propose this name, something like |
|
I like Stefan's names!
|
|
Sorry, this somehow slipped through. Of the new names, I like |
|
I'd prefer using the same name in both places if possible.
|
|
Yeah, I wasn't suggesting different names in different spots. Perhaps just go with the more verbose one, as it will probably be set contextually/globally? |
|
I'd prefer |
|
I think split_every is probably verbose enough. I suspect it would usually
|
|
Done. See #876. |
This enables support for tree reductions, which should improve efficiency of using `dask.array` across multiple processes/machines, or when arrays are composed of a large number of chunks.

The idea is to set a maximum number of chunks to be gathered and combined (either overall, or by axis) when performing reductions - breaking the reduce step of map-reduce into a tree of smaller reductions.

At an api level, reductions expose a `max_leaves` kwarg, which defaults to the current behavior. It accepts either a `dict` of `{axis: max_chunks}`, or an integer, which is used to compute `max_chunks` for each dimension such that the total number of chunks gathered in each reduction is approximately `max_leaves`. For example, `axis=0, max_leaves=16 -> max_leaves={0: 16}`, and `axis=(0, 1), max_leaves=16 -> max_leaves={0: 4, 1: 4}`.

Example:
What would normally be a single step reduction has been broken into a tree of depth 2.
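The integer-to-dict conversion described above can be sketched with a hypothetical helper (`split_per_axis` is not part of the PR's API, just an illustration of the stated rule: distribute the leaf budget roughly evenly so the per-axis factors multiply out to about `max_leaves`):

```python
def split_per_axis(axes, max_leaves):
    # Hypothetical helper: spread an integer leaf budget evenly across
    # the reduced axes, so the product of per-axis factors is roughly
    # max_leaves. Clamp at 2, the smallest useful branching factor.
    n = len(axes)
    per_axis = max(int(round(max_leaves ** (1.0 / n))), 2)
    return {ax: per_axis for ax in axes}

# The two examples from the description above:
assert split_per_axis((0,), 16) == {0: 16}
assert split_per_axis((0, 1), 16) == {0: 4, 1: 4}
```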
Todo:
- `moment` (and thus `std`, `var`, etc...)
- `arg*` reductions