Support Blockwise-based IO for DataFrame collections#7042

Closed
rjzamora wants to merge 100 commits into dask:main from rjzamora:blockwise-rewrite

Conversation

@rjzamora
Member

@rjzamora rjzamora commented Jan 7, 2021

Blocked by #7281

  • Introduces DataFrameLayer, DataFrameIOLayer, and blockwise_io_layer to simplify the construction of Blockwise-based DataFrame layers for IO. Introducing DataFrameLayer also generalizes the existing read_parquet/getitem optimization. The getitem optimization now works for read_parquet, read_orc and read_csv (and can be extended to other IO operations that can target a subset of columns).
  • Refactors read_parquet to use Blockwise.
  • Refactors read_orc to use Blockwise.
  • Refactors read_csv to use Blockwise.



class CSVSubgraph(Mapping):
class CSVFunctionWrapper:
Member Author

@jakirkham - This is the wrapper function I mentioned offline. Note that the `__call__` definition needs to be smart enough to map the key to the data (even if the key was stringified by the scheduler).
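To illustrate the idea, here is a minimal sketch (the class name and attributes are hypothetical, not the PR's actual `CSVFunctionWrapper`): the wrapper stores per-partition block arguments, and its `__call__` resolves a key back to a partition index whether the key arrives as a tuple or as its stringified form:

```python
import ast


class FunctionWrapperSketch:
    """Hypothetical IO-function wrapper: maps a key to its partition."""

    def __init__(self, name, blocks):
        self.name = name      # layer name, e.g. "read-csv-<token>"
        self.blocks = blocks  # per-partition arguments

    def __call__(self, key):
        # The scheduler may hand us either ("name", i) or "('name', i)"
        if isinstance(key, str):
            key = ast.literal_eval(key)
        _, index = key
        return self.blocks[index]  # stand-in for "read this block"


wrapper = FunctionWrapperSketch("read-csv-abc", ["block0", "block1"])
assert wrapper(("read-csv-abc", 1)) == "block1"
assert wrapper("('read-csv-abc', 0)") == "block0"
```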

Member

Got it. I think it's probably not difficult to have Dispatch support strs along with types. That would help us move away from custom objects a bit if that's causing us issues.

Member

We could also just have a dict of functions like we do with serialization

Member Author

The client may need to persist various objects in these FunctionWrapper functions to enable the worker to translate a key to an output chunk/partition. It feels like a unified dict of functions or Dispatch approach may get messy, but I agree it is worth exploring something like that to provide a clear template for future Blockwise-IO implementations.

@rjzamora rjzamora marked this pull request as ready for review January 7, 2021 23:43
Contributor

@madsbk madsbk left a comment

I haven't had time for an in-depth review but overall I think it looks great!
Awesome that you got Array support implemented as well.

Comment on lines +186 to +190
io_deps: set (optional)
Set of "placeholder" collection names that will be generated within
this Blockwise layer. Since these collections do not actually exist
outside the layer, any key with a name in this set will be excluded
from the external dependencies.
Contributor

Do we still need io_deps?

Member Author

It may be possible to remove the need for this, but it would require a more significant change to the existing logic in Blockwise. I think the IO-function wrappers would need to accept an integer (or tuple of integers), and would need to handle the fact that these args are not actually keys. It seemed simpler (and more intuitive) to pass actual keys to the IO-function wrappers, and simply use the io_deps set to make sure these keys are never treated as "real" dependencies.
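The role of io_deps described here can be pictured as a filter applied when collecting a layer's external dependencies. A simplified illustration (the function name is hypothetical; this is not the actual Blockwise code):

```python
def external_dependencies(task_args, io_deps):
    """Collect the keys a task depends on, skipping placeholder IO keys.

    A key is a (name, index) tuple; keys whose name is in ``io_deps``
    only exist inside the layer, so they are not real dependencies.
    """
    return {
        arg for arg in task_args
        if isinstance(arg, tuple) and arg[0] not in io_deps
    }


deps = external_dependencies(
    [("blockwise-io-read-csv-abc", 0), ("other-layer", 0), 42],
    io_deps={"blockwise-io-read-csv-abc"},
)
assert deps == {("other-layer", 0)}
```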

@mrocklin
Member

mrocklin commented Jan 8, 2021

Taking a brief look through this:

In [1]: import dask.dataframe as dd

In [2]: df = dd.read_csv("/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv")

In [3]: dict(df.dask)
Out[3]: 
{('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 0): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 0)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 1): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 1)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 2): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 2)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 3): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 3)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 4): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 4)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 5): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 5)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 6): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 6)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 7): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 7)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 8): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 8)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 9): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 9)),
 ('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 10): (subgraph_callable,
  ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 10))}

I'll admit that it would be nice some day to remove subgraph_callable. It's a heavy and opaque thing that I think adds inertia to the codebase. I would prefer to avoid using it more often when it might not be necessary.

I'm also curious about ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 10). It looks like a key but it's not in the graph. I would be sad if it were in the graph though. Ideally all of these tasks don't have a shared dependency (which as you may know can screw with ordering).

In [4]: df.dask
Out[4]: <dask.highlevelgraph.HighLevelGraph at 0x7f5565472c70>

In [5]: df.dask.layers
Out[5]: ---------------------------------------------------------------------------
AttributeError                            Traceback (most recent call last)
~/miniconda/lib/python3.8/site-packages/IPython/core/formatters.py in __call__(self, obj)
    700                 type_pprinters=self.type_printers,
    701                 deferred_pprinters=self.deferred_printers)
--> 702             printer.pretty(obj)
    703             printer.flush()
    704             return stream.getvalue()

~/miniconda/lib/python3.8/site-packages/IPython/lib/pretty.py in pretty(self, obj)
    375                 if cls in self.type_pprinters:
    376                     # printer registered in self.type_pprinters
--> 377                     return self.type_pprinters[cls](obj, self, cycle)
    378                 else:
    379                     # deferred printer

~/miniconda/lib/python3.8/site-packages/IPython/lib/pretty.py in inner(obj, p, cycle)
    605             p.pretty(key)
    606             p.text(': ')
--> 607             p.pretty(obj[key])
    608         p.end_group(step, end)
    609     return inner

~/miniconda/lib/python3.8/site-packages/IPython/lib/pretty.py in pretty(self, obj)
    392                         if cls is not object \
    393                                 and callable(cls.__dict__.get('__repr__')):
--> 394                             return _repr_pprint(obj, self, cycle)
    395 
    396             return _default_pprint(obj, self, cycle)

~/miniconda/lib/python3.8/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
    698     """A pprint that just redirects to the normal repr function."""
    699     # Find newlines and replace them with p.break_()
--> 700     output = repr(obj)
    701     lines = output.splitlines()
    702     with p.group():

~/workspace/dask/dask/dataframe/io/csv.py in __repr__(self)
    166     def __repr__(self):
    167         return "BlockwiseReadCSV<name='{}', n_parts={}, columns={}>".format(
--> 168             self.name, len(self.blocks), list(self.columns)
    169         )
    170 

AttributeError: 'BlockwiseReadCSV' object has no attribute 'columns'

Small bug. But let's go in and find a single task

list(list(dict(df.dask).values())[0][0].dsk.values())[0][0].__dict__
Out[16]: 
{'name': 'blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb',
 'reader': <function pandas.io.parsers.read_csv(filepath_or_buffer: Union[str, pathlib.Path, IO[~AnyStr]], sep=',', delimiter=None, header='infer', names=None, index_col=None, usecols=None, squeeze=False, prefix=None, mangle_dupe_cols=True, dtype=None, engine=None, converters=None, true_values=None, false_values=None, skipinitialspace=False, skiprows=None, skipfooter=0, nrows=None, na_values=None, keep_default_na=True, na_filter=True, verbose=False, skip_blank_lines=True, parse_dates=False, infer_datetime_format=False, keep_date_col=False, date_parser=None, dayfirst=False, cache_dates=True, iterator=False, chunksize=None, compression='infer', thousands=None, decimal: str = '.', lineterminator=None, quotechar='"', quoting=0, doublequote=True, escapechar=None, comment=None, encoding=None, dialect=None, error_bad_lines=True, warn_bad_lines=True, delim_whitespace=False, low_memory=True, memory_map=False, float_precision=None)>,
 'blocks': ((<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   0,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   64000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   128000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   192000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   256000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   320000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   384000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   448000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   512000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   576000000,
   64000000,
   b'\n'),
  (<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
   <OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
   640000000,
   64000000,
   b'\n')),
 'is_first': (True,
  False,
  False,
  False,
  False,
  False,
  False,
  False,
  False,
  False,
  False),
 'head':       VendorID tpep_pickup_datetime tpep_dropoff_datetime  passenger_count  ...  tolls_amount  improvement_surcharge total_amount  congestion_surcharge
 0            1  2019-01-01 00:46:40   2019-01-01 00:53:20                1  ...           0.0                    0.3         9.95                   NaN
 1            1  2019-01-01 00:59:47   2019-01-01 01:18:59                1  ...           0.0                    0.3        16.30                   NaN
 2            2  2018-12-21 13:48:30   2018-12-21 13:52:40                3  ...           0.0                    0.3         5.80                   NaN
 3            2  2018-11-28 15:52:25   2018-11-28 15:55:45                5  ...           0.0                    0.3         7.55                   NaN
 4            2  2018-11-28 15:56:57   2018-11-28 15:58:33                5  ...           0.0                    0.3        55.55                   NaN
 ...        ...                  ...                   ...              ...  ...           ...                    ...          ...                   ...
 2824         2  2019-01-01 00:34:49   2019-01-01 00:39:23                1  ...           0.0                    0.3         6.30                   NaN
 2825         2  2019-01-01 00:41:40   2019-01-01 00:53:26                1  ...           0.0                    0.3        13.57                   NaN
 2826         2  2019-01-01 00:26:13   2019-01-01 00:42:15                1  ...           0.0                    0.3        16.64                   NaN
 2827         2  2019-01-01 00:48:40   2019-01-01 01:00:40                1  ...           0.0                    0.3        12.50                   NaN
 2828         1  2019-01-01 00:16:55   2019-01-01 00:26:08                2  ...           0.0                    0.3        11.00                   NaN
 
 [2829 rows x 18 columns],
 'header': b'VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge\r\n',
 'kwargs': {},
 'dtypes': {'VendorID': dtype('int64'),
  'tpep_pickup_datetime': dtype('O'),
  'tpep_dropoff_datetime': dtype('O'),
  'passenger_count': dtype('int64'),
  'trip_distance': dtype('float64'),
  'RatecodeID': dtype('int64'),
  'store_and_fwd_flag': dtype('O'),
  'PULocationID': dtype('int64'),
  'DOLocationID': dtype('int64'),
  'payment_type': dtype('int64'),
  'fare_amount': dtype('float64'),
  'extra': dtype('float64'),
  'mta_tax': dtype('float64'),
  'tip_amount': dtype('float64'),
  'tolls_amount': dtype('float64'),
  'improvement_surcharge': dtype('float64'),
  'total_amount': dtype('float64'),
  'congestion_surcharge': dtype('float64')},
 'columns': ['VendorID',
  'tpep_pickup_datetime',
  'tpep_dropoff_datetime',
  'passenger_count',
  'trip_distance',
  'RatecodeID',
  'store_and_fwd_flag',
  'PULocationID',
  'DOLocationID',
  'payment_type',
  'fare_amount',
  'extra',
  'mta_tax',
  'tip_amount',
  'tolls_amount',
  'improvement_surcharge',
  'total_amount',
  'congestion_surcharge'],
 'enforce': False,
 'colname': None,
 'paths': None}

Two things concern me here:

  1. There is a lot of structure to go down in order to find the actual logic here. This is concerning. I would far prefer to have a task like (function, filename, start, stop). I think that we're making debugging here much harder for future maintainers, which concerns me.
  2. It looks like every task has information for every other task. This will likely become a large problem with larger datasets.

So in general I think that while this may have removed some complexity from the Blockwise code, it buried that complexity in each task. I'm not sure that this is an improvement.

@rjzamora
Member Author

rjzamora commented Jan 8, 2021

I'll admit that it would be nice some day to remove subgraph_callable. It's a heavy and opaque thing that I think adds inertia to the codebase. I would prefer to avoid using it more often when it might not be necessary.

Right - For a single IO operation, there is no reason to use SubgraphCallable. However, we still use it here to make fusion logic with other layers as simple as possible.

I'm also curious about ('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 10). It looks like a key but it's not in the graph. I would be sad if it were in the graph though. Ideally all of these tasks don't have a shared dependency (which as you may know can screw with ordering).

('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 10) is the "placeholder" key for a collection that does not exist outside that blockwise layer. It does not exist in the graph, and we explicitly exclude these keys from the dependencies (the purpose of the io_deps set). Another option is to pass a tuple of integers here (removing the "placeholder" collection name), but that would require some tweaks in the original Blockwise logic since it assumes we are always operating on a collection with "legal" keys.

Note that these keys also existed before this PR, but they needed to be surgically replaced with the underlying IO-function arguments at graph-construction time. This turned out to be a nightmare in distributed, since the argument list was being serialized on the client, but the graph needed to be materialized on the scheduler.

Small bug. But let's go in and find a single task

Oops - Fixed.

  1. There is a lot of structure to go down in order to find the actual logic here. This is concerning. I would far prefer to have a task like (function, filename, start stop). I think that we're making debugging here much harder for future maintainers, which concerns me.

Sure, but it seems pretty difficult to achieve that after Blockwise fusion. Are you suggesting that a standalone IO operation should be constructed using different logic than other kinds of Blockwise layers? (sorry if I am misunderstanding)

  1. It looks like every task has information for every other task. This will likely become a large problem with larger datasets.

Not sure I follow. Every task includes a callable object that knows how to interpret a key (actually the index component of the key) and return a partition. Therefore, that callable object just needs to embed the input arguments that would already need to exist as arguments in the task.

So in general I think that while this may have removed some complexity from the Blockwise code, it buried into each task. I'm not sure that this is an improvement.

I'm having a hard time following how this is not an improvement over BlockwiseIO. If you are suggesting that this is not an improvement over the Blockwise-free version of IO, then I do understand your concern. That said - hopefully blockwise fusion makes this worth it?

@mrocklin
Member

mrocklin commented Jan 8, 2021

Sure, but it seems pretty difficult to achieve that after Blockwise fusion. Are you suggesting that a standalone IO operation should be constructed using different logic than other kinds of Blockwise layers? (sorry if I am misunderstanding)

Ah, yeah that makes sense.

Every task includes a callable object that knows how to interpret a key

That callable object seems to know about all possible keys. If the collection has many keys then that object is going to be very large. If our collection has a million keys then the scheduler will have a million serialized versions of callable objects, each holding information about a million keys.

@rjzamora
Member Author

rjzamora commented Jan 8, 2021

That callable object seems to know about all possible keys. If the collection has many keys then that object is going to be very large. If our collection has a million keys then the scheduler will have a million serialized versions of callable objects, each holding information about a million keys.

Okay - Sorry. You are definitely right. It seems that the only way to avoid this is to move back to the "injection-at-graph-construction-time" approach for some of the arguments. Instead of doing it for "all" arguments, however, we can do it only for the subset of simple msgpack-serializable args that vary between partitions (anything that is the same in all partitions should be embedded in the function wrapper).
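One way to picture that direction (all names here are hypothetical; this is a sketch of the idea rather than the final code): arguments shared by all partitions stay on the wrapper, while only the small per-partition piece is injected into each task at graph-construction time, so no task carries the full argument list:

```python
class PartitionedWrapper:
    """Common state lives here once; tasks only carry their own block args."""

    def __init__(self, reader, common_kwargs):
        self.reader = reader
        self.common_kwargs = common_kwargs

    def __call__(self, block_args):
        # block_args is only this partition's piece (e.g. path, offset, length)
        return self.reader(block_args, **self.common_kwargs)


def build_tasks(name, wrapper, per_partition_args):
    # Each task embeds only its own arguments, not the whole list
    return {
        (name, i): (wrapper, args)
        for i, args in enumerate(per_partition_args)
    }


tasks = build_tasks(
    "read-csv-xyz",
    PartitionedWrapper(lambda args, sep: (args, sep), {"sep": ","}),
    [("file.csv", 0, 64), ("file.csv", 64, 64)],
)
assert tasks[("read-csv-xyz", 1)][1] == ("file.csv", 64, 64)
```

With this split, serializing any single task is O(1) in the number of partitions, at the cost of injecting arguments when the graph is built.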

@mrocklin
Member

mrocklin commented Jan 8, 2021 via email

@rjzamora
Member Author

rjzamora commented Jan 8, 2021

In the meantime, maybe we should revert the previous change that caused
things to fail so that we can release and get people back on track?

Okay - The Blockwise-IO changes were introduced through several PRs dating back to early November. I will just put together a temporary change to remove BlockwiseIO and avoid using Blockwise for any IO-related operations.

[EDIT: I opened #7048 to revert the existing BlockwiseIO logic, so we can be more patient/careful about this PR]

@rjzamora rjzamora mentioned this pull request Jan 8, 2021
for idx in itertools.product(*[range(len(c)) for c in chunks]):
ndim = len(chunks)
chunk = tuple(chunks[i][idx[i]] for i in range(ndim))
chunk_key_map[tuple(idx)] = chunk
Collaborator

Sorry for the very slow look at this @rjzamora! One thing I'm struggling to understand here: the previous implementation of BlockwiseCreateArray produced a mapping between chunk keys and their shapes lazily, allowing a much smaller footprint for large arrays. If I understand this correctly, this now goes back to keeping all of the keys and chunk shapes around.

Now, I could envision this (and similar Layers) packing/unpacking the data needed to create this keymap so that it can be done lazily. Is that where you see this going longer-term?

Member Author

Yeah - It seems that I was going out of my way to avoid passing chunks to every task (required for lazy chunk resolution), but I don't think that priority makes any sense. We should be able to embed chunks in a simple function wrapper, and then use similar logic as before.
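A minimal sketch of the lazy alternative (the helper name is hypothetical): keep `chunks` once, and resolve any chunk's shape on demand from its index instead of materializing the full index-to-shape mapping:

```python
def chunk_shape(chunks, idx):
    """Resolve the shape of one chunk from its index, without
    materializing a full index -> shape mapping."""
    return tuple(chunks[dim][idx[dim]] for dim in range(len(chunks)))


# A 10x10 array split into (4, 4, 2) blocks along each axis
chunks = ((4, 4, 2), (4, 4, 2))
assert chunk_shape(chunks, (0, 0)) == (4, 4)
assert chunk_shape(chunks, (2, 1)) == (2, 4)
```

Storing `chunks` is O(sum of axis block counts), while the expanded mapping is O(product of axis block counts) entries.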

Collaborator

Yeah, that makes sense to me, chunks takes up a lot less space than the fully expanded mapping.

Going further -- this implementation only works for uniform functions (ones, zeros, etc), but I have some work-in-progress with the old BlockwiseIO setup that takes into account the position of each chunk. In that case that data will also need to be constructed on the fly. I don't see it as a major impediment with the way you have set up things here, though.

Collaborator

One more thought: we should add some basic sanity tests ensuring that all the layer implementations in dask/dask can be serialized so we don't get into trouble again :)

I'd be happy to do that in a follow-up, or make a PR to your branch here.

Member Author

@rjzamora rjzamora Feb 18, 2021

chunks takes up a lot less space than the fully expanded mapping.

It is worth considering that the expanded mapping does not need to be duplicated in every task when the graph is materialized, but chunks does need to be duplicated. Therefore, we may actually have an even larger memory footprint with chunk_key_map removed.

EDIT: Yeah - Embedding chunks in every task was not a scalable solution. I had to revert 2234ed8

One more thought: we should add some basic sanity tests ensuring that all the layer implementations in dask/dask can be serialized so we don't get into trouble again :)

Yeah - It's actually a bit silly that this is not the first thing I did in this PR :)

Member Author

Although this "func" approach seems to work, it is only a minor optimization. It will also require a bit more work to add a layer of protection to ensure that the user cannot replace the callable object with arbitrary python code (a security vulnerability). Therefore, I suggest we attack this in a separate/follow-up PR.

Collaborator

I think we have been dancing around just implementing some extra logic in the __dask_distributed_{pack, unpack}__ for BlockwiseCreateArray. If we store the chunks at the top level, pack and unpack them simply, then produce the key map as needed, we could satisfy your constraint to not duplicate the chunks on every task, as well as mine to not send a mostly-materialized layer across the wire.

If we did that, we could also implement whatever gating/checks would be necessary to make sure only allowed "func"s get expanded on the scheduler. Or am I misunderstanding what you mean @rjzamora?

Member Author

Thanks @ian-r-rose - I don't think you are misunderstanding. __dask_distributed_{pack, unpack}__ is certainly the most promising place to handle dynamic chunk generation. The mechanism itself is very simple, but the gating/check is what I am not quite sure about yet (and have not prototyped anything yet).

Note that I opened #7281 with the intention to hash out these blockwise/array basics before adding DataFrame to the mix. At first, I was thinking that we could start with the "mostly-materialized" layer approach, but I can probably revise that PR with a new proposal for dynamic chunk materialization tomorrow.

Collaborator

Thanks @rjzamora, that makes sense to me, I may have some bandwidth this week to look at dynamic chunk materialization if that would be helpful

Member Author

Cool - I'm definitely interested to get your thoughts/ideas.

The main challenge is that a BlockwiseCreateArray will often become a vanilla Blockwise Layer after optimize_blockwise is called, so we cannot simply rely on code in BlockwiseCreateArray.__dask_distributed_{pack, unpack}__. To understand why, consider the case where two arrays are generated with different creation techniques (with one possibly requiring non-trivial io_deps). If the two arrays are combined with a blockwise addition operation, optimize_blockwise will fuse the three layers together, and the final layer type cannot always be BlockwiseCreateArray.

Since we are already planning to support the io_deps attribute for all Blockwise layers, it would be nice to use the same structure to specify dynamic "unpacking" options. For example, we may be able to register a list of options (e.g. "generate_chunk") that will map onto "cleared" functions known by the scheduler:

# Dynamic `io_deps` generation function
def generate_chunk(idx, chunks):
    ndim = len(chunks)
    return tuple(chunks[i][idx[i]] for i in range(ndim))

...

# "registered" function dict
io_dep_functions = {
    "generate_chunk": generate_chunk,
    ...
}

# Example `io_deps` definition.
# Scheduler would know to call `generate_chunk(idx, ((4, 4, 2), (4, 4, 2)))`
# to construct the input for each chunk/index of 'ones-4627d3bbed71139f53cd1f686350e412'
io_deps = {'ones-4627d3bbed71139f53cd1f686350e412': {"generate_chunk": ((4, 4, 2), (4, 4, 2))}}
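On the unpack side, the scheduler could then expand such entries with a small lookup. A hedged sketch (`expand_io_deps` is a hypothetical helper; `generate_chunk` and `io_dep_functions` are repeated from the snippet above so this runs standalone):

```python
import itertools


def generate_chunk(idx, chunks):
    ndim = len(chunks)
    return tuple(chunks[i][idx[i]] for i in range(ndim))


# "registered" functions the scheduler is allowed to call
io_dep_functions = {"generate_chunk": generate_chunk}


def expand_io_deps(io_deps):
    """For each placeholder collection, call its registered function
    to materialize the per-chunk inputs on the scheduler."""
    expanded = {}
    for name, spec in io_deps.items():
        (func_name, chunks), = spec.items()
        func = io_dep_functions[func_name]  # only registered names allowed
        indices = itertools.product(*(range(len(c)) for c in chunks))
        expanded[name] = {idx: func(idx, chunks) for idx in indices}
    return expanded


out = expand_io_deps(
    {"ones-4627d3bbed71139f53cd1f686350e412": {"generate_chunk": ((4, 4, 2), (4, 4, 2))}}
)
assert out["ones-4627d3bbed71139f53cd1f686350e412"][(2, 2)] == (2, 2)
```

Because only names in the registry resolve to callables, arbitrary user functions never execute on the scheduler, which addresses the gating concern above.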

@jsignell
Member

@rjzamora it seems like this won't make it into this release, but do you want to try to get it into the next one?

@rjzamora
Member Author

@rjzamora it seems like this won't make it into this release, but do you want to try to get it into the next one?

Totally agree that it cannot get into this week's release. Yes - I'll make it a personal priority to get it merged before the next release :)

@jsignell
Member

actually, with the release bumped to next week, it might make it in :)

@rjzamora
Member Author

@jrbourbeau - I know we discussed the possibility of removing the DataFrameIOLayer/DataFrameLayer stuff from this PR. However, I feel that it simplifies the strategy/code needed to move DataFrame-based IO under the Blockwise "umbrella." I realize that it would be a nice simplification to separate the Blockwise changes from the improvement in "column-culling" (related to the move of optimize_read_parquet_getitem to optimize_dataframe_getitem). However, in the absence of DataFrameIOLayer/DataFrameLayer, the Blockwise code will need to be a bit more complicated (the changes are not completely orthogonal).

With the above in mind - I would be happy to break out a simple PR with the blockwise.py and array-only changes (to capture the key Blockwise + IO foundations). That PR could be merged first, and then this PR could be applied on top of it with the DataFrame-focused changes. Does that sound reasonable?

@rjzamora rjzamora changed the title Redesign/remove BlockwiseIO Support Blockwise-based IO for DataFrame collections Feb 26, 2021
@rjzamora rjzamora marked this pull request as draft February 26, 2021 19:12
shape,
chunks,
):
self.name = name
Collaborator

Is there any benefit in this? Will there ever be a case of self.name != self.output?

Member Author

No, the name is only saved here for the __repr__ - I'd be happy to remove it if you think the parent Blockwise.__repr__ is sufficient (perhaps we just add information about io_deps in that method?).

If you have the bandwidth @crusaderky, I'd be very interested to get your thoughts on #7281 (a smaller subset of this PR that should be easier to review).

Base automatically changed from master to main March 8, 2021 20:19
@rjzamora
Member Author

Closing this in favor of #7415 (I think the timeline/discussion here is becoming difficult to follow)

@rjzamora rjzamora closed this Mar 18, 2021