Blockwise map_partitions with partition_info (#8310)
Conversation
Just to note that this should be unblocked now since #6628 is in.
I actually don't think this was blocked? But it is ready to review when someone has the time.
Oh I guess I misunderstood :)
@jsignell what do you think of this? Could we merge?
ian-r-rose
left a comment
This is cool. Offline we've talked about the various ways that HLG-things can fail to serialize/deserialize correctly. I haven't fully traced this application, but have you checked that this works as expected in a distributed context?
    args2.insert(0, BlockwiseDepDict(partition_info))
    orig_func = func
    func = lambda partition_info, *args, **kwargs: orig_func(
I haven't been particularly good about this myself, but it's my impression that lambdas and closures within functions are more expensive to serialize than top-level functions. I wish I could find some actual documentation about this, but the closest I've seen is this comment from Jim: #6192 (comment)
I've heard that too, but don't really know the reasoning. Certainly serializing an arbitrary function is more expensive than something defined in a module, because the one in a module can just be referenced by name, instead of inlining all the code. @jcrist?
Here though, if you're taking partition_info, orig_func is very, very likely something a user wrote, and probably isn't from library code, so it probably needs its code serialized anyway. So in this specific case, I wonder if it's less likely to be a concern?
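A quick standard-library illustration of the by-reference point, using plain pickle rather than the cloudpickle that dask actually uses (cloudpickle can serialize lambdas by value, which is what makes them heavier to ship):

```python
import pickle

def top_level(x):
    # A module-level function: pickle stores only a reference to it
    # (module name + qualified name), not its bytecode.
    return x + 1

payload = pickle.dumps(top_level)  # small payload, pickled by name

# A lambda has no importable name, so plain pickle refuses it outright;
# cloudpickle instead serializes the code object by value.
try:
    pickle.dumps(lambda x: x + 1)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError):
    lambda_picklable = False
```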
When I wrote this, I did originally try to avoid the lambda, and went in all sorts of blockwise circles trying to figure it out. But Blockwise only supports positional arguments (kwargs are just literals), and trying to change that was not easy. And partition_info may have to be given to the function as a kwarg. So in the end, something like this seemed the easiest.
I guess we could instead do:
# module-level
def apply_add_partition_info(func, partition_info, *args, **kwargs):
    return func(*args, **kwargs, partition_info=partition_info)
...
if has_keyword(func, "partition_info"):
    func = functools.partial(apply_add_partition_info, func)
    args2.insert(0, BlockwiseDepDict(partition_info))
This is an interesting application of BlockwiseDep. I don't think it's quite what was envisioned for the helper class, but it doesn't seem obviously wrong, and the win in this case is pretty clear. I wonder if it would be worth renaming that class or improving the docs, since it is not in this case strictly used for dependencies?
As far as I can tell, this is very similar to what @bmerry was trying in #7686 (though that one seems lazier, which may have some benefits). Unfortunately, that PR seems to have not gotten much attention, though it should have.
Yup, the lack of laziness here was just laziness on my part :). This could be its own BlockwiseDep subclass that generates the info at construction time. Though since divisions would have to be embedded anyway, I felt like the savings from lazily generating it just weren't worth the effort.
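The eager construction being discussed can be sketched standalone. Judging from the diff context (for i, division in enumerate(divisions[:-1])) and the example output later in the thread, each 1-D block key maps to that partition's number and starting division; the divisions values here are placeholders:

```python
# Sketch of the mapping handed to BlockwiseDepDict: one small dict per
# partition, keyed by the 1-D block index tuple Blockwise uses.
divisions = ("2000-01-01", "2000-01-02", "2000-01-03", "2000-01-04")

partition_info = {
    (i,): {"number": i, "division": division}
    for i, division in enumerate(divisions[:-1])
}
```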
I agree that this is a perhaps-unintended but very useful application of BlockwiseDep. I have a feeling that this is a pattern we'll want to reuse in many places. This is why I felt that #7513 would be powerful: it gives you an easy interface for passing in auxiliary data that needs to be broadcast like everything else, but is literal instead of delayed. I have a feeling that with a few basic constructs and the ability to pass in arrays/lists/iterables wrapped in a marker class, you could compose pretty much anything you need without defining new BlockwiseDep subclasses.
For example, I'd want to use this for #8445, in order to pass in the filenames here:
dask/dask/dataframe/io/parquet/core.py
Lines 738 to 770 in a5aecac
And yes, this seems pretty much exactly like what @bmerry was trying! Including the fact that you added support for passing a BlockwiseDep instance into array.blockwise.blockwise, which I also did in dataframe.core.partitionwise_graph, which to me is an equivalent function.
> I wonder if it would be worth renaming that class or improving the docs, since it is not in this case strictly used for dependencies?
Maybe BlockwiseLiteral? BlockwiseArg? I also perhaps like the idea of users not having to interact with it at all in basic cases—what if map_partitions/map_blocks automatically supported sequences or array-likes? (Just judging this on whether shapes align might be too much though; I could see wanting some explicit wrapping to clarify whether it's a literal or a broadcastable.) That plus block_info might get you everything you'd need as a user.
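As a thought experiment for that wrapper idea (both the BlockwiseLiteral name and this interface are hypothetical; it just mirrors how a BlockwiseDep-style object is indexed by block key):

```python
from dataclasses import dataclass
from typing import Any, Sequence

@dataclass
class BlockwiseLiteral:
    # Hypothetical marker class: one literal value per partition,
    # looked up by the 1-D block index, BlockwiseDep-style.
    values: Sequence[Any]

    def __getitem__(self, idx):
        return self.values[idx[0]]

# e.g. broadcasting one filename per partition, as in the parquet case:
filenames = BlockwiseLiteral(["part.0.parquet", "part.1.parquet"])
```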
gjoseph92
left a comment
@ian-r-rose I don't remember if I've tested this on distributed or not yet—good point, I'll verify that.
@ian-r-rose seems to work!

In [1]: import dask
...: import dask.dataframe as dd
...: import distributed
...:
...: with distributed.Client() as client:
...: df = dask.datasets.timeseries()
...: df2 = df.map_partitions(lambda p, partition_info: p.assign(**partition_info), meta=df.assign(number=1, division=None))
...: print(df2.compute())
...:
id name x y number division
timestamp
2000-01-01 00:00:00 969 Bob 0.568661 -0.944266 0 2000-01-01
2000-01-01 00:00:01 951 Victor 0.654146 -0.508345 0 2000-01-01
2000-01-01 00:00:02 1013 Sarah -0.369867 0.810990 0 2000-01-01
2000-01-01 00:00:03 970 Patricia -0.240122 0.246713 0 2000-01-01
2000-01-01 00:00:04 1011 Ingrid -0.933865 -0.438737 0 2000-01-01
... ... ... ... ... ... ...
2000-01-30 23:59:55 1009 Jerry 0.451126 -0.875011 29 2000-01-30
2000-01-30 23:59:56 1054 Charlie -0.942760 0.270204 29 2000-01-30
2000-01-30 23:59:57 989 Kevin -0.466265 0.525312 29 2000-01-30
2000-01-30 23:59:58 1009 Frank 0.896524 0.897975 29 2000-01-30
2000-01-30 23:59:59 1016 Jerry 0.720995 0.815837 29 2000-01-30
[2592000 rows x 6 columns]
jrbourbeau
left a comment
Thanks @gjoseph92. I left a couple of small comments, but otherwise I get the sense this is good to go.
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
Fixes #8309
cc @rjzamora @ian-r-rose
partition_info in map_partitions materializes the graph unnecessarily (#8309)
pre-commit run --all-files