
Generically rebuild a collection with different keys #7093

@crusaderky


I'm writing a generic library function that takes an arbitrary dask collection as input and returns a variant of that same collection, which computes to the same values but has a different dask graph and coexists with the original collection.

An example is #6804.

I can't find how to implement it in a way that supports third-party collections - not without making assumptions beyond those stipulated by https://docs.dask.org/en/latest/spec.html and https://docs.dask.org/en/latest/custom-collections.html.

I am therefore proposing to tighten the definitions in both documents to align them with the de facto implementation of all builtin dask collections. This is also worthwhile in its own right, because I expect the dask codebase already blindly assumes the points below in several places, simply because they hold for Array, Bag, DataFrame, Series and Delayed:

1

In https://docs.dask.org/en/latest/spec.html#definitions, replace

A key is any hashable value that is not a task:

with

A key is either a string or a tuple of 1+ elements, whose first element is a string and the remainder are arbitrary hashables:
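To make the proposed wording concrete, here is a minimal sketch of a validator for it. The helpers `is_valid_key` and `key_name` are hypothetical names introduced only for illustration; they are not part of dask.

```python
def _is_hashable(obj) -> bool:
    """Return True if hash(obj) succeeds."""
    try:
        hash(obj)
    except TypeError:
        return False
    return True


def is_valid_key(key) -> bool:
    """Check a key against the proposed definition (hypothetical helper).

    A key is either a string, or a tuple of 1+ elements whose first
    element is a string and whose remaining elements are hashable.
    """
    if isinstance(key, str):
        return True
    if isinstance(key, tuple) and key and isinstance(key[0], str):
        return all(_is_hashable(elem) for elem in key[1:])
    return False


def key_name(key) -> str:
    """Return the string part of a valid key (hypothetical helper)."""
    if not is_valid_key(key):
        raise TypeError(f"not a valid key under the proposed definition: {key!r}")
    return key if isinstance(key, str) else key[0]
```

Under this sketch, `"x"`, `("x",)` and `("x", 0, 1)` are accepted, while `1`, `(1, "x")` and `("x", [1])` are rejected, even though `1` and `(1, "x")` are keys under the current "any hashable value that is not a task" wording.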

2

In https://docs.dask.org/en/latest/custom-collections.html#__dask_keys__, add the requirement that, if more than one key is returned, all keys must be tuples with an identical first element (commonly referred to as the name of the collection).

My problem here is that I don't know what custom collections look like in other projects. However, I strongly suspect that they already adhere to the de facto standard above, simply because otherwise the dask codebase would fall over at multiple points.
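With that requirement in place, generic code could recover a collection's name from `__dask_keys__` alone. The following is a sketch under that assumption; `flatten_keys` and `collection_name` are hypothetical helpers, and the nested-list handling assumes that `__dask_keys__` returns a possibly nested list of keys, as the builtin collections do.

```python
def flatten_keys(keys):
    """Yield every key from the possibly nested lists of __dask_keys__."""
    for item in keys:
        if isinstance(item, list):
            yield from flatten_keys(item)
        else:
            yield item


def collection_name(keys) -> str:
    """Return the name shared by all keys (hypothetical helper).

    Assumes the proposed rule: a single key may be a string or a tuple;
    if there is more than one key, all must be tuples with an identical
    first element.
    """
    flat = list(flatten_keys(keys))
    if not flat:
        raise ValueError("collection has no keys")
    if len(flat) == 1 and isinstance(flat[0], str):
        return flat[0]
    names = set()
    for key in flat:
        if not (isinstance(key, tuple) and key and isinstance(key[0], str)):
            raise TypeError(f"key does not start with a string name: {key!r}")
        names.add(key[0])
    if len(names) != 1:
        raise ValueError(f"keys do not share a single name: {sorted(names)}")
    return names.pop()
```

For example, `collection_name([("a", 0), ("a", 1)])` returns `"a"`, and so does the nested form a 2-dimensional collection would produce, `[[("a", 0, 0), ("a", 0, 1)], [("a", 1, 0), ("a", 1, 1)]]`.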

3

In https://docs.dask.org/en/latest/custom-collections.html#__dask_postpersist__, add that the returned callable must accept an optional keyword parameter, name, which will be used to replace the collection's name.
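As a rough illustration of what this would ask of third-party authors, here is a toy collection whose rebuild callable accepts the proposed `name` keyword. The class `Pair` and its `_rebuild` method are hypothetical, the class implements only the three protocol methods relevant here (so it is not a complete dask collection), and the `name` keyword is the proposal of this issue rather than a documented requirement.

```python
from operator import neg


class Pair:
    """Toy two-partition collection (hypothetical, illustration only)."""

    def __init__(self, dsk, name):
        self._dsk = dsk
        self._name = name

    def __dask_graph__(self):
        return self._dsk

    def __dask_keys__(self):
        return [(self._name, 0), (self._name, 1)]

    def __dask_postpersist__(self):
        # The callable takes the new graph plus the extra args returned here
        return self._rebuild, ()

    def _rebuild(self, dsk, *, name=None):
        # Proposed behaviour: keep the current name unless one is passed in
        return Pair(dsk, self._name if name is None else name)


a = Pair({("a", 0): 1, ("a", 1): 2}, "a")

# New layer with its own name; each task depends on one key of ``a``
layer = {("b", i): (neg, key) for i, key in enumerate(a.__dask_keys__())}
dsk = {**a.__dask_graph__(), **layer}

rebuild, args = a.__dask_postpersist__()
b = rebuild(dsk, *args, name="b")
```

Here `b.__dask_keys__()` refers to the `"b"` keys while `a` is left untouched, which is what lets the two collections coexist.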

Example

Here we're introducing a 1 second delay in the compute() call of a dask bag:

>>> import time
>>> import dask.bag
>>> from dask import delayed
>>> from dask.base import tokenize
>>> from dask.highlevelgraph import HighLevelGraph

>>> a = dask.bag.from_sequence([1,2,3,4], npartitions=2)
>>> dict(a.__dask_graph__())
{('from_sequence-00e91bb38e7830800935abba6e2614f8', 0): [1, 2],
 ('from_sequence-00e91bb38e7830800935abba6e2614f8', 1): [3, 4]}
>>> s = delayed(time.sleep)(1)
>>> name = "with_sleep-" + tokenize(a, s)
>>> def _with_sleep(node, deps):
...     return node

# Simplified - actual implementation is a bit more involved to make it
# work for arbitrary collections, e.g. 2+ dimensional arrays
>>> layer = {(name, i): (_with_sleep, k, s.key) for i, k in enumerate(a.__dask_keys__())}

>>> dsk = HighLevelGraph.from_collections(name, layer, dependencies=(a, s))
>>> rebuild, args = a.__dask_postpersist__()
>>> b = rebuild(dsk, *args, name=name)
>>> dict(b.__dask_graph__())
{('from_sequence-00e91bb38e7830800935abba6e2614f8', 0): [1, 2],
 ('from_sequence-00e91bb38e7830800935abba6e2614f8', 1): [3, 4],
 'sleep-1485da81-3fc0-4a0d-b099-7dc21ccc7d92': (<function time.sleep>, 1),
 ('with_sleep-0e771ca8ce0b7fd08e0b0dc7c4dde9d7', 0): (
     <function _with_sleep>, 
     ('from_sequence-00e91bb38e7830800935abba6e2614f8', 0), 
     'sleep-1485da81-3fc0-4a0d-b099-7dc21ccc7d92'
 ),
 ('with_sleep-0e771ca8ce0b7fd08e0b0dc7c4dde9d7', 1): (
     <function _with_sleep>,
     ('from_sequence-00e91bb38e7830800935abba6e2614f8', 1),
     'sleep-1485da81-3fc0-4a0d-b099-7dc21ccc7d92'
 ),
}

Additionally, it doesn't seem that https://docs.dask.org/en/latest/custom-collections.html has been updated since the introduction of HighLevelGraph.

Specifically:

  • it states that the output of __dask_graph__ is a MutableMapping; however, HighLevelGraph is a Mapping.
  • it makes no mention of the __dask_layers__ method. I think it would be sensible to stipulate that, if __dask_layers__ is defined, then __dask_graph__ must return a HighLevelGraph?
