High level graph pack/unpack for Distributed#6786
Conversation
@jrbourbeau @mrocklin, this is ready for review (and merge). We might have to add some extra arguments to
```python
        return out_d

    def is_materialized(self):
        return hasattr(self, "_cached_dict")
```
What are these for? Long-term, I would hope that we would not materialize the graphs, or even if we did materialize them we might not want to ship the materialized forms. Am I right to hope that these become unnecessary?
Yes, long-term this shouldn't be necessary. But I suspect that it will take some time before we get to that point :)
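For readers following along, the `_cached_dict` pattern under discussion keeps a layer symbolic until something forces it to enumerate its tasks, and caches the result after the first materialization. A minimal illustrative sketch (the `LazyLayer` class and its task contents are hypothetical, not dask's actual implementation):

```python
class LazyLayer:
    """Sketch of a layer that only materializes its tasks on demand."""

    def __init__(self, name, npartitions):
        self.name = name
        self.npartitions = npartitions

    def _construct_graph(self):
        # Generating every task tuple is the expensive step we want to delay.
        return {(self.name, i): ("noop", i) for i in range(self.npartitions)}

    @property
    def _dict(self):
        # Materialize once, then cache the result on the instance.
        if not hasattr(self, "_cached_dict"):
            self._cached_dict = self._construct_graph()
        return self._cached_dict

    def is_materialized(self):
        # Mirrors the check above: materialized iff the cache exists.
        return hasattr(self, "_cached_dict")


layer = LazyLayer("x", 3)
assert not layer.is_materialized()
layer._dict  # first access builds and caches the task dict
assert layer.is_materialized()
```

The `hasattr` check is cheap and avoids a separate boolean flag that could drift out of sync with the cache.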
dask/dataframe/shuffle.py
```python
input_keys.update(v)

raw = dict(obj)
raw = str_graph(raw, extra_values=input_keys)
```
I was a bit curious about the call to str_graph here. I understand that we need to do this at some point, but I was hoping that it could be general. Thinking about this again, though, this seems like the kind of thing we do want to special-case in some circumstances (maybe we never make the tuples), so it's probably a good idea regardless.
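For context, the stringification being discussed turns tuple keys such as `("name", 0)` into plain strings so they can be transmitted as simple, msgpack-friendly values. A rough sketch of the idea only; `stringify_key` and `stringify_graph` are hypothetical helpers, not dask's `str_graph`, which also rewrites key references inside task values:

```python
def stringify_key(key):
    # Tuple keys like ("shuffle-abc", 0) become their string form;
    # keys that are already strings pass through unchanged.
    return str(key) if isinstance(key, tuple) else key


def stringify_graph(graph):
    # Rewrite every key in the graph. A full implementation would also
    # rewrite references to those keys inside the task values.
    return {stringify_key(k): v for k, v in graph.items()}


graph = {("shuffle-abc", 0): ("getitem", "df", 0), "df": None}
stringified = stringify_graph(graph)
# keys are now "('shuffle-abc', 0)" and "df"
```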
dask/dataframe/shuffle.py
| "npartitions_input": self.npartitions_input, | ||
| "ignore_index": self.ignore_index, | ||
| "name_input": self.name_input, | ||
| "meta_input": self.meta_input.to_json(), |
With to_json I think we can lose data type information. For example:
```
In [22]: df
Out[22]:
     a  b
0  1.0  4
1  2.0  5
2  3.0  6

In [23]: df.dtypes
Out[23]:
a    float32
b      UInt8
dtype: object

In [24]: pd.read_json(df.to_json())
Out[24]:
   a  b
0  1  4
1  2  5
2  3  6

In [25]: pd.read_json(df.to_json()).dtypes
Out[25]:
a    int64
b    int64
dtype: object
```
Good catch. I have fixed it by using to_serialize(), which also works with other types like cudf.DataFrame.
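To make the contrast concrete: a JSON round trip re-infers dtypes from the raw values, whereas a binary serializer preserves them exactly. The sketch below uses pickle purely as a stand-in for that behavior (it is not what distributed's `to_serialize()` does internally), assuming pandas and numpy are available:

```python
import io
import pickle

import numpy as np
import pandas as pd

df = pd.DataFrame(
    {
        "a": np.array([1.0, 2.0, 3.0], dtype="float32"),
        "b": pd.array([4, 5, 6], dtype="UInt8"),
    }
)

# JSON round trip: dtypes are re-inferred from the values,
# so float32 and the nullable UInt8 extension dtype are lost.
via_json = pd.read_json(io.StringIO(df.to_json()))

# Binary round trip: dtypes survive untouched.
via_pickle = pickle.loads(pickle.dumps(df))
assert (via_pickle.dtypes == df.dtypes).all()
```

Pickle is shown only for illustration; for cross-process transfer, distributed's serialization machinery additionally handles objects like cudf.DataFrame that have their own efficient (de)serializers.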
The main motivation of a layer is to represent a collection of tasks symbolically in order to speed up a series of operations significantly. Ideally, a layer should stay in this symbolic state until execution, but in practice some operations will force the layer to generate all its internal tasks. We say that the layer has been materialized.

Most of the default implementations in this class will materialize the layer. It is up to derived classes to implement non-materializing implementations.
Thank you for taking the time to add this note
Everything passes when running the new Distributed test: https://github.com/dask/distributed/blob/9a0504981ef14cf5cd2b804497ec4d4c301359ec/distributed/protocol/tests/test_highlevelgraph.py
```
In [1]: import dask.array as da

In [2]: x = da.ones(10)

In [3]: y = x + 1

In [4]: y.__dask_graph__().layers[y.name]
Out[4]: Blockwise<(('ones-a7883aa065f53e381957f6940baaf48b', ('.0',)), (1, None)) -> add-a701e8a9322af8d1565010d507adf66a>

In [5]: y.__dask_graph__().layers[y.name].__dask_distributed_pack__()
```
Whoops, misfire on the comment. Should we be concerned that for some layers this returns
Ah, I see that that is indeed the case:
```python
def is_materialized(self):
    return hasattr(self, "_cached_dict")
```
Would it make sense to implement Blockwise.__dask_distributed_pack__? Maybe this would be a good task for @rjzamora given his current work on Blockwise.
Yes, Blockwise.__dask_distributed_pack__() is definitely next!
But we might need some extra API to handle unpacking of remote data.
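The pack/unpack protocol being discussed can be pictured as a pair of dual operations: pack produces a small, serializable description of the layer on the client, and unpack expands that description into concrete tasks on the scheduler, which is also where references to remote data would need resolving. The sketch below is illustrative only; the `SimpleLayer` class and its `pack`/`unpack` signatures are invented for this example and do not match the real `__dask_distributed_pack__`/`__dask_distributed_unpack__` API:

```python
class SimpleLayer:
    """Illustrative layer: n tasks applying `inc` to an input key."""

    def __init__(self, name, input_name, npartitions):
        self.name = name
        self.input_name = input_name
        self.npartitions = npartitions

    def pack(self):
        # Only a small symbolic description crosses the wire,
        # never the materialized task dict.
        return {
            "name": self.name,
            "input_name": self.input_name,
            "npartitions": self.npartitions,
        }

    @staticmethod
    def unpack(state, dsk):
        # On the receiving side, expand the description into tasks
        # and merge them into the target graph `dsk`.
        for i in range(state["npartitions"]):
            dsk[(state["name"], i)] = ("inc", (state["input_name"], i))


dsk = {}
SimpleLayer.unpack(SimpleLayer("y", "x", 2).pack(), dsk)
assert dsk == {("y", 0): ("inc", ("x", 0)), ("y", 1): ("inc", ("x", 1))}
```

The payoff is that the wire message stays O(1) in the number of partitions; materialization happens once, on the scheduler side.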
This is in. Thanks @madsbk!
This PR introduces packing and unpacking of HLG layers:
- Passes `black dask` / `flake8 dask`