Optionally return stored data after storing (#2980)
Conversation
This works fine based on the tests added thus far. That said, would be good to get some feedback on this approach and whether it seems reasonable for Dask. Happy to provide more context if needed.
Interesting. I like the approach. Just so that I'm clear, the objective here is something like In this case it seems like
Force-pushed d51b914 to 84a0b38
Correct. That is the idea. I have a few other thoughts about how this can be used, caching being one of them. Though there is already value in being able to checkpoint this computation. Yeah, was thinking about the meaning of Another option would be to make Something else to consider is we could break this up into
I'm against this. People should just use persist in this case.
I like this. It seems fairly clean. I think we've recommended this workflow anecdotally to people in the past.
Agreed. I'll retool this for the last proposal. Expect the code to get a lot cleaner (fewer branches) as a consequence.
Force-pushed 4bffe44 to 75bc1dc
Ok, I think this is starting to look the way we want. Would be curious to hear what you think.
Force-pushed a9128bf to 0825850
mrocklin left a comment
I haven't given this a thorough review, just put in a couple comments.
In general this is more code than I expected to be necessary. My hope is that this would be closer to something like:

```python
finished_writing = delayed(...)
if keep:
    return da.from_array(out, chunks=inp.chunks, wait_token=finished_writing)
else:
    return finished_writing
```
dask/array/core.py (Outdated)

```python
    src.dask
))
).persist().dask[each_store_key]
)
```
.persist is blocking when using the single machine scheduler. We definitely don't want to call it in a loop.
FWIW I find this statement difficult to parse. That's subjective though.
Sure we can drop all of these into one persist call. Expect that will make it easier to parse as well.
Should add that generally what we are trying to accomplish here is submitting the Dask Graph associated with storing each chunk.
Also valuable to combine persist calls to reuse intermediates.
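A minimal sketch of the point above, using the standard `dask.persist` API (the array and names here are illustrative): persisting several collections in one call lets the scheduler share their common intermediates, whereas calling `.persist()` per object in a loop would recompute shared work and, on the single-machine scheduler, block on each call.

```python
import dask
import dask.array as da

x = da.ones((8,), chunks=4)
common = x + 1       # intermediate shared by both results below

a = common.sum()
b = common.mean()

# One persist call submits both graphs together, so `common`'s tasks run once.
a, b = dask.persist(a, b)
print(float(a.compute()), float(b.compute()))
```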
FWIW I think this statement should now be a bit cleaner and more performant.
dask/array/core.py (Outdated)

```python
load_key = ('load-%s' % store_key[0].lstrip("store-"),) + store_key[1:]
dsk[load_key] = (load, store_key,) + out_dsk[store_key][3:-1]

return dsk
```
Why is this necessary on top of from_array? They seem very similar
😕 I don't think we are calling from_array, or do you mean that you feel the behavior is similar to from_array? If the latter, more on this in the comment to follow.
This logic seemed similar to from_array. I was curious if there was a reason why we can't reuse the existing functionality. Mostly I'm concerned with the amount of code being written into dask.array.core to support this feature. If it isn't easy to accomplish what we want to do with existing tools then is there something we can do to improve our current tools?
To be clear, the current machinery doesn't implement the necessary bits to do this, but my hope was that we could modify the existing machinery in relatively minor ways rather than create a parallel system alongside it.
There may be reasons against this of course. As a disclaimer, I haven't yet given this a very thorough review.
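For context on the `from_array` comparison above, `da.from_array` wraps an existing array-like into a Dask Array; a minimal example (array contents here are illustrative):

```python
import numpy as np
import dask.array as da

arr = np.arange(6)
x = da.from_array(arr, chunks=3)  # two chunks of 3 elements each
print(x.compute())                # reads the data back out of `arr`
```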
Thanks for the feedback. From a big picture standpoint, we'd really like to be able to view results as they are being computed. We'd also like to make sure that what we view is an accurate reflection of what we have saved to disk. To handle both of these, we need to be able to view a chunk as soon as it's done computing and saved to disk, even if other chunks are still being computed and/or written. Having this one-to-one mapping of store to load steps aids that goal by letting us pull in computed chunks from disk once they are ready. It's pretty easy to handle the problem in this context as we have all the relevant pieces. Moving them elsewhere may be possible, but could make it harder to reason about. Ultimately I'd like to condense the branching in this PR. Expect that the end result will be able to reuse the bulk of
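The one-to-one store-to-load key mapping described above can be sketched roughly as follows. The key names and the `load` helper here are illustrative, not the PR's exact identifiers:

```python
import numpy as np

def load(target, slc):
    # Hypothetical helper: read a freshly written chunk back from the target.
    return target[slc]

# Derive a matching load key from a store key (illustrative naming scheme),
# so each stored chunk has a task that re-reads it once it is written.
store_key = ("store-mydata", 0, 0)
load_key = ("load-" + store_key[0][len("store-"):],) + store_key[1:]
print(load_key)  # ('load-mydata', 0, 0)
```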
OK, I see how what I was proposing doesn't achieve the same goals as what you're doing here. I agree that what you're proposing sounds attractive.
Force-pushed 1b25996 to 77d586b
Force-pushed ff7ca87 to 69971d9
This could use another look if you have time.
Force-pushed 69971d9 to 4c46657
Instead of merging each of `load_dsks`' graphs within a `for`-loop, merge them all together before entering the `for`-loop and reuse that graph for all Dask Arrays constructed.
Make sure there is only one `return` statement to make it easier to see that a `return` does occur.
Includes some tests to exercise `store`'s `keep=True` cases to make sure this behaves as intended. Namely that a Dask Array is returned and that storage only occurs if `compute=True`, or if `compute=False` and a later call to `compute` triggers it.
Force-pushed 2a13328 to b22631e
Follow a suggestion to rename the argument to make it a little clearer.
Namely explain how arguments are reused from the `store_chunk` calls to construct the `load_chunk` calls. Also explain what is dropped and replaced in these new calls.
Thanks @jcrist. Pushed a bunch of changes based on your suggestions above. There is one point regarding the
dask/array/core.py (Outdated)

```python
# `(store_chunk, t, out, slc, lock, region, return_stored)`
# Namely drop the first 3 arguments as they are the function and two
# arrays that were already used up by `store_chunk`. In their place use
# `load_chunk` and the result from `store_chunk` (i.e. `each_key`).
```
Verbosity makes it harder for people to read when skimming code. I'd just write:

```python
# Reuse extra arguments from `store_chunk` in `load_chunk`.
```

Again, not a huge deal. I don't mean to bikeshed your code; I think we have two very different styles when writing. The comment is fine as is.
No worries. Wasn't sure what level of detail we wanted here. Happy to cut it to one line.
Should make it a little clearer what is returned under which conditions.
Think that addresses the last round of comments. Please let me know if there is anything else.
dask/array/core.py (Outdated)

```python
    src.chunks,
    src.dtype
))
result = tuple(result)
```
Apologies. One last nit: this loop/append/tuple can be reduced to a one-liner, which IMO improves readability:

```python
return tuple(Array(load_dsks_mrg, name, src.chunks, src.dtype)
             for name in load_names)
```

Take it or leave it. Otherwise this looks good to me.
Hmm... I think we got too clever here and introduced a bug. It shouldn't be src, as that would only repeat the last array's metadata for all arrays. Will submit a follow-up PR to fix this.
Edit: The src was my doing originally though.
Added a test to demonstrate the bug and included a fix for it in PR ( #3064 ).
```python
sources_dsk = Array.__dask_optimize__(
    sources_dsk,
    [e.__dask_keys__() for e in sources]
)
```
While you are looking @jcrist, would be good if you could give these optimization lines a quick look. Based on the docs this seemed ok, but you certainly know better since you wrote all of these __dask_*__ functions. 😉
That's exactly how they should be used.
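For reference, the usage pattern being confirmed here looks roughly like this on a toy array, assuming a dask version where the collection protocol methods `__dask_graph__`, `__dask_keys__`, and `__dask_optimize__` are available:

```python
import dask.array as da

x = da.ones((4,), chunks=2) + 1
dsk = dict(x.__dask_graph__())

# Apply the Array-specific graph optimizations for these output keys,
# mirroring the pattern in the quoted diff above.
optimized = x.__dask_optimize__(dsk, x.__dask_keys__())
```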
Follow-up on a suggestion to make this code more compact.
```python
result = None
if return_stored:
    result = out
```
Sorry, I missed this bit before. Is this still used? From below it looks like the result is always re-read from the store, but I may be missing something?
Yes, this is still used.
In an effort to chain the store_chunk calls to the load_chunk calls, we return out unchanged so that it can be fed as an argument into load_chunk via a key. Does that make sense? It could be there is a better way to do this. Just seemed simple enough to implement at the time.
We could also move this branch to the end and simply return either out or None if that would make it more clear.
Ah, missed that it was out and not x. Fine by me.
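A stripped-down sketch of the pass-through being discussed, with simplified stand-ins for the PR's actual functions: `store_chunk` writes the chunk and returns the target `out` rather than the computed chunk `x`, so a downstream `load_chunk` task can receive it by key and read the freshly written data back.

```python
import numpy as np

def store_chunk(x, out, slc, return_stored):
    # Write the computed chunk into the target region.
    out[slc] = x
    # Return `out` (not `x`) so a chained `load_chunk` task can reuse it.
    return out if return_stored else None

def load_chunk(out, slc):
    # Read the freshly stored chunk back from the target.
    return out[slc]

buf = np.zeros(4)
target = store_chunk(np.ones(2), buf, slice(0, 2), True)
print(load_chunk(target, slice(0, 2)))  # [1. 1.]
```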
Thanks @jakirkham!
Thank you @jakirkham for both your work and your patience here.
> On Tue, Jan 2, 2018 at 8:12 PM, jakirkham wrote: Thanks @mrocklin and @jcrist for the reviews.
```python
result = Delayed(name, dsk)

if compute:
    result.compute()
```
Missed that this call should still get **kwargs. Fixing with PR ( #3300 ).
Related to issue #2156.

Adds an argument to Dask Array's `store`, called `keep`. When set, `keep` will return the stored result(s). The behavior of `keep` differs a little depending on whether `compute` is set. If `compute` is `False`, Dask Arrays corresponding to the stored values are returned, which can be chained into other computations, persisted, etc. If `compute` is `True`, then it is like calling `compute` on the Dask Arrays that would have been returned if `compute` was `False`.
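A usage sketch of the feature described above. Note the argument was renamed during review; in the merged API it is called `return_stored`. The array contents and target here are illustrative:

```python
import numpy as np
import dask.array as da

x = da.arange(10, chunks=5) + 1
out = np.zeros(10)

# With compute=False, store returns Dask Arrays backed by the target.
(stored,) = da.store([x], [out], compute=False, return_stored=True)

# Computing the returned array triggers the store and reads the data back,
# so what we view is exactly what landed in `out`.
result = stored.compute()
print(result)
```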