Conversation
Follows on from dask#6508
Thanks for pushing something up @madsbk. Having something concrete makes it a lot easier to think about this problem. For the process of clearing out high level layers that aren't being used, this is clearly a win. For lower level culling I still don't know exactly how to handle the graphs within layers. Eventually, we're going to want to do this on the scheduler side. That means that we're going to want to cull the concrete low level graphs that we have here on the client side (the layers backed by dicts), but not explicitly expand the layers that have high level representations, like Blockwise or a future Shuffle layer. We're going to want to keep those in abstract form so that we can pass them to the Scheduler. With that in mind, I wonder if we maybe want the following two functions on every layer:
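The split described above (cull dict-backed layers concretely, keep high-level layers symbolic) could be sketched roughly like this. The class and function names here are hypothetical illustrations, not dask's actual API:

```python
def cull_layer(layer, keys):
    """Cull one layer: filter dict-backed layers key by key,
    but let abstract layers narrow themselves symbolically."""
    if isinstance(layer, dict):
        # concrete layer: safe to filter on the client
        return {k: v for k, v in layer.items() if k in keys}
    # abstract layer: stays symbolic so it can be shipped to the scheduler
    return layer.cull(keys)


class AbstractLayer:
    """Stands in for e.g. Blockwise: described by block ids, not a task dict."""

    def __init__(self, name, block_ids):
        self.name = name
        self.block_ids = list(block_ids)

    def cull(self, keys):
        # narrow the symbolic description; no task dict is ever built
        return AbstractLayer(
            self.name, [i for i in self.block_ids if (self.name, i) in keys]
        )


concrete = {("a", 0): 1, ("a", 1): 2}
print(cull_layer(concrete, {("a", 0)}))  # {('a', 0): 1}
abstract = cull_layer(AbstractLayer("b", range(4)), {("b", 1), ("b", 2)})
print(abstract.block_ids)  # [1, 2]
```

The point of the sketch is that `AbstractLayer.cull()` never materializes tasks, so the narrowed layer can still be handed to the scheduler in abstract form.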
As a test case we might create these methods on the following:

```python
df = dd.read_parquet("...")
df.head()
```

And then follow things through and see that we never actually generated the full graph. We could then expand this to:

```python
df = dd.read_parquet("...")
df["z"] = df.x + df.y
df.head()
```

And hopefully see the same thing, even now that there are two layers and also a Blockwise in there. These are just my impressions after looking at this code though. There may be better approaches here.
@madsbk I kicked CI to re-run the tests
mrocklin left a comment:
A couple of small comments, but it's probably too early for this kind of review.
When <dask#6509> passes, we can remove this fix, which introduces a significant overhead.
(force-pushed from b305e37 to 62bc542, then from 62bc542 to 9bad423)
mrocklin left a comment:
In general this looks good to me. I'm actually surprised at how small this change ended up being. In my mind it was considerably larger. Thank you for exploring this space @madsbk . I feel like we both have a much better sense for how this work is going to go now.
I've made a few comments, but they're all pretty minor or future-leaning. If you have time to clean things up tomorrow (I'm guessing that this will take 30m), I'm happy to merge when I wake up.
```python
def get_dependencies(self, all_hlg_keys):
    _ = self._dict  # trigger materialization
    return self._cached_dict["basic_layer"].get_dependencies(all_hlg_keys)
```
Eventually I think that we're going to want to separate dependencies from graph generation. This can safely be future work though.
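To illustrate separating dependency tracking from graph generation: a layer that knows its own structure can answer dependency queries without ever building its task dict. A toy sketch with hypothetical names, not dask code:

```python
class RangeSumLayer:
    """Abstract layer: key ('sum', i) depends on ('x', i) for i in range(n)."""

    def __init__(self, n):
        self.n = n

    def get_dependencies(self):
        # computed from structure alone; no tasks are generated
        return {("sum", i): {("x", i)} for i in range(self.n)}

    def materialize(self):
        # only called when the concrete low-level graph is really needed
        return {("sum", i): (sum, [("x", i)]) for i in range(self.n)}


deps = RangeSumLayer(3).get_dependencies()
print(deps[("sum", 1)])  # {('x', 1)}
```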
In dask/dataframe/io/parquet/core.py (outdated):

```python
part_ids=[i for i in self.part_ids if (self.name, i) in keys],
)
```
I don't know of any currently. It's probably cheaper to sort on an as-needed basis than to do what we're doing here, though.
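The "sort on an as-needed basis" idea can be sketched with `functools.cached_property`: keep the part ids unsorted, and pay for the sort once, only if someone actually asks. The class name is a hypothetical illustration, not dask's actual code:

```python
from functools import cached_property


class PartList:
    """Keeps part ids unsorted; sorts lazily, once, only when asked."""

    def __init__(self, part_ids):
        self._part_ids = list(part_ids)

    @cached_property
    def sorted_part_ids(self):
        # paid only on first access, instead of on every cull
        return sorted(self._part_ids)


parts = PartList([3, 1, 2])
print(parts.sorted_part_ids)  # [1, 2, 3]
```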
Thanks @madsbk! This is in.
@madsbk and @mrocklin this PR seems to have broken some custom graph construction which was working for me prior to 2.28.0. If `optimize_graph` is True I now get errors like `AttributeError: 'dict' object has no attribute 'discard'`. I will likely look into raising a bug report tomorrow, but before I do, I was wondering if either of you might have an intuition regarding what could be going wrong? My instinct is that there is a dictionary where there should be a set, but I am unsure how these changes would have exposed this, given that it definitely worked prior to 2.28.0. (JSKenyon, Mon, Sep 28, 2020 at 7:21 AM)

Thanks @JSKenyon. If you can provide a full traceback that would probably help us to identify the cause. A new issue would be good. Thanks!
This reverts commit f0b2ac2.
Follows on from dask#6508

* Implemented culling of high level graphs
* Skip layers nobody depends on
* Added `class Layer(Mapping)`
* Implemented a default `Layer.cull()`
* Added `core.keys_in_tasks()`
* `Layer.get_external_dependencies()` to use `keys_in_tasks()`
* `Blockwise` to implement the `Layer` protocol
* `optimize_blockwise()`: fixed dependencies
* `ParquetSubgraph()`: implemented `Layer`
* Implemented `HLG.keyset()`
* `Layer.cull()`: now returns a new `Layer` and key dependencies
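One of the bullets above adds `core.keys_in_tasks()`. A toy re-implementation of the idea (simplified, not dask's actual code): walk nested task tuples/lists and collect any elements that are known graph keys:

```python
def keys_in_tasks(keys, tasks):
    """Return the subset of `keys` referenced anywhere inside `tasks`."""
    found = set()
    stack = list(tasks)
    while stack:
        item = stack.pop()
        try:
            if item in keys:  # a direct key reference, e.g. ('x', 0)
                found.add(item)
                continue
        except TypeError:  # unhashable, e.g. a list of arguments
            pass
        if isinstance(item, (tuple, list)):
            # a task like (sum, [('x', 0), 5]) or a nested argument list
            stack.extend(item)
    return found


result = keys_in_tasks({("x", 0), ("y", 0)}, [(sum, [("x", 0), 5])])
print(result)  # {('x', 0)}
```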
In dask#8452 I realized that an incorrect pattern had emerged from dask#6510 of including

```python
if not isinstance(dsk, HighLevelGraph):
    dsk = HighLevelGraph.from_collections(id(dsk), dsk, dependencies=())
```

in optimization functions. Specifically, `id(dsk)` is incorrect as the layer name here. The layer name must match the `.name` of the resulting collection that gets created by `__dask_postpersist__()`, otherwise `__dask_layers__()` on the optimized collection will be wrong. Since `optimize` doesn't know about collections and isn't passed a layer name, the only reasonable thing to do here is to error when given a low-level graph. This is safe to do for Arrays and DataFrames, since their constructors convert any low-level graphs to HLGs. This PR doesn't really fix anything (the code path removed should be unused) but it eliminates a confusing pattern that has already wandered its way into other places: dask#8316 (comment).
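The naming mismatch can be illustrated with toy stand-ins for a collection and a high level graph (hypothetical classes, not dask's real ones): a layer named `id(dsk)` can never match the collection's `.name`, so a lookup via `__dask_layers__()` misses:

```python
class ToyHLG:
    def __init__(self, layers):
        self.layers = layers  # layer name -> dict of tasks


class ToyCollection:
    def __init__(self, name, hlg):
        self.name = name
        self.hlg = hlg

    def __dask_layers__(self):
        # a collection reports its own name as its layer name
        return (self.name,)


low_level = {("x", 0): 1}
# wrong: id(low_level) as the layer name never matches the collection's name
bad = ToyHLG({id(low_level): low_level})
coll = ToyCollection("x", bad)
print(coll.__dask_layers__()[0] in bad.layers)  # False: the lookup fails
```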
As a step towards defining a `Layer(Mapping)` class (#6438), I have been exploring how we can cull high level graphs directly. I think it is useful to have a concrete implementation before deciding on a `Layer` interface.

This PR implements `class Layer(Mapping)`, an abstract class that establishes a protocol for high level graph layers. The class defines three methods that sub-classes can overwrite in order to use domain knowledge to reduce overhead:
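As an illustration, a minimal sketch of what such a Mapping-backed layer with a default `cull()` might look like; the names and method bodies here are assumptions, not the actual dask implementation:

```python
from collections.abc import Mapping


class Layer(Mapping):
    """A single layer of a high level graph: a Mapping of key -> task."""

    def __init__(self, tasks):
        self.tasks = dict(tasks)

    def __getitem__(self, key):
        return self.tasks[key]

    def __iter__(self):
        return iter(self.tasks)

    def __len__(self):
        return len(self.tasks)

    def cull(self, keys_to_keep):
        """Default cull: materialize and keep only the requested keys.
        Sub-classes with structure (e.g. blockwise) can avoid this."""
        return Layer({k: v for k, v in self.tasks.items() if k in keys_to_keep})


layer = Layer({("x", 0): 1, ("x", 1): 2, ("x", 2): 3})
culled = layer.cull({("x", 0)})
print(dict(culled))  # {('x', 0): 1}
```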
Blockwise

This PR also implements a sub-class `Blockwise(Layer)` that uses the blockwise structure to compute key dependencies and culling efficiently. `.cull()` and `.get_external_dependencies()` return an instance of `BasicLayer(Layer)` that embeds the information needed to do culling efficiently.

ParquetSubgraph
This PR also implements a sub-class `ParquetSubgraph(Layer)` that implements `get_external_dependencies()` by always returning the empty set and `cull()` by filtering `parts`.

cc @mrocklin, @rjzamora, @quasiben
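A minimal sketch of the `ParquetSubgraph` idea (simplified and hypothetical, not the real class): external dependencies are always empty, because reading parquet from storage depends on no other graph keys, and culling just filters the part list:

```python
class ToyParquetLayer:
    """Each key (name, i) reads part i of a parquet dataset."""

    def __init__(self, name, part_ids):
        self.name = name
        self.part_ids = list(part_ids)

    def get_external_dependencies(self):
        # reading from storage depends on no other keys in the graph
        return set()

    def cull(self, keys):
        # keep only the parts whose output keys are still needed
        kept = [i for i in self.part_ids if (self.name, i) in keys]
        return ToyParquetLayer(self.name, kept)


layer = ToyParquetLayer("read", range(4))
culled = layer.cull({("read", 0), ("read", 2)})
print(culled.part_ids)  # [0, 2]
```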