tl;dr: what if we extend `Blockwise` to also take array-like objects in its inputs, along with symbols? Think of it as "move `io_deps` into the indices", with a few advantages that fall out:
- More consistent interface to `Blockwise`: think about IO in terms of broadcasting (auxiliary) data against a chunk pattern, just like `Blockwise` already does.
- More explicit pattern for serialization and managing opaque serialized data on the scheduler. Also, we can cull unneeded parts of the array-like data prior to serialization.
- Separation of concerns:
  `io_deps` currently mixes serialization info (module path) with the blockwise arguments, and `make_blockwise_graph` includes deserialization of the `BlockwiseIODeps`. This fully separates graph logic from serialization.
Proposal:
What if, instead of just being able to give symbolic arguments (like `"x", "ij"`) to `Blockwise`, you could also give array-like things directly (like `np.arange(10), "i"`)? By "array-like" I mean a concept similar to `BlockwiseIODeps`: any object with a `shape` that supports multidimensional indexing. It could be an ndarray, or a class with a `__getitem__`. It would let you do stuff like:
```python
>>> y = np.arange(3)
>>> make_blockwise_graph(add, 'z', 'i', 'x', 'i', y, 'i', numblocks={"x": (3,)})
('z', 0): (add, ('x', 0), 0)
('z', 1): (add, ('x', 1), 1)
('z', 2): (add, ('x', 2), 2)
```
```python
>>> y = np.arange(6).reshape(3, 2)
>>> make_blockwise_graph(np.outer, 'z', 'ij', 'x', 'i', y, 'j', numblocks={"x": (2,)})
('z', 0, 0): (<function outer at 0x7fa6c8923e50>, ('x', 0), array([0, 1]))
('z', 0, 1): (<function outer at 0x7fa6c8923e50>, ('x', 0), array([2, 3]))
('z', 0, 2): (<function outer at 0x7fa6c8923e50>, ('x', 0), array([4, 5]))
('z', 1, 0): (<function outer at 0x7fa6c8923e50>, ('x', 1), array([0, 1]))
('z', 1, 1): (<function outer at 0x7fa6c8923e50>, ('x', 1), array([2, 3]))
('z', 1, 2): (<function outer at 0x7fa6c8923e50>, ('x', 1), array([4, 5]))
```

(Notice how `y` is getting indexed at graph-construction time, and the contents are inlined into tasks. Just like we do with `io_deps`, you could do fancier things with that interface than just inlining data: have `__getitem__` return a block-info dict, or generate a sub-task for the current block coordinates, etc.)
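As a toy model of the indexing step above (this helper is invented for illustration and assumes nothing about dask internals): for each output block coordinate, pick out the positions named by the argument's index pattern and call the array-like's `__getitem__` at graph-construction time.

```python
from itertools import product

import numpy as np


def index_arraylike_per_block(arr, ind, out_ind, out_numblocks):
    """Toy model (not dask API) of how make_blockwise_graph could handle
    an array-like argument: for every output block coordinate, select the
    coordinates named by `ind` and index the array-like immediately,
    inlining the result into the task."""
    results = {}
    for coords in product(*(range(n) for n in out_numblocks)):
        pos = dict(zip(out_ind, coords))  # e.g. {'i': 0, 'j': 2}
        results[coords] = arr[tuple(pos[i] for i in ind)]
    return results


# Reproduces the second worked example: y has index pattern 'j',
# output pattern 'ij' over a (2, 3) block grid.
y = np.arange(6).reshape(3, 2)
inlined = index_arraylike_per_block(y, "j", "ij", (2, 3))
```

Here `inlined[(0, 0)]` is `y[0]` (`array([0, 1])`) and `inlined[(1, 2)]` is `y[2]` (`array([4, 5])`), matching the graph shown above.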
To support serialization, these array-like objects would need `__dask_distributed_pack__` and `__dask_distributed_unpack__` methods. For convenience, any plain object (like a NumPy array) would be wrapped in `BlockwiseNdarray` for you by the `Blockwise` constructor.
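A hedged sketch of what such a wrapper might look like (the class name and method bodies here are assumptions; only the dunder protocol names come from the proposal, and real serialization would go through distributed's machinery rather than pickle):

```python
import pickle

import numpy as np


class NdarrayWrapperSketch:
    """Sketch of a BlockwiseNdarray-style wrapper: an array-like that
    also knows how to pack/unpack itself for the scheduler."""

    def __init__(self, arr):
        self.arr = np.asarray(arr)
        self.shape = self.arr.shape

    def __getitem__(self, idx):
        return self.arr[idx]

    def __dask_distributed_pack__(self, used_indices):
        # Receives the post-culling list of indices, so everything else
        # can be dropped before serializing.
        return {idx: pickle.dumps(self.arr[idx]) for idx in used_indices}

    @staticmethod
    def __dask_distributed_unpack__(state):
        # The receiving side only needs something with an equivalent
        # __getitem__; a plain dict of deserialized chunks works.
        return {idx: pickle.loads(blob) for idx, blob in state.items()}


w = NdarrayWrapperSketch(np.arange(3))
state = w.__dask_distributed_pack__([(0,), (2,)])  # block (1,) was culled
unpacked = NdarrayWrapperSketch.__dask_distributed_unpack__(state)
```

Note that the packed state only contains the two surviving indices; `(1,)` never gets serialized.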
The `__dask_distributed_pack__` interface is more specific than the current `BlockwiseIODeps`: it's passed the list of indices into that argument that will actually be used (after culling). (With current `io_deps`, I don't believe each `BlockwiseIODeps` gets this chance to drop unnecessary data before packing.) Using this list of indices, we can pre-slice out only the necessary chunks (of an ndarray, for example) on the client, serialize them, and stick them in a dict that supports equivalent `__getitem__`s (example here). So when generating the graph on the scheduler, we just select those opaque `Serialized` blobs and inline them into the graph. Then on the worker, the `Serialized` objects are unpacked. (This `Serialized`-objects-on-the-scheduler part is the same as our plan with `BlockwiseIODeps` right now; in both cases, this is aspirational, since we need to adjust `dumps_task` serialization to make this happen.)
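To make the culling step concrete, here is a toy calculation of which argument indices remain in use after culling (the helper name and signature are invented for illustration, not dask API):

```python
def used_indices(out_ind, arg_ind, surviving_blocks):
    """Given the output-block coordinates that survive culling, compute
    which indices of an argument with index pattern `arg_ind` are
    actually touched (hypothetical helper, not dask API)."""
    # Position of each of the argument's indices within the output pattern.
    positions = [out_ind.index(i) for i in arg_ind]
    return {tuple(block[p] for p in positions) for block in surviving_blocks}


# Suppose culling kept only the output blocks with j == 0 out of a
# 2x3 'ij' block grid; an argument indexed by 'j' then only needs
# its 0th chunk, so __dask_distributed_pack__ can skip the rest.
surviving = [(0, 0), (1, 0)]
needed = used_indices("ij", "j", surviving)
```

`needed` comes out as `{(0,)}`: only one of the three chunks along `j` has to be serialized and shipped.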
All of this is nearly isomorphic to the current `io_deps`. The reason it appeals to me is that conceptually, `Blockwise` is already all about broadcasting array-ish things together. This just extends that concept to more objects while using the same interface. For me at least, it feels less like understanding a "new thing" than `io_deps` does, which could be helpful for other contributors/users.
I also feel like it would shift my mindset when implementing IO operations towards "how do I get this auxiliary data (CSV filenames, np.random seeds, etc.) into the right shape so it broadcasts against my output chunk pattern, then let blockwise figure out the mappings?" Whereas to me, `io_deps` encourages thinking about "how do I map from a coordinate in the output to the inputs that task needs," which conceptually I've found harder: normally, it's blockwise's job to figure out those mappings for you.
I think nice IO patterns emerge given this interface and a few helpers to tell your function where it is within the block structure (like `Slices` and `BlockIndex`). Instead of making subclasses of `BlockwiseIODeps` for different IO operations, you might be able to compose together those helpers, a reshaped ndarray of your inputs, and a function for many IO cases.
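A sketch of that composition mindset (everything here is hypothetical: the reader function and the task layout just illustrate the shape of the idea, not real dask code):

```python
import numpy as np

# Instead of subclassing BlockwiseIODeps, shape the auxiliary data
# (here, CSV filenames) so it broadcasts against the output chunk
# pattern with index 'i', and let blockwise-style indexing do the
# coordinate-to-input mapping.
filenames = np.array(["a.csv", "b.csv", "c.csv"])


def read_block(fname):
    # Placeholder for a real per-file reader.
    return f"<data from {fname}>"


# What graph construction would inline: one filename per output block,
# selected by indexing `filenames` at graph-construction time.
tasks = {
    ("x", i): (read_block, filenames[i]) for i in range(filenames.shape[0])
}
```

The IO-specific logic lives entirely in `read_block` and in how `filenames` was reshaped; the block-coordinate bookkeeping is just broadcasting.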
I partially implemented this at https://github.com/dask/dask/compare/main...gjoseph92:blockwise-arraylike-args?expand=1. For a smaller diff, this starts just prior to #7281. I recommend only looking at the commits for:
- `make_blockwise_graph` changes
- pack/unpack logic
- most useful, an (I think complete?) implementation of `Array.random` using this
Note that I haven't implemented blockwise rewriting here; intuitively I think it should work, but it's currently broken.