
High level graph pack/unpack for Distributed #6786

Merged
mrocklin merged 8 commits into dask:master from madsbk:hlg_serialization
Nov 3, 2020

Conversation

@madsbk (Contributor)

@madsbk madsbk commented Nov 2, 2020

This PR introduces packing and unpacking of HLG layers:

    def distributed_pack(self) -> Optional[Any]:
        """Pack the layer for scheduler communication in Distributed

        This method should pack its current state and is called by the Client when
        communicating with the Scheduler.
        The Scheduler will then use .distributed_unpack(data, ...) to unpack the
        state, materialize the layer, and merge it into the global task graph.

        The returned state must be compatible with Distributed's scheduler, which
        means it must obey the following:
          - Serializable by msgpack
          - All remote data must be unpacked (see unpack_remotedata())
          - All keys must be converted to strings now or when unpacking
          - All tasks must be serialized (see dumps_task())

        Alternatively, the method can return None, which will make Distributed
        materialize the layer and use a default packing method.

        Returns
        -------
        state: Object serializable by msgpack
            Scheduler compatible state of the layer
        """
        return None

    @classmethod
    def distributed_unpack(
        cls, state: Any, dsk: Dict[str, Any], dependencies: Mapping[Hashable, Set]
    ) -> None:
        """Unpack the state of a layer previously packed by .distributed_pack()

        This method is called by the scheduler in Distributed in order to unpack
        the state of a layer and merge it into its global task graph. The method
        should update `dsk` and `dependencies`, which are the already materialized
        state of the preceding layers in the high level graph. The layers of the
        high level graph are unpacked in topological order.

        See Layer.distributed_pack() for packing detail.

        Parameters
        ----------
        state: Any
            The state returned by Layer.distributed_pack()
        dsk: dict
            The materialized low level graph of the already unpacked layers
        dependencies: Mapping
            The dependencies of each key in `dsk`
        """
        raise NotImplementedError(f"{type(cls)} doesn't implement distributed_unpack()")
  • Tests added / passed
  • Passes black dask / flake8 dask
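As a rough illustration of the protocol described in the docstrings above, here is a minimal, self-contained sketch — `SimpleLayer` and its task format are hypothetical stand-ins, not dask's actual `Layer` class, and real tasks would additionally go through `dumps_task()`:

```python
# Hypothetical sketch of the pack/unpack protocol described above.
# SimpleLayer and its task format are illustrative stand-ins, not
# dask's actual Layer class.

class SimpleLayer:
    """A toy layer holding a mapping of key -> task."""

    def __init__(self, tasks):
        self.tasks = tasks

    def distributed_pack(self):
        # Return a msgpack-friendly structure: plain dicts, lists and
        # strings only, with all keys converted to strings.
        return {"tasks": {str(k): v for k, v in self.tasks.items()}}

    @classmethod
    def distributed_unpack(cls, state, dsk, dependencies):
        # Merge the packed state into the global task graph in place.
        for key, task in state["tasks"].items():
            dsk[key] = task
            dependencies[key] = set()


dsk, dependencies = {}, {}
layer = SimpleLayer({("x", 0): ["add", 1, 2]})
state = layer.distributed_pack()
SimpleLayer.distributed_unpack(state, dsk, dependencies)
print(sorted(dsk))  # the layer's keys, now stringified
```

Note how unpack mutates `dsk` and `dependencies` in place rather than returning a new graph, mirroring how the scheduler merges each layer into its global task graph in topological order.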

@madsbk madsbk marked this pull request as ready for review November 2, 2020 16:24
@madsbk (Contributor Author)

madsbk commented Nov 2, 2020

@jrbourbeau @mrocklin, this is ready for review (and merge). We might have to add some extra arguments to distributed_pack() in order to implement blockwise but this should be sufficient for shuffle.

        return out_d

    def is_materialized(self):
        return hasattr(self, "_cached_dict")
Member:

What are these for? Long-term, I would hope that we would not materialize the graphs, or even if we did materialize them we might not want to ship the materialized forms. Am I right to hope that these become unnecessary?

@madsbk (Contributor Author) commented Nov 3, 2020:

Yes, long-term this shouldn't be necessary. But I suspect that it will take some time before we get to that point :)
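The `_cached_dict` pattern behind `is_materialized()` can be sketched as a minimal stand-alone analogue (`LazyLayer` and its helpers are illustrative, not dask's code): the full task dict is built once on demand and cached on the instance, so the presence of the cache attribute tells us whether the layer has been materialized.

```python
# Minimal analogue of the caching pattern behind is_materialized().
# LazyLayer is hypothetical; dask's real layers cache differently in detail.

class LazyLayer:
    def __init__(self, n):
        self.n = n

    def _construct_graph(self):
        # The expensive step: generate every task in the layer.
        return {f"x-{i}": ("inc", i) for i in range(self.n)}

    def materialize(self):
        if not hasattr(self, "_cached_dict"):
            self._cached_dict = self._construct_graph()
        return self._cached_dict

    def is_materialized(self):
        return hasattr(self, "_cached_dict")


layer = LazyLayer(3)
print(layer.is_materialized())  # False: still symbolic
layer.materialize()
print(layer.is_materialized())  # True: tasks have been generated
```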

input_keys.update(v)

raw = dict(obj)
raw = str_graph(raw, extra_values=input_keys)
Member:

I was a bit curious about the call to str_graph here. I understand that we need to do this at some point but I was hoping that it could be general. Thinking about this again though, this seems like the kind of thing that we do want to special-case in some circumstances (maybe we never make the tuples) so I guess it's maybe a good idea regardless.
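Conceptually, the stringification discussed here turns dask's tuple keys into scheduler-friendly strings. A hedged sketch with illustrative helper names (not dask's actual `str_graph` implementation):

```python
# Illustrative sketch of key stringification: dask keys are often
# tuples like ("x", 0), but the distributed scheduler expects string
# keys. These helpers are hypothetical, not dask's str_graph.

def stringify_key(key):
    return str(key) if isinstance(key, tuple) else key

def stringify_graph(dsk):
    return {stringify_key(k): v for k, v in dsk.items()}


graph = {("x", 0): ("inc", 1), ("x", 1): ("inc", 2)}
print(stringify_graph(graph))
# keys become "('x', 0)" and "('x', 1)"
```

The real `str_graph` also has to rewrite key references that appear inside task tuples, which is presumably what the `extra_values=input_keys` argument in the snippet above is for; this sketch only handles top-level keys.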

"npartitions_input": self.npartitions_input,
"ignore_index": self.ignore_index,
"name_input": self.name_input,
"meta_input": self.meta_input.to_json(),
Member:

With to_json I think we can lose data type information. For example:

In [22]: df
Out[22]:
     a  b
0  1.0  4
1  2.0  5
2  3.0  6

In [23]: df.dtypes
Out[23]:
a    float32
b      UInt8
dtype: object

In [24]: pd.read_json(df.to_json())
Out[24]:
   a  b
0  1  4
1  2  5
2  3  6

In [25]: pd.read_json(df.to_json()).dtypes
Out[25]:
a    int64
b    int64
dtype: object

Contributor Author:

Good catch. I have fixed it by using to_serialize(), which also works with other types like cudf.DataFrame.
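The dtype loss shown above is one instance of a general problem: JSON round-trips collapse type information, while pickle-style serialization (the kind of fallback Distributed's serialization machinery can use for arbitrary objects) preserves it. A pandas-free analogue:

```python
import json
import pickle

# Pandas-free analogue of the dtype problem above: a JSON round-trip
# collapses type information, while a pickle round-trip preserves it.
value = (1, 2)  # a tuple

via_json = json.loads(json.dumps(value))
via_pickle = pickle.loads(pickle.dumps(value))

print(type(via_json).__name__)    # list: the tuple type was lost
print(type(via_pickle).__name__)  # tuple: type preserved
```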

Comment on lines +46 to +54
The main motivation of a layer is to represent a collection of tasks
symbolically in order to speed up a series of operations significantly.
Ideally, a layer should stay in this symbolic state until execution,
but in practice some operations will force the layer to generate all
its internal tasks. We say that the layer has been materialized.

Most of the default implementations in this class will materialize the
layer. It is up to derived classes to provide non-materializing
implementations.
Member:

Thank you for taking the time to add this note

@madsbk (Contributor Author) commented Nov 3, 2020

@mrocklin (Member) commented Nov 3, 2020

In [1]: import dask.array as da
In [2]: x = da.ones(10)
In [3]: y = x + 1
In [4]: y.__dask_graph__().layers[y.name]
Out[4]: Blockwise<(('ones-a7883aa065f53e381957f6940baaf48b', ('.0',)), (1, None)) -> add-a701e8a9322af8d1565010d507adf66a>

In [5]: y.__dask_graph__().layers[y.name].__dask_distributed_pack__()

@mrocklin (Member) commented Nov 3, 2020

Whoops, misfire on the comment. Should we be concerned that for some layers this returns None? Or is the plan that if this returns None we materialize?

@mrocklin (Member) commented Nov 3, 2020


    def is_materialized(self):
        return hasattr(self, "_cached_dict")

Member:

Would it make sense to implement Blockwise.__dask_distributed_pack__? Maybe this would be a good task for @rjzamora given his current work on Blockwise.

Contributor Author:

Yes, Blockwise.__dask_distributed_pack__() is definitely next!

Contributor Author:

But we might need some extra API to handle unpacking of remote data.

@mrocklin mrocklin merged commit b439233 into dask:master Nov 3, 2020
@mrocklin (Member) commented Nov 3, 2020

This is in. Thanks @madsbk !

@madsbk madsbk deleted the hlg_serialization branch November 4, 2020 18:48