
Blockwise map_partitions with partition_info#8310

Merged
jrbourbeau merged 5 commits into dask:main from
gjoseph92:map_partitions-partition_info-blockwise
Dec 8, 2021

Conversation

@gjoseph92
Collaborator

Fixes #8309

cc @rjzamora @ian-r-rose

@jsignell
Member

jsignell commented Nov 5, 2021

Just to note that this should be unblocked now since #6628 is in

@gjoseph92
Collaborator Author

I actually don't think this was blocked? But it is ready to review when someone has the time.

@jsignell
Member

jsignell commented Nov 8, 2021

I actually don't think this was blocked?

Oh I guess I misunderstood :)

@gjoseph92
Collaborator Author

@jsignell what do you think of this? Could we merge?

Collaborator

@ian-r-rose ian-r-rose left a comment


This is cool. Offline we've talked about the various ways that HLG-things can fail to serialize/deserialize correctly. I haven't fully traced this application, but have you checked that this works as expected in a distributed context?


args2.insert(0, BlockwiseDepDict(partition_info))
orig_func = func
func = lambda partition_info, *args, **kwargs: orig_func(
Collaborator


I haven't been particularly good about this myself, but it's my impression that lambdas and closures within functions are more expensive to serialize than top-level functions. I wish I could find some actual documentation about this, but the closest I've seen is this comment from Jim: #6192 (comment)
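A quick way to see the difference with the standard-library pickler (a sketch, not from this PR; cloudpickle, which distributed actually uses, *can* serialize lambdas, but only by embedding their code object rather than a name reference):

```python
import pickle

def top_level(x):
    """A module-level function: pickled as a reference (module + qualified name)."""
    return x + 1

# Pickled by reference, so the payload is tiny and no bytecode crosses the wire.
by_reference = pickle.dumps(top_level)

# A lambda has no importable name, so the stdlib pickler rejects it.
# cloudpickle falls back to serializing the code object itself, which is
# what makes lambdas and closures more expensive to ship to workers.
try:
    pickle.dumps(lambda x: x + 1)
    lambda_picklable = True
except Exception:
    lambda_picklable = False

print(lambda_picklable)  # False
```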

Collaborator Author


I've heard that too, but don't really know the reasoning. Certainly serializing an arbitrary function is more expensive than something defined in a module, because the one in a module can just be referenced by name, instead of inlining all the code. @jcrist?

Here though, if you're taking partition_info, orig_func is very, very likely something a user wrote, and probably isn't from library code, so it probably needs its code serialized anyway. So in this specific case, I wonder if it's less likely to be a concern?

When I wrote this, I did originally try to avoid the lambda, and went in all sorts of blockwise circles trying to figure it out. But Blockwise only supports positional arguments (kwargs are just literals), and trying to change that was not easy. And partition_info may have to be given to the function as a kwarg. So in the end, something like this seemed the easiest.

I guess we could instead do:

# module-level
def apply_add_partition_info(func, partition_info, *args, **kwargs):
    return func(*args, **kwargs, partition_info=partition_info)

...
if has_keyword(func, "partition_info"):
    func = functools.partial(apply_add_partition_info, func)

for i, division in enumerate(divisions[:-1])
}

args2.insert(0, BlockwiseDepDict(partition_info))
Collaborator


This is an interesting application of BlockwiseDep. I don't think it's quite what was envisioned for the helper class, but it doesn't seem obviously wrong, and the win in this case is pretty clear. I wonder if it would be worth renaming that class or improving the docs, since it is not in this case strictly used for dependencies?

As far as I can tell, this is very similar to what @bmerry was trying in #7686 (though that one seems lazier, which may have some benefits). Unfortunately, that PR seems not to have gotten much attention, though it should have.
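For context, the mapping handed to BlockwiseDepDict here is keyed by block index, so each task's partition_info can be looked up per-task without materializing the graph. A standalone sketch of that mapping (plain Python, no dask imports; assumes block-index-tuple keys, which is the shape BlockwiseDep-style lookups expect):

```python
# Sketch of the per-partition info mapping built in this PR (pure Python,
# mirroring the `for i, division in enumerate(divisions[:-1])` comprehension
# in the diff context; divisions values here are made up for illustration).
divisions = ["2000-01-01", "2000-01-02", "2000-01-03", "2000-01-04"]

# One entry per output block, keyed by the block index tuple, so the info
# for block i can be fetched at task-generation time rather than baked into
# a materialized graph.
partition_info = {
    (i,): {"number": i, "division": division}
    for i, division in enumerate(divisions[:-1])
}

print(partition_info[(0,)])  # {'number': 0, 'division': '2000-01-01'}
```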

Collaborator Author


Yup, the lack of laziness here was just laziness on my part :). This could be its own BlockwiseDep subclass that generates the info at construction time. Though since divisions would have to be embedded anyway, I felt like the savings from lazily generating it just weren't worth the effort.

I agree that this is a perhaps-unintended but very useful application of BlockwiseDep. I have a feeling that this is a pattern we'll want to reuse in many places. This is why I felt that #7513 would be powerful: it gives you an easy interface for passing in auxiliary data that needs to be broadcast like everything else, but is literal instead of delayed. I have a feeling that with a few basic constructs and the ability to pass in arrays/lists/iterables wrapped in a marker class, you could compose pretty much anything you need without defining new BlockwiseDep subclasses.

For example, I'd want to use this for #8445, in order to pass in the filenames here:

for d, filename in enumerate(filenames):
    dsk[(name, d)] = (
        apply,
        engine.write_partition,
        [
            (df._name, d),
            path,
            fs,
            filename,
            partition_on,
            write_metadata_file,
        ],
        toolz.merge(kwargs_pass, {"head": True}) if d == 0 else kwargs_pass,
    )
    part_tasks.append((name, d))

final_name = "metadata-" + name

# Collect metadata and write _metadata
if write_metadata_file:
    dsk[(final_name, 0)] = (
        apply,
        engine.write_metadata,
        [
            part_tasks,
            meta,
            fs,
            path,
        ],
        {"append": append, "compression": compression},
    )
else:
    dsk[(final_name, 0)] = (lambda x: None, part_tasks)

And yes, this seems pretty much exactly like what @bmerry was trying! Including the fact that you added support for passing a BlockwiseDep instance into array.blockwise.blockwise, which I also did in dataframe.core.partitionwise_graph, which to me is an equivalent function.

I wonder if it would be worth renaming that class or improving the docs, since it is not in this case strictly used for dependencies?

Maybe BlockwiseLiteral? BlockwiseArg? I also perhaps like the idea of users not having to interact with it at all in basic cases—what if map_partitions/map_blocks automatically supported sequences or array-likes? (Just judging this on whether shapes align might be too much though; I could see wanting some explicit wrapping to clarify whether it's a literal or a broadcastable.) That plus block_info might get you everything you'd need as a user.
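As a toy illustration of that "broadcast a literal sequence, one item per partition" idea (a sketch only; all names here are hypothetical, not dask API):

```python
# Toy sketch of broadcasting a literal sequence across partitions, one item
# per block -- roughly the behavior map_partitions might get if it accepted
# plain sequences. `map_with_per_partition_arg` and its arguments are
# illustrative names, not part of dask.
def map_with_per_partition_arg(func, partitions, per_partition):
    if len(per_partition) != len(partitions):
        raise ValueError("sequence length must match the number of partitions")
    # Zip the literal sequence against the blocks instead of treating it as
    # one argument repeated for every block.
    return [func(part, extra) for part, extra in zip(partitions, per_partition)]

partitions = [[1, 2], [3, 4, 5]]          # stand-ins for a collection's blocks
filenames = ["part.0.parquet", "part.1.parquet"]
result = map_with_per_partition_arg(
    lambda part, fname: (fname, sum(part)), partitions, filenames
)
print(result)  # [('part.0.parquet', 3), ('part.1.parquet', 12)]
```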

Collaborator Author

@gjoseph92 gjoseph92 left a comment


@ian-r-rose I don't remember if I've tested this on distributed or not yet—good point, I'll verify that.



@gjoseph92
Collaborator Author

@ian-r-rose seems to work!

In [1]: import dask
   ...: import dask.dataframe as dd
   ...: import distributed
   ...: 
   ...: with distributed.Client() as client:
   ...:     df = dask.datasets.timeseries()
   ...:     df2 = df.map_partitions(lambda p, partition_info: p.assign(**partition_info), meta=df.assign(number=1, division=None))
   ...:     print(df2.compute())
   ...: 
                       id      name         x         y  number   division
timestamp                                                                 
2000-01-01 00:00:00   969       Bob  0.568661 -0.944266       0 2000-01-01
2000-01-01 00:00:01   951    Victor  0.654146 -0.508345       0 2000-01-01
2000-01-01 00:00:02  1013     Sarah -0.369867  0.810990       0 2000-01-01
2000-01-01 00:00:03   970  Patricia -0.240122  0.246713       0 2000-01-01
2000-01-01 00:00:04  1011    Ingrid -0.933865 -0.438737       0 2000-01-01
...                   ...       ...       ...       ...     ...        ...
2000-01-30 23:59:55  1009     Jerry  0.451126 -0.875011      29 2000-01-30
2000-01-30 23:59:56  1054   Charlie -0.942760  0.270204      29 2000-01-30
2000-01-30 23:59:57   989     Kevin -0.466265  0.525312      29 2000-01-30
2000-01-30 23:59:58  1009     Frank  0.896524  0.897975      29 2000-01-30
2000-01-30 23:59:59  1016     Jerry  0.720995  0.815837      29 2000-01-30

[2592000 rows x 6 columns]

@gjoseph92 gjoseph92 requested a review from jrbourbeau December 7, 2021 17:13
Member

@jrbourbeau jrbourbeau left a comment


Thanks @gjoseph92. I left a couple of small comments, but otherwise I get the sense this is good to go

Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
@jrbourbeau jrbourbeau merged commit 4c4d162 into dask:main Dec 8, 2021
@gjoseph92 gjoseph92 mentioned this pull request Jan 6, 2022


Development

Successfully merging this pull request may close these issues.

partition_info in map_partitions materializes the graph unnecessarily

4 participants