
Add Layer(Mapping) #6438

@mrocklin


As part of https://blog.dask.org/2020/07/21/faster-scheduling, I think we should have a Mapping subclass that we use consistently for all layers in a high-level graph. It would be a superclass for classes like Blockwise and would establish a protocol, beyond the Mapping protocol, that every layer is expected to support.

Getting dependencies

In particular, I'd like to move the responsibility for culling and calculating dependencies to the layers. This should make calculating dependencies more efficient, and it would also let us keep high-level graphs around longer rather than having to lower them to low-level graphs.

To that end, I think the Layer class should have a dependencies method that takes a collection of output keys and returns the set of keys they depend on. By default, it would fall back to inspecting each task individually:

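import dask.core
from collections.abc import Iterable, Mapping

class Layer(Mapping):
    def dependencies(self, keys: Iterable) -> set:  # output keys -> dependency keys
        return {dep for key in keys for dep in dask.core.get_dependencies(self, key=key)}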

Classes like Blockwise would then override this method and compute the dependencies from their internal structure instead of inspecting each task.
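To illustrate the difference, here is a dask-free sketch, assuming dask-style tuple keys such as ("x", 0) and tasks written as (func, *args). Layer, DictLayer and ElementwiseLayer here are hypothetical stand-ins, not dask's classes. The default dependencies method scans task arguments (simplified: unlike dask.core.get_dependencies, it does not recurse into nested lists), while ElementwiseLayer, standing in for Blockwise, derives each dependency from the key index without touching any task.

```python
import operator
from collections.abc import Mapping


def task_args(task):
    """Return the arguments of a task tuple (func, *args), or () for literals."""
    if isinstance(task, tuple) and task and callable(task[0]):
        return task[1:]
    return ()


class Layer(Mapping):
    """Hypothetical base class: a Mapping of key -> task plus a dependencies hook."""

    def dependencies(self, keys) -> set:
        # Default: scan each requested task and keep the arguments that are
        # keys of this layer (simplified; nested lists are not searched).
        return {
            arg
            for key in keys
            for arg in task_args(self[key])
            if isinstance(arg, tuple) and arg in self
        }


class DictLayer(Layer):
    """Hypothetical wrapper that stores a raw dict graph as a Layer."""

    def __init__(self, mapping):
        self.mapping = dict(mapping)

    def __getitem__(self, key):
        return self.mapping[key]

    def __iter__(self):
        return iter(self.mapping)

    def __len__(self):
        return len(self.mapping)


class ElementwiseLayer(Layer):
    """Hypothetical Blockwise-like layer: (name, i) = func((inp, i)) for i < n."""

    def __init__(self, name, func, inp, npartitions):
        self.name, self.func, self.inp, self.n = name, func, inp, npartitions

    def __getitem__(self, key):
        # Tasks are generated on demand; unknown keys raise KeyError so that
        # Mapping.__contains__ works as expected.
        if not (isinstance(key, tuple) and len(key) == 2 and key[0] == self.name
                and isinstance(key[1], int) and 0 <= key[1] < self.n):
            raise KeyError(key)
        return (self.func, (self.inp, key[1]))

    def __iter__(self):
        return ((self.name, i) for i in range(self.n))

    def __len__(self):
        return self.n

    def dependencies(self, keys) -> set:
        # Override: the index pattern alone determines the dependencies,
        # so no task is materialized or scanned. Assumes keys belong to this layer.
        return {(self.inp, i) for _, i in keys}


raw = DictLayer({("x", 0): 1, ("x", 1): 2, ("y", 0): (operator.add, ("x", 0), ("x", 1))})
print(sorted(raw.dependencies([("y", 0)])))  # [('x', 0), ('x', 1)]

neg = ElementwiseLayer("z", operator.neg, "x", npartitions=3)
print(sorted(neg.dependencies([("z", 0), ("z", 2)])))  # [('x', 0), ('x', 2)]
```

In this sketch the default only reports dependencies that are keys of the same layer, because it tests membership against self; the elementwise override can return keys that live in another layer, which is what culling across layers needs.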

Raw graphs

We might also store raw dict graphs as Layers within HighLevelGraphs. This would give us enough uniformity to move the ensure_dict call in the dataframe.optimize function a few lines lower, to just above the low-level fuse block, so that the initial cull could operate on the HighLevelGraph directly:

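from dask import config, core
from dask.blockwise import fuse_roots, optimize_blockwise
from dask.highlevelgraph import HighLevelGraph
from dask.optimization import cull, fuse
from dask.utils import ensure_dict
# optimize_read_parquet_getitem is defined alongside this function in dask/dataframe/optimize.py

def optimize(dsk, keys, **kwargs):

    if isinstance(dsk, HighLevelGraph):
        # Think about an API for this.
        flat_keys = list(core.flatten(keys))
        dsk = optimize_read_parquet_getitem(dsk, keys=flat_keys)
        dsk = optimize_blockwise(dsk, keys=flat_keys)
        dsk = fuse_roots(dsk, keys=flat_keys)

    # Proposal: move this call down to just above the low-level fuse block
    dsk = ensure_dict(dsk)

    if isinstance(keys, list):
        dsk, dependencies = cull(dsk, list(core.flatten(keys)))
    else:
        dsk, dependencies = cull(dsk, [keys])

    fuse_subgraphs = config.get("optimization.fuse.subgraphs")
    if fuse_subgraphs is None:
        fuse_subgraphs = True
    dsk, dependencies = fuse(
        dsk, keys, dependencies=dependencies, fuse_subgraphs=fuse_subgraphs,
    )
    dsk, _ = cull(dsk, keys)
    return dsk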
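Once every layer, including wrapped raw dicts, can answer dependencies(keys), the initial cull could run on the HighLevelGraph layer by layer instead of on the flattened dict. The following is a minimal sketch under stated assumptions: SketchLayer and cull_layers are hypothetical names (not dask APIs), layer-to-layer dependencies are given as a plain dict, and Python 3.9+ is assumed for graphlib.

```python
import operator
from dataclasses import dataclass
from graphlib import TopologicalSorter  # Python 3.9+
from typing import Callable


@dataclass
class SketchLayer:
    """Hypothetical layer: its tasks plus a function computing dependencies."""
    tasks: dict             # key -> task
    dependencies: Callable  # iterable of output keys -> set of dependency keys


def cull_layers(layers, layer_deps, keys):
    """Keep only the tasks in each layer that are needed to compute `keys`.

    layers: layer name -> SketchLayer
    layer_deps: layer name -> set of layer names it depends on
    """
    needed = set(keys)
    culled = {}
    # static_order() yields each layer after the layers it depends on, so the
    # reversed order visits every consumer before the layers it reads from.
    for name in reversed(list(TopologicalSorter(layer_deps).static_order())):
        layer = layers[name]
        out = {k for k in needed if k in layer.tasks}
        culled[name] = {k: layer.tasks[k] for k in out}
        # Ask the layer itself which upstream keys these outputs need.
        needed |= layer.dependencies(out)
    return culled


# A raw dict wrapped as a layer (no dependencies), and an elementwise layer
# whose dependencies follow from the key index alone.
x = SketchLayer({("x", i): i * 10 for i in range(3)}, lambda keys: set())
y = SketchLayer(
    {("y", i): (operator.neg, ("x", i)) for i in range(3)},
    lambda keys: {("x", i) for _, i in keys},
)
culled = cull_layers({"x": x, "y": y}, {"y": {"x"}}, [("y", 1)])
print(culled["x"])        # {('x', 1): 10}
print(list(culled["y"]))  # [('y', 1)]
```

A real implementation would also have to handle requested keys that belong to no layer and layers whose dependencies can only be found by inspecting task contents; the sketch ignores both.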

cc @quasiben @rjzamora @jakirkham @gforsyth @kkraus14 @jcrist

Long term, the motivation here is to remove the need to cull low-level graphs so that we can send high-level graphs up to the scheduler. In the short term, we probably benefit a bit from avoiding extra get_dependencies calls (which are non-trivially expensive), and we take on what I think is a modest amount of complexity.
