graph manipulation: clone, bind, checkpoint, wait_on #7109
        return self._rebuild, ()

    def _rebuild(self, dsk, name=None):
        return Array(dsk, name or self.name, self.chunks, self.dtype, self._meta)
This has the side effect that, if anybody were to pickle the output of __dask_postpersist__, they would needlessly pickle the old dask graph too. However, I cannot think of any real-life use case where this could happen.
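The side effect described above follows from how Python bound methods work: `self._rebuild` keeps a reference to `self`, and therefore to the graph that `self` holds. A minimal sketch, assuming a hypothetical `ToyArray` class that only stands in for a dask collection (it is not dask's `Array`):

```python
class ToyArray:
    """Hypothetical stand-in for a dask collection; not dask's Array."""

    def __init__(self, dsk, name):
        self.dsk = dsk
        self.name = name

    def __dask_postpersist__(self):
        # Returning a bound method ties the rebuild callable to this instance.
        return self._rebuild, ()

    def _rebuild(self, dsk, name=None):
        return ToyArray(dsk, name or self.name)


old_graph = {("x", 0): 1, ("x", 1): 2}
arr = ToyArray(old_graph, "x")
rebuild, args = arr.__dask_postpersist__()

# The bound method still references the original collection and its graph,
# so serializing `rebuild` would carry the old graph along with it.
holds_old_graph = rebuild.__self__.dsk is old_graph

# Rebuilding with a new graph works as usual and keeps the old name by default.
new_arr = rebuild({("x", 0): 10, ("x", 1): 20}, *args)
```

Whether that extra payload matters depends on someone actually serializing the rebuild callable, which is the open question in the comment.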
As a meta point, I would prefer if we tried to avoid names with violent implications.
...again, happy to accept suggestions...
What I wanted to convey is that we're altering an already existing graph but, unlike optimization, the output is not functionally equivalent to the input.
I like
Also +1 for
commit 2ceaf46 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 28 18:53:42 2021 +0000 polish
commit 2643032 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 28 17:13:57 2021 +0000 black
commit 1b29cd8 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 28 17:13:45 2021 +0000 bugfix
commit 0a50ceb Merge: 8286a36 9bb586a Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 28 16:48:34 2021 +0000 Merge remote-tracking branch 'upstream/master' into graphsurgery
commit 8286a36 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 28 16:48:12 2021 +0000 bugfix
commit 802ece7 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 28 16:22:47 2021 +0000 bugfixes
commit 8d31b85 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 28 15:38:33 2021 +0000 Drop unused layers
commit 966d5a7 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 28 15:20:42 2021 +0000 bugfixes
commit afdc65d Merge: 1b30c21 5694253 Author: crusaderky <crusaderky@gmail.com> Date: Tue Jan 26 19:00:35 2021 +0000 Merge remote-tracking branch 'upstream/master' into graphsurgery
commit 1b30c21 Author: crusaderky <crusaderky@gmail.com> Date: Tue Jan 26 14:07:59 2021 +0000 polish
commit 4d03052 Author: crusaderky <crusaderky@gmail.com> Date: Tue Jan 26 13:46:02 2021 +0000 bind implementation
commit 741b0e4 Author: crusaderky <crusaderky@gmail.com> Date: Tue Jan 26 11:30:12 2021 +0000 fix
commit 5005c60 Author: crusaderky <crusaderky@gmail.com> Date: Tue Jan 26 11:26:46 2021 +0000 refactor
commit 050aac8 Merge: c68a459 fbbc271 Author: crusaderky <crusaderky@gmail.com> Date: Tue Jan 26 11:16:05 2021 +0000 Merge remote-tracking branch 'upstream/master' into graphsurgery
commit c68a459 Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 21:57:43 2021 +0000 bind implementation
commit 6589c18 Merge: 7ac6817 4e2d838 Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 21:24:05 2021 +0000 Merge remote-tracking branch 'upstream/master' into graphsurgery
commit 7ac6817 Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 21:23:53 2021 +0000 Rename choke -> checkpoint
commit deec2b3 Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 21:19:49 2021 +0000 Rename graphsurgery -> graph_manipulation
commit 60e928d Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 21:18:27 2021 +0000 more bind implementation
commit fe17526 Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 13:55:54 2021 +0000 implement bind (partial) d
commit 515f64f Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 11:33:17 2021 +0000 polish
commit 4046708 Merge: 16fbb10 5c96d82 Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 10:43:51 2021 +0000 Merge remote-tracking branch 'dask/master' into graphsurgery
commit 16fbb10 Author: crusaderky <crusaderky@gmail.com> Date: Mon Jan 25 10:35:21 2021 +0000 Implement name=
commit d0f0348 Author: crusaderky <crusaderky@gmail.com> Date: Fri Jan 22 18:15:34 2021 +0000 postpersist name= argument
commit c1d695f Author: crusaderky <crusaderky@gmail.com> Date: Fri Jan 22 18:15:17 2021 +0000 bind implementation (incomplete)
commit 1ae20fe Author: crusaderky <crusaderky@gmail.com> Date: Fri Jan 22 14:44:20 2021 +0000 polish
commit a30f0ed Author: crusaderky <crusaderky@gmail.com> Date: Fri Jan 22 14:34:25 2021 +0000 polish
commit b979ea2 Merge: 793df1c 4395973 Author: crusaderky <crusaderky@gmail.com> Date: Fri Jan 22 14:05:49 2021 +0000 Merge remote-tracking branch 'dask/master' into graphsurgery
commit 793df1c Author: crusaderky <crusaderky@gmail.com> Date: Fri Jan 22 14:05:42 2021 +0000 impl
commit 1f66b9a Merge: ba43137 cb9fc26 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 21 17:55:44 2021 +0000 Merge remote-tracking branch 'dask/master' into graphsurgery
commit ba43137 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 21 17:55:30 2021 +0000 graphsurgery (incomplete)
commit 0ad5fe3 Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 21 14:04:59 2021 +0000 O(1) access to keys in HighLevelGraph
commit 405205a Merge: b4a7197 30c9a3a Author: crusaderky <crusaderky@gmail.com> Date: Thu Jan 21 10:59:55 2021 +0000 Merge remote-tracking branch 'dask/master' into graphsurgery
commit b4a7197 Author: crusaderky <crusaderky@gmail.com> Date: Wed Jan 20 21:00:48 2021 +0000 drop prototype bugfix
2ceaf46 to 19ddfcf
    Notes
    -----
    This method should be overridden by subclasses to avoid materializing the layer.
Overriding this method is left to a subsequent iterative PR.
@jrbourbeau you may start code review (but see TODOs above)
jrbourbeau
left a comment
Thanks for all your work on this @crusaderky! I was able to look through the __dask_postpersist__ updates, base.py utilities, and drop + checkpoint methods. Overall things look good and I've left a few comments. I'll get through clone + bind tomorrow and may have additional comments then.
In terms of where docs on these methods should live, I think a new section in https://docs.dask.org/en/latest/api.html seems like a good place.
dask/base.py
Outdated
    except (AttributeError, TypeError):
        raise TypeError(f"Expected Dask collection; got {type(collection)}")
    except StopIteration:
        # e.g. Array of size 0
Raising an error when there are no keys seems reasonable. I just don't totally understand this comment. IIUC even arrays of size 0 will still have keys:
In [1]: import dask.array as da
In [2]: x = da.ones(10, chunks=2)
In [3]: y = x[:0]
In [4]: y.size
Out[4]: 0
In [5]: y.__dask_keys__()
Out[5]: [('getitem-6200faa2e39034c306f3eb60bd76b805', 0)]
I didn't realize. I could not manage to generate a dask.bag, dataframe, or array without chunks. Still, I would treat it as an implementation detail, as it would make sense for them to have no chunks; it's just probably not worth it to optimize away the single chunk they have now. The collection protocol also doesn't (and I don't think it should) mention that there must be at least one key.
I'll update the comment
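The error handling under discussion can be sketched in plain Python. This is only an illustration under stated assumptions: `flatten` here is a simplified stand-in for dask's own helper, `first_key`, `Empty` and `OneChunk` are hypothetical names, and raising `ValueError` for a collection without keys is an assumed choice, since the thread only says that raising an error seems reasonable.

```python
def flatten(seq):
    """Yield leaves from arbitrarily nested lists (simplified stand-in)."""
    for item in seq:
        if isinstance(item, list):
            yield from flatten(item)
        else:
            yield item


def first_key(collection):
    """Return the first key of a collection-like object (hypothetical helper)."""
    try:
        return next(flatten(collection.__dask_keys__()))
    except (AttributeError, TypeError):
        raise TypeError(f"Expected Dask collection; got {type(collection)}")
    except StopIteration:
        # The collection protocol does not promise that at least one key exists.
        raise ValueError("Collection has no keys")


class Empty:
    """Hypothetical collection with no keys at all."""

    def __dask_keys__(self):
        return []


class OneChunk:
    """Hypothetical collection with a single, nested key."""

    def __dask_keys__(self):
        return [[("getitem-abc", 0)]]


key = first_key(OneChunk())
```

Treating the no-keys case explicitly keeps the helper correct even if a future collection type really does expose an empty key list.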
dask/base.py
Outdated
    prefix = key_split(key)
    if isinstance(key, str):
        old_token = key[len(prefix) + 1 :]
        new_token = tokenize(old_token or key, seed)
Nice use of tokenize 👍
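To make the renaming idea concrete, here is a small sketch that mirrors the snippet above without importing dask. `toy_tokenize` and `toy_key_split` are simplified stand-ins (an MD5 of the arguments and a regex that strips a trailing 32-hex-digit token); they are assumptions for illustration and do not reproduce the behaviour of dask's `tokenize` or `key_split`.

```python
import hashlib
import re


def toy_tokenize(*args):
    """Deterministic hash of the arguments (stand-in for a real tokenizer)."""
    return hashlib.md5(repr(args).encode()).hexdigest()


def toy_key_split(key):
    """Return the prefix before a trailing 32-hex-digit token, if there is one."""
    match = re.fullmatch(r"(.+)-[0-9a-f]{32}", key)
    return match.group(1) if match else key


def toy_clone_key(key, seed):
    """Derive a new key with the same prefix and a token that depends on seed."""
    if isinstance(key, tuple) and key and isinstance(key[0], str):
        # Chunk keys such as ("name-token", 0): rename the first element only.
        return (toy_clone_key(key[0], seed),) + key[1:]
    if isinstance(key, str):
        prefix = toy_key_split(key)
        old_token = key[len(prefix) + 1 :]
        # Keys without a token fall back to hashing the whole key.
        new_token = toy_tokenize(old_token or key, seed)
        return f"{prefix}-{new_token}"
    raise TypeError(f"Expected str or tuple starting with str; got {key!r}")


key = ("getitem-6200faa2e39034c306f3eb60bd76b805", 0)
clone_a = toy_clone_key(key, seed=1)
clone_b = toy_clone_key(key, seed=2)
```

Hashing the old token together with the seed keeps the result deterministic for a given seed while making clones made with different seeds distinct.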
    return rebuild(
        HighLevelGraph(new_layers, new_deps),
        *args,
        name=clone_key(prev_coll_name, seed),
Okay, this is why you proposed that __dask_postpersist__ should always have an optional name= keyword over in #7093. I was going to ask about that, but now I see why it's needed.
dask/graph_manipulation.py
Outdated
        layer = {name: (chunks.drop, collection.__dask_keys__())}
        dsk = HighLevelGraph.from_collections(name, layer, dependencies=(collection,))
    else:
        # Two-step map->reduce to minimize RAM usage and data transfer over the network
Just for my own understanding, we're doing a map-reduce instead of just a reduce here so that we transfer a bunch of Nones to the reduce step instead of, say, a bunch of array chunks, right?
correct
clarified in the comment
Awesome, good thinking here 👍
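The map->reduce layout confirmed in this thread can be sketched with a dict-based task graph and a tiny evaluator. Everything here is a toy under stated assumptions: `toy_get`, `reduce_step` and the key names are hypothetical, and the evaluator is not dask's scheduler. The point is only that the final task is handed `None` values rather than chunk data.

```python
def drop(*chunks):
    """Ignore the inputs and return None, so nothing large flows downstream."""
    return None


received = []


def reduce_step(*results):
    """Record what the reduce task is handed, then return None."""
    received.extend(results)
    return None


def toy_get(dsk, key):
    """Tiny recursive evaluator for graphs shaped like {key: (func, *arg_keys)}."""
    task = dsk[key]
    if isinstance(task, tuple) and task and callable(task[0]):
        func, *arg_keys = task
        return func(*(toy_get(dsk, k) for k in arg_keys))
    return task  # plain data


n_chunks = 4
# Four "large" chunks of data.
dsk = {("x", i): [float(i)] * 1_000 for i in range(n_chunks)}
# Map: one small task per chunk; each returns None in place of the chunk.
for i in range(n_chunks):
    dsk[("drop", i)] = (drop, ("x", i))
# Reduce: a single task that depends on every map task.
dsk["checkpoint"] = (reduce_step, *(("drop", i) for i in range(n_chunks)))

result = toy_get(dsk, "checkpoint")
```

With a single-step reduce, the final task would instead take every `("x", i)` chunk as an argument, which on a distributed cluster would mean moving all of them to one worker.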
Also cc @willirath, who might find these graph manipulation methods useful
Conflicts: dask/base.py dask/tests/test_base.py
Thanks for the updates @crusaderky! I look forward to reviewing them later today
jrbourbeau
left a comment
Thanks for all the updates @crusaderky -- in particular the thorough set of tests. Most of the comments below are suggestions on the new documentation page
docs/source/graph_manipulation.rst
Outdated
    >>> import dask.array as da
    >>> x = da.random.normal(size=5e9, chunks=1e7)
    >>> x_avg = x.avg()
    >>> y = (x - x_avg).topk(50000)[-1]
This is a very effective example to walk through. A few suggestions:
- .avg() -> .mean()
- .topk(50000)[-1] -> .max(), as this may be more familiar to users while still resulting in the entire x array being kept in memory
- Decrease the size of x. We should try to keep this example something that most users can run on their laptop, as they will often copy/paste examples and run them locally. We can still illustrate the underlying issue (the entire x array is loaded into memory) with a smaller example array
- Added a .compute() call for completeness
    - >>> import dask.array as da
    - >>> x = da.random.normal(size=5e9, chunks=1e7)
    - >>> x_avg = x.avg()
    - >>> y = (x - x_avg).topk(50000)[-1]
    + >>> import dask.array as da
    + >>> x = da.random.normal(size=1_000, chunks=100)
    + >>> x_mean = x.mean()
    + >>> y = (x - x_mean).max().compute()
I'll size it so that the unoptimized example runs in 4 GiB, so that a user can still observe the difference by glancing at their system monitor.
I had chosen topk(50000)[-1] as it produces a statistically stable output, whereas max() is completely arbitrary. However I see your point.
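The memory issue behind the docs example can be illustrated without dask. The sketch below is a toy model under stated assumptions: `make_chunk`, `shared_x` and `regenerated_x` are hypothetical names, chunks are seeded so they can be regenerated identically, and the peak number of live chunks is counted by hand rather than measured. It contrasts reusing one copy of x for both the mean and the subtraction with generating x a second time once the mean is known.

```python
import random


def make_chunk(i, size=1_000):
    """Deterministically generate chunk i, so that it can be regenerated later."""
    rng = random.Random(i)
    return [rng.gauss(0.0, 1.0) for _ in range(size)]


def shared_x(n_chunks):
    """One copy of x feeds both the mean and the subtraction: all chunks stay alive."""
    chunks = [make_chunk(i) for i in range(n_chunks)]
    mean = sum(sum(c) for c in chunks) / sum(len(c) for c in chunks)
    result = max(v - mean for c in chunks for v in c)
    return result, len(chunks)  # peak number of chunks held at once


def regenerated_x(n_chunks):
    """x is generated twice, so each chunk can be released right after use."""
    total, count = 0.0, 0
    for i in range(n_chunks):
        chunk = make_chunk(i)
        total += sum(chunk)
        count += len(chunk)
    mean = total / count
    result = max(max(v - mean for v in make_chunk(i)) for i in range(n_chunks))
    return result, 1  # only one chunk is alive at any time


result_a, peak_a = shared_x(10)
result_b, peak_b = regenerated_x(10)
```

Both strategies give the same answer here; the second trades extra computation for a much smaller peak footprint, which is the trade-off the example in the docs is meant to make visible.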
dask/graph_manipulation.py
Outdated
        return out[0] if len(collections) == 1 else out


    def block_until_done(*collections):
Thinking about this again, what do you think about wait_on for this method instead? It's shorter and doesn't use the word "block", which might lead to confusion about when the blocking occurs. (Apologies for suggesting a name change so late in the process.)
fine for me; I thought you wanted to avoid the word "wait" as it may lead to confusion with asyncio and blocking calls
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
jrbourbeau
left a comment
Thanks for all your work on this @crusaderky! It'll be good to get this out to users
    def import_or_none(path):
        with ignoring(BaseException):
            return pytest.importorskip(path)
        return None
I am working on getting the min-deps tests passing again in #7247 and running into some issues with the changes in this PR around import_or_none. Can you explain what the reasoning was for the change?
@jsignell I moved import_or_none to test_utils because it is now used by two separate modules. Not sure I understood your question though?
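For readers unfamiliar with the helper, its intent (return the module if it can be imported, otherwise `None`) can be sketched with the standard library alone. This is an assumption-laden variant for illustration: it uses `importlib` and catches only `ImportError`, whereas the helper in the PR wraps `pytest.importorskip` and ignores `BaseException`. The module name `no_such_module_for_this_demo` is a made-up placeholder.

```python
import importlib


def import_or_none(path):
    """Return the imported module, or None if it is not installed."""
    try:
        return importlib.import_module(path)
    except ImportError:
        return None


json_mod = import_or_none("json")
missing = import_or_none("no_such_module_for_this_demo")
```

A shared helper like this lets several test modules guard optional dependencies in the same way, for example by checking `if np is None` after `np = import_or_none("numpy")`.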
DONE:
OUT OF SCOPE, left to later PRs:
- Layer.clone in subclasses for performance
- xarray.Dataset objects with more than one dask variable (xarray.Dataset can't respect the new collection protocol #7203)