
Culling high level graphs #6510

Merged
mrocklin merged 88 commits into dask:master from madsbk:cull_high_level_graph
Sep 23, 2020

Conversation

@madsbk (Contributor) commented Aug 13, 2020

As a step towards defining a Layer(Mapping) class (#6438), I have been exploring how we can cull high level graphs directly. I think it is useful to have a concrete implementation before deciding on a Layer interface.

This PR implements class Layer(Mapping), an abstract class that establishes a protocol for high level graph layers.

The class defines three methods that subclasses can override in order to use domain knowledge to reduce overhead:

class Layer(collections.abc.Mapping):
    """High level graph layer

    This abstract class establishes a protocol for high level graph layers.
    """

    def cull(self, keys: Set) -> "Layer":
        """Return a new Layer with only the tasks required to calculate `keys`.

        In other words, remove unnecessary tasks from the layer.

        Examples
        --------
        >>> d = Layer({'x': 1, 'y': (inc, 'x'), 'out': (add, 'x', 10)})  # doctest: +SKIP
        >>> d.cull({'out'})  # doctest: +SKIP
        {'x': 1, 'out': (add, 'x', 10)}

        Returns
        -------
        layer: Layer
            Culled layer
        """

    def get_external_dependencies(self, all_hlg_keys) -> Set:
        """Get external dependencies

        Parameters
        ----------
        all_hlg_keys : container
            All keys in the high level graph.

        Returns
        -------
        deps: set
            Set of dependencies
        """

    def get_dependencies(self, all_hlg_keys) -> Mapping[Hashable, Set]:
        """Get dependencies of all keys in the layer

        Parameters
        ----------
        all_hlg_keys : container
            All keys in the high level graph.

        Returns
        -------
        map: Mapping
            A map that maps each key in the layer to its dependencies
        """

Blockwise

This PR also implements a subclass Blockwise(Layer) that uses the blockwise structure to compute key dependencies and perform culling efficiently. .cull() and .get_external_dependencies() return an instance of BasicLayer(Layer) that embeds the information needed for efficient culling.

ParquetSubgraph

This PR also implements a subclass ParquetSubgraph(Layer) that implements get_external_dependencies() by always returning the empty set, and cull() by filtering parts.
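A simplified sketch of how such a layer might behave (field names are guesses based on the description, not the actual dask implementation):

```python
class ParquetSubgraph:
    """Hypothetical simplified parquet layer: one task per file part."""

    def __init__(self, name, part_ids):
        self.name = name          # layer name; keys are (name, part_id)
        self.part_ids = part_ids  # which parts of the dataset to read

    def get_external_dependencies(self, all_hlg_keys):
        # reading from storage depends on nothing else in the graph
        return set()

    def cull(self, keys):
        # keep only the parts whose output key was actually requested
        return ParquetSubgraph(
            self.name,
            [i for i in self.part_ids if (self.name, i) in keys],
        )


sg = ParquetSubgraph("read-parquet", [0, 1, 2, 3])
culled = sg.cull({("read-parquet", 0), ("read-parquet", 2)})
print(culled.part_ids)  # [0, 2]
```

Because the layer never materializes a task dict, culling is just a filter over part ids.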

cc. @mrocklin, @rjzamora, @quasiben

@mrocklin (Member)

Thanks for pushing something up @madsbk . Having something concrete makes it a lot easier to think about this problem.

For the process of clearing out high level layers that aren't being used, this is clearly a win.

For lower level culling I still don't know exactly how to handle culling of the graphs within layers. Eventually, we're going to want to do this on the scheduler side. That means that we're going to want to cull the concrete low level graphs that we have here on the client side (the layers backed by dicts) but not explicitly expand the layers that have high level representations, like Blockwise, or a future Shuffle layer. We're going to want to keep those in abstract form so that we can pass them to the Scheduler.

With that in mind, I wonder if we maybe want the following two functions on every layer:

  1. Given the keys that we are being asked to produce, generate the task graph that corresponds to those keys.

    This would be mildly more efficient than today's generate-then-cull even in the early case when everything is on the client side.

  2. Given the keys that we are being asked to produce, generate the keys that we will need from each of our dependency layers.

    This will allow us to pass down enough information to continue the local culling process on dict-backed layers, but will not force us to go through the generation/culling step on the client side just yet.
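A rough sketch of what those two functions might look like for a simple one-to-one blockwise-style layer (method names and structure here are hypothetical, not a proposed API):

```python
class SliceLayer:
    """Hypothetical layer where output block i is f(input block i)."""

    def __init__(self, name, dep_name):
        self.name = name          # this layer's name
        self.dep_name = dep_name  # name of the dependency layer

    def graph_for(self, keys):
        # (1) generate only the tasks for the requested keys,
        # never the full graph
        return {(self.name, i): ("f", (self.dep_name, i)) for (_, i) in keys}

    def keys_needed_from_dependencies(self, keys):
        # (2) for each dependency layer, report which of its keys
        # we need, so culling can continue layer by layer
        return {self.dep_name: {(self.dep_name, i) for (_, i) in keys}}


layer = SliceLayer("out", "in")
wanted = {("out", 0)}
print(layer.graph_for(wanted))                      # {('out', 0): ('f', ('in', 0))}
print(layer.keys_needed_from_dependencies(wanted))  # {'in': {('in', 0)}}
```

Method (2) is what lets the culling walk proceed across layers without expanding abstract layers like Blockwise into concrete graphs.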

As a test case we might create these methods on the ReadParquet and then the Blockwise mapping classes; ReadParquet is probably very easy and Blockwise is probably very hard, but both would make good test cases. To verify effectiveness we could do something like

df = dd.read_parquet("...")
df.head()

And then follow things through and see that we never actually generated the full graph. We could then expand this to ...

df = dd.read_parquet("...")
df["z"] = df.x + df.y
df.head()

And hopefully see the same thing, even though there are now two layers and also a Blockwise in there.

This is just my impressions after looking at this code though. There may be better approaches here.

@quasiben (Member) commented Aug 18, 2020

@madsbk I kicked CI to re-run the tests

@mrocklin (Member) left a comment

A couple of small comments, but it's probably too early for this kind of review.

@madsbk madsbk mentioned this pull request Aug 26, 2020
@madsbk madsbk force-pushed the cull_high_level_graph branch from b305e37 to 62bc542 Compare August 27, 2020 09:08
@madsbk madsbk force-pushed the cull_high_level_graph branch from 62bc542 to 9bad423 Compare August 27, 2020 11:42
@mrocklin mrocklin changed the title [WIP] Culling high level graphs Culling high level graphs Sep 23, 2020
@mrocklin (Member) left a comment

In general this looks good to me. I'm actually surprised at how small this change ended up being. In my mind it was considerably larger. Thank you for exploring this space @madsbk . I feel like we both have a much better sense for how this work is going to go now.

I've made a few comments, but they're all pretty minor or future-leaning. If you have time to clean things up tomorrow (I'm guessing this will take 30m) I'm happy to merge when I wake up.


def get_dependencies(self, all_hlg_keys):
    _ = self._dict  # trigger materialization
    return self._cached_dict["basic_layer"].get_dependencies(all_hlg_keys)
(Member)

Eventually I think that we're going to want to separate dependencies from graph generation. This can safely be future work though.

@madsbk (Contributor, Author)

Agree and agree :)

Comment on lines +105 to +106
part_ids=[i for i in self.part_ids if (self.name, i) in keys],
)
(Member)

I don't know of any currently. It's probably cheaper to sort on an as-needed basis than to do what we are doing here, though.

@mrocklin mrocklin merged commit f0b2ac2 into dask:master Sep 23, 2020
@mrocklin (Member)

Thanks @madsbk ! This is in.

@rjzamora (Member)

Woohoo - thanks @madsbk! And thanks @mrocklin for reviewing :)

@madsbk madsbk deleted the cull_high_level_graph branch September 24, 2020 07:52
@JSKenyon (Contributor)

@madsbk and @mrocklin this PR seems to have broken some custom graph construction which was working for me prior to 2.28.0. If optimize_graph is True I now get errors like AttributeError: 'dict' object has no attribute 'discard'. I will likely look into raising a bug report tomorrow, but before I do, I was wondering if either of you might have an intuition regarding what could be going wrong? My instinct is that there is a dictionary where there should be a set, but I am unsure how these changes would have exposed this, given that it definitely worked prior to 2.28.0.
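The error message is consistent with that instinct: `set` has a `discard` method and `dict` does not, so a dict of dependencies is presumably reaching code that expects a set. A minimal illustration of the mismatch (not dask code):

```python
# Dependency containers are expected to be sets, which support discard:
deps = {"a", "b"}
deps.discard("a")  # fine: set.discard removes an element if present

# A dict in the same position fails with exactly this AttributeError:
deps_as_dict = {"a": None, "b": None}
try:
    deps_as_dict.discard("a")
except AttributeError as err:
    msg = str(err)
print(msg)  # mentions that 'dict' has no attribute 'discard'
```

So the likely fix on the caller's side is to pass dependencies as `{key: set_of_deps}` with set values, not dicts or lists.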

@mrocklin (Member)

mrocklin commented Sep 28, 2020 via email

TomAugspurger added a commit to TomAugspurger/dask that referenced this pull request Oct 1, 2020
jrbourbeau pushed a commit that referenced this pull request Oct 2, 2020
@sjperkins sjperkins mentioned this pull request Oct 5, 2020
TomAugspurger added a commit to TomAugspurger/dask that referenced this pull request Oct 6, 2020
jrbourbeau pushed a commit that referenced this pull request Oct 8, 2020
kumarprabhu1988 pushed a commit to kumarprabhu1988/dask that referenced this pull request Oct 29, 2020
Follows on from dask#6508

* Implemented culling of high level graphs
* Skip layers nobody depends on
* Added class Layer(Mapping)
* Implemented a default Layer.cull()
* Added core.keys_in_tasks()
* Layer.get_external_dependencies() to use keys_in_tasks()
* Blockwise to implement the Layer protocol
* optimize_blockwise(): fixed dependencies
* ParquetSubgraph(): implemented Layer
* Implemented HLG.keyset()
* Layer.cull(): now returns a new Layer and key dependencies
kumarprabhu1988 pushed a commit to kumarprabhu1988/dask that referenced this pull request Oct 29, 2020
kumarprabhu1988 pushed a commit to kumarprabhu1988/dask that referenced this pull request Oct 29, 2020
gjoseph92 added a commit to gjoseph92/dask that referenced this pull request Dec 13, 2021
In dask#8452 I realized that an incorrect pattern had emerged from dask#6510 of including
```python
    if not isinstance(dsk, HighLevelGraph):
        dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
```
in optimization functions. Specifically, `id(dsk)` is incorrect as the layer name here. The layer name must match the `.name` of the resulting collection that gets created by `__dask_postpersist__()`, otherwise `__dask_layers__()` on the optimized collection will be wrong. Since `optimize` doesn't know about collections and isn't passed a layer name, the only reasonable thing to do here is to error when given a low-level graph.
This is safe to do for Arrays and DataFrames, since their constructors convert any low-level graphs to HLGs.

This PR doesn't really fix anything—the code path removed should be unused—but it eliminates a confusing pattern that has already wandered its way into other places, e.g. dask#8316 (comment).