
Conversation

@mrocklin
Member

@mrocklin mrocklin commented Oct 12, 2018

Fixes #4038

This implements a HighLevelGraph that stores the task graphs of our collections in layers, generally one layer per high level operation. Today this doesn't add any new features except for easier inspectability and visualization, but it opens the door for new and exciting features in the future, notably high-level expression optimization.

I recommend reading through the high-level-graphs.rst doc page first, and then take a look at dask dataframe to see a simple example of use.
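The layered structure can be sketched with plain dicts (a hypothetical reduction for illustration, not the actual implementation): one task dict per high-level operation, plus a mapping recording which layers each layer depends on.

```python
from operator import add

# Hypothetical layered graph for something like ``ones + 1``: each
# high-level operation contributes one layer of tasks.
layers = {
    'ones': {('ones', i): 1.0 for i in range(5)},
    'add': {('add', i): (add, ('ones', i), 1) for i in range(5)},
}
# Which layers each layer reads from.
dependencies = {'ones': set(), 'add': {'ones'}}

# Flattening the layers recovers the familiar low-level task graph
# that the schedulers already understand.
low_level = {k: v for layer in layers.values() for k, v in layer.items()}
```

Keeping the layers separate is what makes per-operation inspection, visualization, and (eventually) high-level optimization possible, while flattening remains cheap when a scheduler needs the full graph.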

@mrocklin
Member Author

mrocklin commented Oct 12, 2018

I think that generally my plan here is as follows:

  • pick up easy wins in dask array
  • refactor top
  • move onto dask dataframe
  • refactor top again so that it can also be used in dask dataframe
  • move onto bag

Then future work for others is probably:

  • add a slicing operation, and figure out how to transpose it with atop operations
  • add parquet and column access operations and transpose them


import toolz

class HighGraph(sharedict.ShareDict):
Member

Two thoughts:

  1. I like the name HighLevelGraph better than HighGraph.
  2. Can you make this extend ShareDict with composition instead of inheritance? I find that easier to understand and less error prone.
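A composition-based version might look roughly like this (an illustrative sketch, not the PR's actual code): hold the layer dicts as attributes and implement the Mapping interface on top of them, rather than inheriting from ShareDict.

```python
from collections.abc import Mapping

class HighLevelGraph(Mapping):
    """Sketch of composition over inheritance: the layer dicts are
    held as attributes, and the Mapping protocol is implemented
    against them. Names here are illustrative."""

    def __init__(self, layers, dependencies):
        self.layers = layers              # {layer name: task dict}
        self.dependencies = dependencies  # {layer name: set of layer names}

    def __getitem__(self, key):
        for layer in self.layers.values():
            if key in layer:
                return layer[key]
        raise KeyError(key)

    def __iter__(self):
        return (k for layer in self.layers.values() for k in layer)

    def __len__(self):
        return sum(map(len, self.layers.values()))

# Usage: behaves like a read-only dict over all layers combined.
hg = HighLevelGraph({'a': {('a', 0): 1}, 'b': {('b', 0): 2}},
                    {'a': set(), 'b': {'a'}})
```

With composition there is no risk of inherited ShareDict methods silently operating on the wrong internal state, which is the "less error prone" point above.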

Member Author

No objection to HighLevelGraph

I would like to just replace ShareDict entirely.

Member

I would like to just replace ShareDict entirely.

OK, works for me. I just wouldn't bother with inheritance, then.

Member Author

Agreed, it's only there short-term while I trade things out.

Member

I second HighLevelGraph

@mrocklin
Member Author

import dask.array as da
a = da.ones(100, chunks=(20,))
b = a + 1
c = a + 2
d = (b + c).sum()
d.visualize('low-level.png')
d.dask.visualize('high-level.png')

Low level: [low-level.png]

High level: [high-level.png]

@mrocklin
Member Author

mrocklin commented Oct 15, 2018

OK, so I've been through most of the code and changed things around. So far all this does is introduce HighGraphs in graph generation code. It doesn't add optimization handling, use top in dataframes, or do anything with the high level graphs (I'd like to defer these to future PRs). I did have to screw around a bit in the atop code and with delayed to_task_dasks.

I plan to change the name later (but before merging) in a global find-replace.

I think that now this could use some review. I recommend the following:

  1. Take a look at the HighGraph implementation
  2. Take a look at the changes to dataframe/core.py. This is representative of the workflow that most devs would have to adopt day-to-day when working on Dask
  3. Take a look at array/linalg.py::tsqr. This is representative of the worst case.

There is still work to do here with some corners, but I think that this is now at a stage where it's ready for review. (cc @jcrist)

@mrocklin mrocklin changed the title from "WIP - High Level Graphs" to "High Level Graphs" on Oct 15, 2018
Member

@jcrist jcrist left a comment


On initial review this (and this comment) seems sane to me. Documentation on HighGraph, as well as on the intent of __dask_layers__, would be welcome. From what I understand these are the keys in the HighGraph that the collection output depends on directly (usually just the key)? It would be good to formally state this, as well as the desired output type (in the code I saw lists, sets, and tuples all being returned by different collections).

if hasattr(dask, 'dask'):
    dask = dask.dask
assert isinstance(dask, dict)
assert isinstance(dask, collections.Mapping)
Member

nit: should be dask.compatibility.Mapping, abc classes on the collections module directly are deprecated.
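For context, the compatibility shim being referred to amounts to a version-safe import along these lines (a sketch; the exact contents of dask.compatibility may differ):

```python
# collections.Mapping (the ABC accessed directly on the collections
# module) was deprecated in Python 3.3 and removed in Python 3.10;
# the abstract base classes live in collections.abc. A version-safe
# import looks like:
try:
    from collections.abc import Mapping
except ImportError:  # very old Pythons without collections.abc
    from collections import Mapping
```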

return out


def finalize(collection):
Member

Nit: This conflicts in my mind with the old version of dask finalize functions (results -> in memory version). It's also not a very descriptive name. Perhaps collection_to_delayed? Or just inline in unpack_collections below (my preference).

return self.layers

@classmethod
def from_collections(cls, name, layer, dependencies=()):
Member

The overloading of "dependencies" here was confusing to me (especially without a docstring). Took me a second to realize that while dependencies above are collections, the collections aren't being stored directly in the .dependencies attribute.
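The overloading can be illustrated with a heavily simplified stand-in (hypothetical names; the real method works with full collections and their `__dask_layers__`): the `dependencies` argument receives collection objects, but only the names of their layers end up stored.

```python
from types import SimpleNamespace

def from_collections(name, layer, dependencies=()):
    # Hypothetical reduction of HighGraph.from_collections: the
    # ``dependencies`` argument receives collection-like objects, but
    # what gets stored under ``name`` is only the *set of layer names*,
    # while the collections' layers are merged in alongside.
    layers = {name: layer}
    deps = {name: {c.name for c in dependencies}}
    for c in dependencies:
        layers.update(c.layers)       # merge the dependency's layers
        deps.update(c.dependencies)   # and its layer-to-layer edges
    return layers, deps

# A stand-in for an existing collection with a single layer 'a':
a = SimpleNamespace(name='a',
                    layers={'a': {('a', 0): 1}},
                    dependencies={'a': set()})
layers, deps = from_collections('b', {('b', 0): (sum, [('a', 0)])},
                                dependencies=[a])
```

So "dependencies" means collections at the call site but layer names in the stored attribute, which is the confusion being flagged; a docstring spelling this out would help.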

dask/delayed.py Outdated
Returns
-------
task : normalized task to be run
collections : a set of collections
Member

While this may be true for our implementations, I don't think we should enforce dask collections be hashable. Elsewhere you use toolz.unique by id, which seems safer to me.
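Deduplicating by identity sidesteps the hashability requirement entirely. `toolz.unique(seq, key=id)` does this; a dependency-free equivalent for illustration:

```python
def unique_by_id(seq):
    # Deduplicate by object identity rather than by equality/hash,
    # so the items (here, dask collections) never need to be hashable.
    seen = set()
    out = []
    for item in seq:
        if id(item) not in seen:
            seen.add(id(item))
            out.append(item)
    return out

# Two equal but distinct (and unhashable) objects stay distinct:
a, b = [1, 2], [1, 2]
```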

# divisions is ignored, only present to be compatible with other
# objects.
if not isinstance(dsk, HighGraph):
    dsk = HighGraph.from_collections(name, dsk, dependencies=[])
Member

Is specifying dependencies=[] necessary? I'd expect the default to be fine.

Member Author

The default is fine, but I like to call out that we're not including dependencies here, which is undesirable.



        dependencies[id(g)] = set()
    else:
        raise TypeError(g)
return HighGraph(layers, dependencies)
Member

Should probably use cls here and above, makes subclassing easier in the future if that's ever needed.
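The suggestion, in a minimal sketch (simplified constructor, ignoring the dependency merging):

```python
class HighGraph:
    def __init__(self, layers, dependencies):
        self.layers = layers
        self.dependencies = dependencies

    @classmethod
    def from_collections(cls, name, layer, dependencies=()):
        # Returning ``cls(...)`` instead of the hard-coded
        # ``HighGraph(...)`` means a subclass calling this
        # constructor gets an instance of itself back.
        return cls({name: layer}, {name: set()})

class CustomGraph(HighGraph):
    pass
```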

dask/dot.py Outdated
return handle_graphviz(g, filename, format)


def handle_graphviz(g, filename, format):
Member

This could use a better name. Perhaps graph_to_file?


# replace keys in kwargs with _0 tokens
new_keys = list(core.get_dependencies(dsk_kwargs, task=kwargs))
# new_keys = list(core.get_dependencies(dsk_kwargs, task=kwargs))
Member

nit: Should this line be dropped?

pdb.set_trace()
if any(isinstance(layer, HighLevelGraph) for layer in dsk.layers.values()):
    import pdb
    pdb.set_trace()
Member

Are these pdb parts still supposed to be here?

'add': {('add', 0): (operator.add, 'myfile.0.csv', 100),
('add', 1): (operator.add, 'myfile.1.csv', 100),
('add', 2): (operator.add, 'myfile.2.csv', 100),
('add', 3): (operator.add, 'myfile.3.csv', 100)}
Member

@jakirkham jakirkham Oct 30, 2018


Sorry, trying to follow this here: should 'myfile.*.csv' be ('read-csv', *), or am I missing something?

Edit: FWIW this shows up in the docs too. Not sure that it needs to change (since I'm still learning). Just footnoting it in case it does.
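If the guess is right, the corrected example would wire the 'add' tasks to the read-csv layer's output keys, roughly as follows (`read_csv` here is a hypothetical placeholder, not the real loader):

```python
import operator

def read_csv(path):
    # Hypothetical placeholder standing in for the tasks a real
    # read-csv layer would contain.
    return path

graph = {
    'read-csv': {('read-csv', i): (read_csv, 'myfile.%d.csv' % i)
                 for i in range(4)},
    'add': {('add', i): (operator.add, ('read-csv', i), 100)
            for i in range(4)},
}
```

That is, the second element of each 'add' task is a key into the 'read-csv' layer rather than a raw filename string.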

f = dask.pop(task)
assert f == (tuple, ['a', 'b', 'c'])
assert dask == x._dask
with warnings.catch_warnings(record=True):
Member

@jakirkham jakirkham Oct 30, 2018


What's this needed for?

Edit: NVM because to_task_dask is being deprecated in this PR.

@jakirkham
Member

Thanks @mrocklin. Looks pretty good.

Mostly found a few nits upon first reading the code. Probably need to let it sink in a bit more before any deeper discussion. That said, the idea and logic of it all seemed good.

Currently we are just building towards high level optimizations, correct? Just wanted to make sure there weren't any optimizations included in this PR that I missed and probably deserve a closer look.

Seems like some thought was given to handling existing custom graph code (as long as it is being turned into a Dask collection), which is great. That should make it easier for older code to adopt, I would think.

Trying to think if there is anything we can do to make it easier for people currently using ShareDict to migrate (especially if they will be straddling different versions of Dask). It's just deprecated at this stage, so this isn't too bad. Just wondering.

@mrocklin
Member Author

Mostly found a few nits upon first reading the code

Thanks for the review, @jakirkham. Those were helpful.

Currently we are just building towards high level optimizations, correct? Just wanted to make sure there weren't any optimizations included in this PR that I missed and probably deserve a closer look.

You're correct here. We don't actually add any new functionality in this PR. It should make features like high level optimizations much easier to achieve though.

Trying to think if there is anything we can do to make it easier for people using ShareDict currently to migrate (especially if they will be straddling different versions of Dask). It's just deprecated at this stage. So this isn't too bad really. Just wondering really.

I'm inclined not to care too much about this. I suspect that this only affects a few fairly sophisticated users.

Member

@jakirkham jakirkham left a comment


Sorry for the additional nits. Tried to group them into a PR review to reduce noise while still isolating them. May have missed a few myself in the last review.

This makes me wonder if there is a way to get Sphinx to reference one example in multiple places to make maintenance efforts more focused. Guess there are tradeoffs in terms of potential added indirection. Maybe something to think about though.

('add', 0): (operator.add, 'myfile.0.csv', 100),
('add', 1): (operator.add, 'myfile.1.csv', 100),
('add', 2): (operator.add, 'myfile.2.csv', 100),
('add', 3): (operator.add, 'myfile.3.csv', 100),
Member

As in the HighLevelGraph docstring, guessing 'myfile.*.csv' should be ('read-csv', *).

('add', 0): (operator.add, 'myfile.0.csv', 100),
('add', 1): (operator.add, 'myfile.1.csv', 100),
('add', 2): (operator.add, 'myfile.2.csv', 100),
('add', 3): (operator.add, 'myfile.3.csv', 100),
Member

As in the HighLevelGraph docstring, guessing 'myfile.*.csv' should be ('read-csv', *).

'add': {('add', 0): (operator.add, 'myfile.0.csv', 100),
('add', 1): (operator.add, 'myfile.1.csv', 100),
('add', 2): (operator.add, 'myfile.2.csv', 100),
('add', 3): (operator.add, 'myfile.3.csv', 100)}
Member

As in the HighLevelGraph docstring, guessing 'myfile.*.csv' should be ('read-csv', *).

'add': {('add', 0): (operator.add, 'myfile.0.csv', 100),
('add', 1): (operator.add, 'myfile.1.csv', 100),
('add', 2): (operator.add, 'myfile.2.csv', 100),
('add', 3): (operator.add, 'myfile.3.csv', 100)}
Member

As in the HighLevelGraph docstring, guessing 'myfile.*.csv' should be ('read-csv', *).

@jakirkham
Member

Thanks for the follow-up, @mrocklin.

Caught a few more nits. Should be easy fixes though. Otherwise am pretty happy with this as is.

Also thanks for the clarification regarding the intent of the PR. Am happy with how this simplifies the graph construction code. It was pleasing to read the much shorter and clearer lines in a few places, just to appreciate that effect alone.

Had some other ideas about optimizations we might try that we can discuss back in the issue.

Not too worried about the ShareDict deprecation either. Mainly mentioned it since a smoother path might be helpful for the advanced users that are employing ShareDict generously in their codebases. Though there is really no substitute for migrating at some point.

@mrocklin mrocklin mentioned this pull request Nov 20, 2018
@jakirkham
Member

Where do you think this fits relative to 1.0.0?

@mrocklin
Member Author

mrocklin commented Nov 27, 2018 via email

@mrocklin
Member Author

mrocklin commented Dec 7, 2018

If there are no objections then I plan to merge this on Monday.
