Skip to content

Conversation

@crusaderky
Copy link
Collaborator

@crusaderky crusaderky commented Jan 25, 2021

DONE:

  • new functions implementation
  • unit tests
  • documentation

OUT OF SCOPE, left to later PRs:

return self._rebuild, ()

def _rebuild(self, dsk, name=None):
return Array(dsk, name or self.name, self.chunks, self.dtype, self._meta)
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This has the side effect that, if anybody were to pickle the output of __dask_postpersist__, he would needlessly pickle the old dask graph too. However I cannot think of any real-life use case where this could happen?

@jsignell
Copy link
Member

As a meta point, I would prefer if we tried to avoid names with violent implications.

@crusaderky
Copy link
Collaborator Author

crusaderky commented Jan 25, 2021

As a meta point, I would prefer if we tried to avoid names with violent implications.

...again, happy to accept suggestions...

  • graph_manipulation
  • graph_edit
  • graph_rebuild
  • regraph
  • other?

What I wanted to convey is that we're altering an already existing graph but, unlike optimization, the output is not functionally equivalent to the input.

@jsignell
Copy link
Member

I like graph_manipulation. For choke I might suggest restrict or throttle.

@jrbourbeau
Copy link
Member

Also +1 for graph_manipulation.py. Another alternative for choke could be checkpoint -- though perhaps @crusaderky has a sense for which name aligns best with the functionality of that particular graph manipulation

@crusaderky crusaderky changed the title WIP graph surgery: clone, bind, drop, choke WIP graph manipulation: clone, bind, drop, checkpoint Jan 25, 2021
commit 2ceaf46
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 28 18:53:42 2021 +0000

    polish

commit 2643032
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 28 17:13:57 2021 +0000

    black

commit 1b29cd8
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 28 17:13:45 2021 +0000

    bugfix

commit 0a50ceb
Merge: 8286a36 9bb586a
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 28 16:48:34 2021 +0000

    Merge remote-tracking branch 'upstream/master' into graphsurgery

commit 8286a36
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 28 16:48:12 2021 +0000

    bugfix

commit 802ece7
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 28 16:22:47 2021 +0000

    bugfixes

commit 8d31b85
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 28 15:38:33 2021 +0000

    Drop unused layers

commit 966d5a7
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 28 15:20:42 2021 +0000

    bugfixes

commit afdc65d
Merge: 1b30c21 5694253
Author: crusaderky <crusaderky@gmail.com>
Date:   Tue Jan 26 19:00:35 2021 +0000

    Merge remote-tracking branch 'upstream/master' into graphsurgery

commit 1b30c21
Author: crusaderky <crusaderky@gmail.com>
Date:   Tue Jan 26 14:07:59 2021 +0000

    polish

commit 4d03052
Author: crusaderky <crusaderky@gmail.com>
Date:   Tue Jan 26 13:46:02 2021 +0000

    bind implementation

commit 741b0e4
Author: crusaderky <crusaderky@gmail.com>
Date:   Tue Jan 26 11:30:12 2021 +0000

    fix

commit 5005c60
Author: crusaderky <crusaderky@gmail.com>
Date:   Tue Jan 26 11:26:46 2021 +0000

    refactor

commit 050aac8
Merge: c68a459 fbbc271
Author: crusaderky <crusaderky@gmail.com>
Date:   Tue Jan 26 11:16:05 2021 +0000

    Merge remote-tracking branch 'upstream/master' into graphsurgery

commit c68a459
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 21:57:43 2021 +0000

    bind implementation

commit 6589c18
Merge: 7ac6817 4e2d838
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 21:24:05 2021 +0000

    Merge remote-tracking branch 'upstream/master' into graphsurgery

commit 7ac6817
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 21:23:53 2021 +0000

    Rename choke -> checkpoint

commit deec2b3
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 21:19:49 2021 +0000

    Rename graphsurgery -> graph_manipulation

commit 60e928d
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 21:18:27 2021 +0000

    more bind implementation

commit fe17526
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 13:55:54 2021 +0000

    implement bind (partial)

    d

commit 515f64f
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 11:33:17 2021 +0000

    polish

commit 4046708
Merge: 16fbb10 5c96d82
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 10:43:51 2021 +0000

    Merge remote-tracking branch 'dask/master' into graphsurgery

commit 16fbb10
Author: crusaderky <crusaderky@gmail.com>
Date:   Mon Jan 25 10:35:21 2021 +0000

    Implement name=

commit d0f0348
Author: crusaderky <crusaderky@gmail.com>
Date:   Fri Jan 22 18:15:34 2021 +0000

    postpersist name= argument

commit c1d695f
Author: crusaderky <crusaderky@gmail.com>
Date:   Fri Jan 22 18:15:17 2021 +0000

    bind implementation (incomplete)

commit 1ae20fe
Author: crusaderky <crusaderky@gmail.com>
Date:   Fri Jan 22 14:44:20 2021 +0000

    polish

commit a30f0ed
Author: crusaderky <crusaderky@gmail.com>
Date:   Fri Jan 22 14:34:25 2021 +0000

    polish

commit b979ea2
Merge: 793df1c 4395973
Author: crusaderky <crusaderky@gmail.com>
Date:   Fri Jan 22 14:05:49 2021 +0000

    Merge remote-tracking branch 'dask/master' into graphsurgery

commit 793df1c
Author: crusaderky <crusaderky@gmail.com>
Date:   Fri Jan 22 14:05:42 2021 +0000

    impl

commit 1f66b9a
Merge: ba43137 cb9fc26
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 21 17:55:44 2021 +0000

    Merge remote-tracking branch 'dask/master' into graphsurgery

commit ba43137
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 21 17:55:30 2021 +0000

    graphsurgery (incomplete)

commit 0ad5fe3
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 21 14:04:59 2021 +0000

    O(1) access to keys in HighLevelGraph

commit 405205a
Merge: b4a7197 30c9a3a
Author: crusaderky <crusaderky@gmail.com>
Date:   Thu Jan 21 10:59:55 2021 +0000

    Merge remote-tracking branch 'dask/master' into graphsurgery

commit b4a7197
Author: crusaderky <crusaderky@gmail.com>
Date:   Wed Jan 20 21:00:48 2021 +0000

    drop prototype

    bugfix
Notes
-----
This method should be overridden by subclasses to avoid materializing the layer.
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overriding this method is left to a subsequent iterative PR.

@crusaderky
Copy link
Collaborator Author

@jrbourbeau you may start code review (but see TODOs above)

Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your work on this @crusaderky! I was able to look though the __dask_postpersist__ updates, base.py utilities, and drop + checkpoint methods. Overall things look good and I've left a few comments. I'll get through clone + bind tomorrow and may have additional comments then.

In terms of where docs on these methods should live, I think a new section in https://docs.dask.org/en/latest/api.html seems like good place

dask/base.py Outdated
except (AttributeError, TypeError):
raise TypeError(f"Expected Dask collection; got {type(collection)}")
except StopIteration:
# e.g. Array of size 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Raising an error when there are no keys seems reasonable. I just don't totally understand this comment. IIUC even arrays of size 0 will still have keys:

In [1]: import dask.array as da

In [2]: x = da.ones(10, chunks=2)

In [3]: y = x[:0]

In [4]: y.size
Out[4]: 0

In [5]: y.__dask_keys__()
Out[5]: [('getitem-6200faa2e39034c306f3eb60bd76b805', 0)]

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize. I could not manage to generate a dask.bag, dataframe, or array without chunks. Still, I would treat it as an implementation detail, as it would make sense for them to have no chunks; it's just probably not worth it to optimize away the single chunk they have now. The collection protocol also doesn't (and I don't think it should) mention that there must be at least one key.

I'll update the comment

dask/base.py Outdated
prefix = key_split(key)
if isinstance(key, str):
old_token = key[len(prefix) + 1 :]
new_token = tokenize(old_token or key, seed)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice use of tokenize 👍

return rebuild(
HighLevelGraph(new_layers, new_deps),
*args,
name=clone_key(prev_coll_name, seed),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, this is why you proposed __dask_postpersist__ should always have an options name= keyword over in #7093. I was going to ask about that, but now I see why it's needed

layer = {name: (chunks.drop, collection.__dask_keys__())}
dsk = HighLevelGraph.from_collections(name, layer, dependencies=(collection,))
else:
# Two-step map->reduce to minimize RAM usage and data transfer over the network
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just for my own understanding, we're doing a map-reduce instead of just a reduce here so that we transfer a bunch of Nones to the reduce step instead of, say, a bunch of array chunks, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

clarified in the comment

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Awesome, good thinking here 👍

@jrbourbeau
Copy link
Member

Also cc @willirath who might find these graph manipulations methods useful

@crusaderky crusaderky closed this Feb 8, 2021
@crusaderky crusaderky reopened this Feb 8, 2021
@crusaderky crusaderky marked this pull request as ready for review February 8, 2021 15:39
@crusaderky crusaderky changed the title WIP graph manipulation: clone, bind, drop, checkpoint graph manipulation: clone, bind, checkpoint, block_until_done Feb 8, 2021
@crusaderky crusaderky closed this Feb 8, 2021
@crusaderky crusaderky reopened this Feb 8, 2021
@jrbourbeau
Copy link
Member

Thanks for the updates @crusaderky! I look forward to reviewing them later today

@crusaderky crusaderky closed this Feb 8, 2021
@crusaderky crusaderky reopened this Feb 8, 2021
Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all the updates @crusaderky -- in particular the thorough set of tests. Most of the comments below are suggestions on the new documentation page

Comment on lines 15 to 18
>>> import dask.array as da
>>> x = da.random.normal(size=5e9, chunks=1e7)
>>> x_avg = x.avg()
>>> y = (x - x_avg).topk(50000)[-1]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a very effective example to walk through. A few suggestions:

  • .avg() -> .mean()
  • .topk(50000)[-1] -> .max() as this may be more familiar to users while still resulting the the entire x array being kept in memory
  • Decrease the size of x. We should try to keep this example something that most users can run on their laptop, as they will often copy/paste examples and run them locally. We can still illustrate the underlying issue (the entire x array is loaded into memory) will a smaller example array
  • Added a .compute() call for completeness
Suggested change
>>> import dask.array as da
>>> x = da.random.normal(size=5e9, chunks=1e7)
>>> x_avg = x.avg()
>>> y = (x - x_avg).topk(50000)[-1]
>>> import dask.array as da
>>> x = da.random.normal(size=1_000, chunks=100)
>>> x_mean = x.mean()
>>> y = (x - x_mean).max().compute()

Copy link
Collaborator Author

@crusaderky crusaderky Feb 11, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll size it so that the unoptimized example runs in 4 GiB, so that a user can still observe the difference by glancing at his system monitor.
I had chosen topk(50000)[-1] as it produces a statistically stable output, whereas max() is completely arbitrary. However I see your point.

return out[0] if len(collections) == 1 else out


def block_until_done(*collections):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thinking about this again, what do you about wait_on for this method instead? It's shorter and doesn't use the word "block" which might lead to confusion about when the blocking occurs (Apologies for suggesting a name change so late in the process)

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fine for me; I thought you wanted to avoid the word "wait" as it may lead to confusion with asyncio and blocking calls

crusaderky and others added 9 commits February 11, 2021 11:58
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
@jrbourbeau jrbourbeau changed the title graph manipulation: clone, bind, checkpoint, block_until_done graph manipulation: clone, bind, checkpoint, wait_on Feb 12, 2021
Copy link
Member

@jrbourbeau jrbourbeau left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for all your work on this @crusaderky! It'll be good to get this out to users

@jrbourbeau jrbourbeau merged commit 02b466d into dask:master Feb 12, 2021
@crusaderky crusaderky deleted the graphsurgery branch February 12, 2021 09:38
def import_or_none(path):
with ignoring(BaseException):
return pytest.importorskip(path)
return None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am working on getting the min-deps tests passing again in #7247 and running into some issues with the changes in this PR around import_or_none. Can you explain what the reasoning was for the change?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, sorry I missed your PR @jsignell. FWIW I think there might be some duplicate work going on in #7243 and #7247

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jsignell I moved import_or_none to test_utils because it is now used by two separate modules. Not sure I understood your question though?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants