Speed up reading Hive and Drill partitions#471
Conversation
In fact, this change could/should also be propagated to https://github.com/dask/dask/blob/c69d10f97cadd06f147817f0084bddeadb8195e6/dask/dataframe/io/parquet/fastparquet.py#L30 |
Before this change, repeated key-value inserts were the main bottleneck in the initialization of large partitioned datasets. After this change, duplication is avoided by filtering repeats over columns sharing file paths, as well as over partitions sharing common parts.
Before this change, Python 2.7 tests were failing, though https://pythonclock.org/ suggests it may not be worth the effort to keep supporting them.
Currently, the function is duplicated in `dask.dataframe.io.parquet.fastparquet`. After this change, the optimized implementation can be called from `dask` instead.
@rjzamora, you might find this interesting

At a quick glance, this looks like an excellent change. I have not had a chance to look in detail, so will leave it open a little while longer in case anyone else has something to comment.

@martindurant do you have any thoughts/suggestions? The only thing I can think of is to add atomic unit tests for the functions added in this PR, if you prefer. It's not urgent but I'd like to be able to install from

If you have the chance, I would appreciate it if you ran dask's parquet-related test suite against this change. I would still like to hear from @rjzamora, but I think we can merge this, assuming it's not a problem for dask.

This looks great - thanks @ig248! The parquet-related test suite is passing for me on my local machine with these changes. I completely agree that the

Thanks for the approval, @rjzamora - I'll merge now. Hopefully someone has the chance to make the changes in Dask sometime.

Thanks, @ig248!

Thanks @rjzamora and @martindurant, I can look at propagating the change to
Before this change, code was duplicated from fastparquet. After this change, an optimized function is imported from fastparquet following dask/fastparquet#471. Since the supported fastparquet version range is not explicit in dask, the import is for now made optional, falling back to the existing implementation when an older fastparquet is installed.
* Import optimized `fastparquet.api.paths_to_cats`. Before this change, code was duplicated from fastparquet. After this change, an optimized function is imported from fastparquet following dask/fastparquet#471. Since the supported fastparquet version range is not explicit in dask, the import is for now made optional, falling back to the existing implementation when an older fastparquet is installed.
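The optional import described above could be sketched as follows (a minimal illustration of the fallback pattern, not dask's actual code; the fallback body here is a placeholder):

```python
# Prefer the optimized paths_to_cats from fastparquet (available in versions
# that include dask/fastparquet#471); fall back to a local implementation
# when an older fastparquet is installed.
try:
    from fastparquet.api import paths_to_cats
except ImportError:

    def paths_to_cats(paths, file_scheme):
        """Placeholder for the pre-existing local implementation."""
        raise NotImplementedError("requires a newer fastparquet")
```

This keeps dask working against older fastparquet releases without pinning a minimum version.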
Summary
Before this change, repeated key-value inserts in `ParquetFile._read_partitions` were the main bottleneck in reading `_metadata` for large partitioned datasets. After this change, duplication is avoided by efficiently filtering repeats over columns sharing file paths, as well as over common levels of partitions.
Background
I have tested this approach on a dataset of approx. 1.3e9 rows stored in 300+ partitions and 25000+ individual parquet files. Even with the benefit of the `_metadata` file, `dask.dataframe.read_parquet` can take several minutes to initialize the read tasks. The obvious issue is that the majority of inserts in `ParquetFile._read_partitions` are redundant, degrading performance unnecessarily. Here is an example line profile output:
The vast number of redundant calls to `OrderedDict.setdefault().add()` is responsible for >90% of the execution time.
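To illustrate the bottleneck (a simplified sketch, not fastparquet's actual code): every file repeats the same partition key/value inserts, so after the first file almost all `setdefault(...).add(...)` calls are no-ops that still pay the full hashing and lookup cost.

```python
from collections import OrderedDict

# 1000 files all living under the same two hive-style partition levels.
file_paths = ["year=2019/month=01/part-%d.parquet" % i for i in range(1000)]

partitions = OrderedDict()
for path in file_paths:
    for part in path.split("/")[:-1]:  # e.g. "year=2019", "month=01"
        key, value = part.split("=")
        # Redundant after the first file: same key/value every time.
        partitions.setdefault(key, set()).add(value)

# 2000 insert calls to discover just 2 keys with 1 value each.
```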
Fix
Following the `itertools` recipe, we drop duplicates on-the-fly before inserting into the `OrderedDict`. This is done in two steps: first over files sharing the same directory path, then over partition directories sharing common parts. The first step is shared between the `hive` and `drill` implementations, while the second one is implemented similarly for `drill`.
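The two-step de-duplication above can be sketched like this (illustrative names, assuming the hive `key=value` directory scheme; this is not fastparquet's exact internals):

```python
from collections import OrderedDict


def unique_everseen(items):
    """Yield each item only the first time it appears
    (based on the itertools 'unique_everseen' recipe, hashable items only)."""
    seen = set()
    for item in items:
        if item not in seen:
            seen.add(item)
            yield item


paths = ["year=2019/month=01/part-%d.parquet" % i for i in range(1000)]

# Step 1: collapse files sharing the same directory to one prefix.
prefixes = unique_everseen("/".join(p.split("/")[:-1]) for p in paths)

# Step 2: collapse repeated key=value parts shared across prefixes,
# so the OrderedDict only ever sees distinct inserts.
cats = OrderedDict()
for part in unique_everseen(p for prefix in prefixes for p in prefix.split("/")):
    key, value = part.split("=")
    cats.setdefault(key, set()).add(value)
```

Because both filters are generators, duplicates are dropped on-the-fly without materializing intermediate lists.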
On the dataset above, the `_metadata` read time improves from ~120s down to ~10s (slightly faster without the line profiler). In fact, 60% of the run-time is now due to `read_thrift`.
Aside
Reading the same `_metadata` file using `pyarrow` takes ~20-25s; the performance improvement introduced in this PR thus compares favourably to the alternative engine.