Support Blockwise-based IO for DataFrame collections#7042
Conversation
- class CSVSubgraph(Mapping):
+ class CSVFunctionWrapper:
@jakirkham - This is the wrapper function I mentioned offline. Note that the __call__ definition needs to be smart enough to map the key to the data (even if the key was stringified by the scheduler).
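For illustration, here is a minimal sketch (not the actual CSVFunctionWrapper implementation) of a wrapper whose __call__ recovers the partition index from a key, even when the scheduler has stringified the tuple:

```python
import ast

class FunctionWrapperSketch:
    """Hypothetical IO-function wrapper (NOT the real CSVFunctionWrapper)."""

    def __init__(self, name, parts):
        self.name = name    # collection name shared by every key
        self.parts = parts  # per-partition inputs, indexed by partition number

    def __call__(self, key):
        # The scheduler may have stringified the tuple key; recover it first.
        if isinstance(key, str):
            key = ast.literal_eval(key)
        name, index = key
        assert name == self.name
        return self.parts[index]

wrapper = FunctionWrapperSketch("read-csv-abc", ["part0", "part1", "part2"])
assert wrapper(("read-csv-abc", 1)) == "part1"
assert wrapper("('read-csv-abc', 2)") == "part2"
```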
Got it. I think it's probably not difficult to have Dispatch support strs along with types. That would help us move away from custom objects a bit if that's causing us issues.
We could also just have a dict of functions like we do with serialization
The client may need to persist various objects in these FunctionWrapper functions to enable the worker to translate a key to an output chunk/partition. It feels like a unified dict of functions or Dispatch approach may get messy, but I agree it is worth exploring something like that to provide a clear template for future Blockwise-IO implementations.
dask/blockwise.py
Outdated
io_deps: set (optional)
    Set of "placeholder" collection names that will be generated within
    this Blockwise layer. Since these collections do not actually exist
    outside the layer, any key with a name in this set will be excluded
    from the external dependencies.
It may be possible to remove the need for this, but it would require a more significant change to the existing logic in Blockwise. I think the IO-function wrappers would need to accept an integer (or tuple of integers), and would need to handle the fact that these args are not actually keys. It seemed simpler (and more intuitive) to pass actual keys to the IO-function wrappers, and simply use the io_deps set to make sure these keys are never treated as "real" dependencies.
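As a rough illustration of the io_deps idea described above (a simplified stand-in for the real Blockwise logic, not the actual implementation), excluding placeholder names when collecting external dependencies might look like:

```python
def external_dependencies(task_inputs, io_deps):
    """Collect dependency keys, skipping placeholder IO collections.

    task_inputs: iterable of keys like ("name", index)
    io_deps: set of collection names that are generated *inside* the layer
    """
    return {key for key in task_inputs if key[0] not in io_deps}

inputs = [("blockwise-io-read-csv-abc", 0), ("other-layer", 0)]
deps = external_dependencies(inputs, io_deps={"blockwise-io-read-csv-abc"})

# The placeholder IO key is dropped; only the real dependency remains.
assert deps == {("other-layer", 0)}
```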
Taking a brief look through this:

In [1]: import dask.dataframe as dd
In [2]: df = dd.read_csv("/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv")
In [3]: dict(df.dask)
Out[3]:
{('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 0): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 0)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 1): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 1)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 2): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 2)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 3): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 3)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 4): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 4)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 5): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 5)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 6): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 6)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 7): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 7)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 8): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 8)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 9): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 9)),
('read-csv-138fcf1d3bba3d059e1b82eba08742cb', 10): (subgraph_callable,
('blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb', 10))}

I'll admit that it would be nice some day to remove

I'm also curious about

In [4]: df.dask
Out[4]: <dask.highlevelgraph.HighLevelGraph at 0x7f5565472c70>
In [5]: df.dask.layers
Out[5]: ---------------------------------------------------------------------------
AttributeError Traceback (most recent call last)
~/miniconda/lib/python3.8/site-packages/IPython/core/formatters.py in __call__(self, obj)
700 type_pprinters=self.type_printers,
701 deferred_pprinters=self.deferred_printers)
--> 702 printer.pretty(obj)
703 printer.flush()
704 return stream.getvalue()
~/miniconda/lib/python3.8/site-packages/IPython/lib/pretty.py in pretty(self, obj)
375 if cls in self.type_pprinters:
376 # printer registered in self.type_pprinters
--> 377 return self.type_pprinters[cls](obj, self, cycle)
378 else:
379 # deferred printer
~/miniconda/lib/python3.8/site-packages/IPython/lib/pretty.py in inner(obj, p, cycle)
605 p.pretty(key)
606 p.text(': ')
--> 607 p.pretty(obj[key])
608 p.end_group(step, end)
609 return inner
~/miniconda/lib/python3.8/site-packages/IPython/lib/pretty.py in pretty(self, obj)
392 if cls is not object \
393 and callable(cls.__dict__.get('__repr__')):
--> 394 return _repr_pprint(obj, self, cycle)
395
396 return _default_pprint(obj, self, cycle)
~/miniconda/lib/python3.8/site-packages/IPython/lib/pretty.py in _repr_pprint(obj, p, cycle)
698 """A pprint that just redirects to the normal repr function."""
699 # Find newlines and replace them with p.break_()
--> 700 output = repr(obj)
701 lines = output.splitlines()
702 with p.group():
~/workspace/dask/dask/dataframe/io/csv.py in __repr__(self)
166 def __repr__(self):
167 return "BlockwiseReadCSV<name='{}', n_parts={}, columns={}>".format(
--> 168 self.name, len(self.blocks), list(self.columns)
169 )
170
AttributeError: 'BlockwiseReadCSV' object has no attribute 'columns'

Small bug. But let's go in and find a single task:

In [16]: list(list(dict(df.dask).values())[0][0].dsk.values())[0][0].__dict__
Out[16]:
{'name': 'blockwise-io-read-csv-138fcf1d3bba3d059e1b82eba08742cb',
'reader': <function pandas.io.parsers.read_csv(filepath_or_buffer: Union[str, pathlib.Path, IO[~AnyStr]], sep=',', delimiter=None, header='infer', names=None, index_col=None, usecols=None, squeeze=False, prefix=None, mangle_dupe_cols=True, dtype=None, engine=None, converters=None, true_values=None, false_values=None, skipinitialspace=False, skiprows=None, skipfooter=0, nrows=None, na_values=None, keep_default_na=True, na_filter=True, verbose=False, skip_blank_lines=True, parse_dates=False, infer_datetime_format=False, keep_date_col=False, date_parser=None, dayfirst=False, cache_dates=True, iterator=False, chunksize=None, compression='infer', thousands=None, decimal: str = '.', lineterminator=None, quotechar='"', quoting=0, doublequote=True, escapechar=None, comment=None, encoding=None, dialect=None, error_bad_lines=True, warn_bad_lines=True, delim_whitespace=False, low_memory=True, memory_map=False, float_precision=None)>,
'blocks': ((<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
0,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
64000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
128000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
192000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
256000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
320000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
384000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
448000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
512000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
576000000,
64000000,
b'\n'),
(<function dask.bytes.core.read_block_from_file(lazy_file, off, bs, delimiter)>,
<OpenFile '/home/mrocklin/data/nyctaxi/yellow_tripdata_2019-01.csv'>,
640000000,
64000000,
b'\n')),
'is_first': (True,
False,
False,
False,
False,
False,
False,
False,
False,
False,
False),
'head': VendorID tpep_pickup_datetime tpep_dropoff_datetime passenger_count ... tolls_amount improvement_surcharge total_amount congestion_surcharge
0 1 2019-01-01 00:46:40 2019-01-01 00:53:20 1 ... 0.0 0.3 9.95 NaN
1 1 2019-01-01 00:59:47 2019-01-01 01:18:59 1 ... 0.0 0.3 16.30 NaN
2 2 2018-12-21 13:48:30 2018-12-21 13:52:40 3 ... 0.0 0.3 5.80 NaN
3 2 2018-11-28 15:52:25 2018-11-28 15:55:45 5 ... 0.0 0.3 7.55 NaN
4 2 2018-11-28 15:56:57 2018-11-28 15:58:33 5 ... 0.0 0.3 55.55 NaN
... ... ... ... ... ... ... ... ... ...
2824 2 2019-01-01 00:34:49 2019-01-01 00:39:23 1 ... 0.0 0.3 6.30 NaN
2825 2 2019-01-01 00:41:40 2019-01-01 00:53:26 1 ... 0.0 0.3 13.57 NaN
2826 2 2019-01-01 00:26:13 2019-01-01 00:42:15 1 ... 0.0 0.3 16.64 NaN
2827 2 2019-01-01 00:48:40 2019-01-01 01:00:40 1 ... 0.0 0.3 12.50 NaN
2828 1 2019-01-01 00:16:55 2019-01-01 00:26:08 2 ... 0.0 0.3 11.00 NaN
[2829 rows x 18 columns],
'header': b'VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge\r\n',
'kwargs': {},
'dtypes': {'VendorID': dtype('int64'),
'tpep_pickup_datetime': dtype('O'),
'tpep_dropoff_datetime': dtype('O'),
'passenger_count': dtype('int64'),
'trip_distance': dtype('float64'),
'RatecodeID': dtype('int64'),
'store_and_fwd_flag': dtype('O'),
'PULocationID': dtype('int64'),
'DOLocationID': dtype('int64'),
'payment_type': dtype('int64'),
'fare_amount': dtype('float64'),
'extra': dtype('float64'),
'mta_tax': dtype('float64'),
'tip_amount': dtype('float64'),
'tolls_amount': dtype('float64'),
'improvement_surcharge': dtype('float64'),
'total_amount': dtype('float64'),
'congestion_surcharge': dtype('float64')},
'columns': ['VendorID',
'tpep_pickup_datetime',
'tpep_dropoff_datetime',
'passenger_count',
'trip_distance',
'RatecodeID',
'store_and_fwd_flag',
'PULocationID',
'DOLocationID',
'payment_type',
'fare_amount',
'extra',
'mta_tax',
'tip_amount',
'tolls_amount',
'improvement_surcharge',
'total_amount',
'congestion_surcharge'],
'enforce': False,
'colname': None,
'paths': None}

Two things concern me here:
So in general I think that while this may have removed some complexity from the Blockwise code, it buried it into each task. I'm not sure that this is an improvement.
Right - For a single IO operation, there is no reason to use
Note that these keys also existed before this PR, but they needed to be surgically replaced with the underlying IO-function arguments at graph-construction time. This turned out to be a nightmare in distributed, since the argument list was being serialized on the client, but the graph needed to be materialized on the scheduler.
Oops - Fixed.
Sure, but it seems pretty difficult to achieve that after Blockwise fusion. Are you suggesting that a standalone IO operation should be constructed using different logic than other kinds of
Not sure I follow. Every task includes a callable object that knows how to interpret a key (actually the index component of the key) and return a partition. Therefore, that callable object just needs to embed the input arguments that would already need to exist as arguments in the task.
I'm having a hard time following how this is not an improvement over
Ah, yeah that makes sense.
That callable object seems to know about all possible keys. If the collection has many keys then that object is going to be very large. If our collection has a million keys then the scheduler will have a million serialized versions of callable objects, each holding information about a million keys.
Okay - Sorry. You are definitely right. It seems that the only way to avoid this is to move back to the "injection-at-graph-construction-time" approach for some of the arguments. Instead of doing it for "all" arguments, however, we can do it only for the subset of simple msgpack-serializable args that vary between partitions (anything that is the same in all partitions should be embedded in the function wrapper).
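A hedged sketch of that hybrid approach (all names here are hypothetical): arguments shared by every partition live in the wrapper once, while only the small msgpack-serializable per-partition argument is injected into each task at graph-construction time:

```python
class IOFunctionWrapper:
    """Hypothetical wrapper: holds arguments shared by ALL partitions."""

    def __init__(self, func, common_kwargs):
        self.func = func
        self.common_kwargs = common_kwargs  # embedded once, not per task

    def __call__(self, part_arg):
        # part_arg is the small, msgpack-serializable piece that varies
        # between partitions and is injected into each task.
        return self.func(part_arg, **self.common_kwargs)

def read_block(block_range, scale=1):
    # Toy stand-in for an IO function reading one block of data.
    start, stop = block_range
    return [i * scale for i in range(start, stop)]

wrapper = IOFunctionWrapper(read_block, {"scale": 10})

# Graph construction: each task carries only its per-partition argument.
graph = {("x", i): (wrapper, (i * 2, i * 2 + 2)) for i in range(3)}
func, arg = graph[("x", 1)]
assert func(arg) == [20, 30]
```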
In the meantime, maybe we should revert the previous change that caused
things to fail so that we can release and get people back on track?
Okay - The Blockwise-IO changes were introduced through several PRs dating back to early November. I will just put together a temporary change to remove [EDIT: I opened #7048 to revert the existing BlockwiseIO logic, so we can be more patient/careful about this PR]
for idx in itertools.product(*[range(len(c)) for c in chunks]):
    ndim = len(chunks)
    chunk = tuple(chunks[i][idx[i]] for i in range(ndim))
    chunk_key_map[tuple(idx)] = chunk
Sorry for the very slow look at this @rjzamora! One thing I'm struggling to understand here: the previous implementation of BlockwiseCreateArray produced a mapping between chunk keys and their shapes lazily, allowing a much smaller footprint for large arrays. If I understand this correctly, this now goes back to keeping all of the keys and chunk shapes around.
Now, I could envision this (and similar Layers) packing/unpacking the data needed to create this keymap so that it can be done lazily. Is that where you see this going longer-term?
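For reference, the loop from the diff above can be run stand-alone (with small, made-up chunks) to see that the mapping is fully materialized up front rather than built lazily:

```python
import itertools

chunks = ((4, 4, 2), (3, 3))  # two dimensions with uneven chunk sizes
chunk_key_map = {}
for idx in itertools.product(*[range(len(c)) for c in chunks]):
    ndim = len(chunks)
    chunk = tuple(chunks[i][idx[i]] for i in range(ndim))
    chunk_key_map[tuple(idx)] = chunk

# Every block index is expanded eagerly: 3 * 2 = 6 entries here, but the
# count grows with the product of chunk counts along every dimension.
assert len(chunk_key_map) == 6
assert chunk_key_map[(2, 1)] == (2, 3)
```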
Yeah - It seems that I was going out of my way to avoid passing chunks to every task (required for lazy chunk resolution), but I don't think that priority makes any sense. We should be able to embed chunks in a simple function wrapper, and then use similar logic to before.
Yeah, that makes sense to me, chunks takes up a lot less space than the fully expanded mapping.
Going further -- this implementation only works for uniform functions (ones, zeros, etc), but I have some work-in-progress with the old BlockwiseIO setup that takes into account the position of each chunk. In that case that data will also need to be constructed on the fly. I don't see it as a major impediment with the way you have set up things here, though.
One more thought: we should add some basic sanity tests ensuring that all the layer implementations in dask/dask can be serialized so we don't get into trouble again :)
I'd be happy to do that in a follow-up, or make a PR to your branch here.
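A minimal sketch of such a sanity test, assuming a pickle round-trip is the property we want to guard (the real test would loop over the actual Layer classes in dask/dask; ToyLayer here is a hypothetical stand-in):

```python
import pickle

class ToyLayer:
    """Hypothetical stand-in for a HighLevelGraph layer."""

    def __init__(self, name, npartitions):
        self.name = name
        self.npartitions = npartitions

    def __eq__(self, other):
        return (self.name, self.npartitions) == (other.name, other.npartitions)

def check_roundtrip(layer):
    """Assert that the layer survives serialization unchanged."""
    restored = pickle.loads(pickle.dumps(layer))
    assert restored == layer
    return restored

check_roundtrip(ToyLayer("read-csv-abc", 8))
```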
chunks takes up a lot less space than the fully expanded mapping.
It is worth considering that the expanded mapping does not need to be duplicated in every task when the graph is materialized, but chunks does need to be duplicated. Therefore, we may actually have an even larger memory footprint with chunk_key_map removed.
EDIT: Yeah - Embedding chunks in every task was not a scalable solution. I had to revert 2234ed8
One more thought: we should add some basic sanity tests ensuring that all the layer implementations in dask/dask can be serialized so we don't get into trouble again :)
Yeah - It's actually a bit silly that this is not the first thing I did in this PR :)
Although this "func" approach seems to work, it is only a minor optimization. It will also require a bit more work to add a layer of protection to ensure that the user cannot replace the callable object with arbitrary Python code (a security vulnerability). Therefore, I suggest we attack this in a separate/follow-up PR.
I think we have been dancing around just implementing some extra logic in the __dask_distributed_{pack, unpack}__ for BlockwiseCreateArray. If we store the chunks at the top level, pack and unpack them simply, then produce the key map as needed, we could satisfy your constraint to not duplicate the chunks on every task, as well as mine to not send a mostly-materialized layer across the wire.
If we did that, we could also implement whatever gating/checks would be necessary to make sure only allowed "func"s get expanded on the scheduler. Or am I misunderstanding what you mean @rjzamora?
Thanks @ian-r-rose - I don't think you are misunderstanding. __dask_distributed_{pack, unpack}__ is certainly the most promising place to handle dynamic chunk generation. The mechanism itself is very simple, but the gating/check is what I am not quite sure about yet (and have not prototyped anything yet).
Note that I opened #7281 with the intention to hash out these blockwise/array basics before adding DataFrame to the mix. At first, I was thinking that we could start with the "mostly-materialized" layer approach, but I can probably revise that PR with a new proposal for dynamic chunk materialization tomorrow.
Thanks @rjzamora, that makes sense to me, I may have some bandwidth this week to look at dynamic chunk materialization if that would be helpful
Cool - I'm definitely interested to get your thoughts/ideas.
The main challenge is that a BlockwiseCreateArray will often become a vanilla Blockwise Layer after optimize_blockwise is called, so we cannot simply rely on code in BlockwiseCreateArray.__dask_distributed_{pack, unpack}__. To understand why, consider the case where two arrays are generated with different creation techniques (with one possibly requiring non-trivial io_deps). If the two arrays are combined with a blockwise addition operation, optimize_blockwise will fuse the three layers together, and the final layer type cannot always be BlockwiseCreateArray.
Since we are already planning to support the io_deps attribute for all Blockwise layers, it would be nice to use the same structure to specify dynamic "unpacking" options. For example, we may be able to register a list of options (e.g. "generate_chunk") that will map onto "cleared" functions known by the scheduler:
# Dynamic `io_deps` generation function
def generate_chunk(idx, chunks):
    ndim = len(chunks)
    return tuple(chunks[i][idx[i]] for i in range(ndim))

...

# "registered" function dict
io_dep_functions = {
    "generate_chunk": generate_chunk,
    ...
}

# Example `io_deps` definition.
# Scheduler would know to call `generate_chunk(idx, ((4, 4, 2), (4, 4, 2)))`
# to construct the input for each chunk/index of 'ones-4627d3bbed71139f53cd1f686350e412'
io_deps = {'ones-4627d3bbed71139f53cd1f686350e412': {"generate_chunk": ((4, 4, 2), (4, 4, 2))}}
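For illustration, the snippet above can be evaluated end-to-end with a concrete block index (the registry lookup is a hypothetical sketch of what the scheduler might do, not actual scheduler code):

```python
def generate_chunk(idx, chunks):
    # Resolve one block index to its chunk shape, one dimension at a time.
    ndim = len(chunks)
    return tuple(chunks[i][idx[i]] for i in range(ndim))

# "Registered" functions the scheduler would be allowed to call.
io_dep_functions = {"generate_chunk": generate_chunk}

io_deps = {
    "ones-4627d3bbed71139f53cd1f686350e412": {"generate_chunk": ((4, 4, 2), (4, 4, 2))}
}

# Hypothetical scheduler-side lookup: resolve the registered function by
# name, then call it with the per-layer arguments for a given block index.
(func_name, args), = io_deps["ones-4627d3bbed71139f53cd1f686350e412"].items()
shape = io_dep_functions[func_name]((2, 0), args)
assert shape == (2, 4)
```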
@rjzamora it seems like this won't make it into this release, but do you want to try to get it into the next one?
Totally agree that it cannot get into this weeks release. Yes - I'll make it a personal priority to get it merged before the next release :)
actually with the release bumped to next week it might make it in :)
@jrbourbeau - I know we discussed the possibility of removing the

With the above in mind - I would be happy to break out a simple PR with the blockwise.py and array-only changes (to capture the key Blockwise + IO foundations). That PR could be merged first, and then this PR could be applied on top of it with the DataFrame-focused changes. Does that sound reasonable?
    shape,
    chunks,
):
    self.name = name
Is there any benefit in this? Will there ever be a case of self.name != self.output?
No, the name is only saved here for the __repr__ - I'd be happy to remove it if you think the parent Blockwise.__repr__ is sufficient (perhaps we just add information about io_deps in that method?).
If you have the bandwidth @crusaderky, I'd be very interested to get your thoughts on #7281 (a smaller subset of this PR that should be easier to review).
Closing this in favor of #7415 (I think the timeline/discussion here is becoming difficult to follow)
Blocked by #7281
Adds DataFrameLayer, DataFrameIOLayer, and blockwise_io_layer to simplify the construction of Blockwise-based DataFrame layers for IO. Introducing DataFrameLayer also generalizes the existing read_parquet/getitem optimization. The getitem optimization now works for read_parquet, read_orc and read_csv (and can be extended to other IO operations that can target a subset of columns).
- Converts read_parquet to use Blockwise.
- Converts read_orc to use Blockwise.
- Converts read_csv to use Blockwise.