
Speed up reading Hive and Drill partitions#471

Merged
martindurant merged 3 commits into dask:master from ig248:read_partitions_speed_up
Jan 22, 2020

Conversation


@ig248 ig248 commented Dec 31, 2019

Summary

Before this change, repeated key-value inserts in ParquetFile._read_partitions were the main bottleneck in reading _metadata for large partitioned datasets.

After this change, duplication is avoided by efficiently filtering out repeats across columns that share a file path, as well as across files that share common partition levels.

Background

I have tested this approach on a dataset of approx. 1.3e9 rows stored in 300+ partitions and 25000+ individual parquet files. Even with the benefit of the _metadata file, dask.dataframe.read_parquet can take several minutes to initialize the read tasks.

The obvious issue is that the majority of inserts in ParquetFile._read_partitions are redundant, degrading performance unnecessarily.
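To illustrate: in a hive-partitioned dataset, every column chunk in a row group carries the same file path, and many files share the same partition directories, so the same key/value pair is inserted over and over. A minimal sketch of the pattern (the paths and regex are illustrative, not fastparquet's actual `ex_from_sep` helper):

```python
import re

# Hive-style partition directories encode key=value pairs in each
# file path (illustrative regex, not fastparquet's ex_from_sep).
HIVE_PART = re.compile(r"([^/]+)=([^/]+)")

paths = [
    "year=2019/month=01/part-0.parquet",
    "year=2019/month=01/part-1.parquet",
    "year=2019/month=02/part-0.parquet",
]

cats = {}
for path in paths:
    for key, val in HIVE_PART.findall(path):
        # Most of these inserts are redundant: the key/value pair
        # is usually already present in the set.
        cats.setdefault(key, set()).add(val)
```

With 25000+ files and several columns per row group, nearly every one of these setdefault().add() calls re-inserts a value that is already present.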

Here is an example line profile output:

>>> %lprun -m fastparquet.api pf = ParquetFile(root + "/_metadata", open_with=fsspec.open)

Total time: 122.152 s
File: /home/igor/checkouts/third_party/fastparquet/fastparquet/api.py
Function: __init__ at line 87

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
    87                                               def __init__(self, fn, verify=False, open_with=default_open,
    88                                                            root=False, sep=None):
    89         1          4.0      4.0      0.0          if isinstance(fn, (tuple, list)):
    90                                                       basepath, fmd = metadata_from_many(fn, verify_schema=verify,
    91                                                                                          open_with=open_with, root=root)
    92                                                       if basepath:
    93                                                           self.fn = join_path(basepath, '_metadata')  # effective file
    94                                                       else:
    95                                                           self.fn = '_metadata'
    96                                                       self.fmd = fmd
    97                                                       self._set_attrs()
    98         1          2.0      2.0      0.0          elif hasattr(fn, 'read'):
    99                                                       # file-like
   100                                                       self._parse_header(fn, verify)
   101                                                       if self.file_scheme not in ['simple', 'empty']:
   102                                                           raise ValueError('Cannot use file-like input '
   103                                                                            'with multi-file data')
   104                                                       open_with = lambda *args, **kwargs: fn
   105                                                       self.fn = None
   106                                                   else:
   107         1          2.0      2.0      0.0              try:
   108         1        186.0    186.0      0.0                  fn2 = join_path(fn, '_metadata')
   109         1          3.0      3.0      0.0                  self.fn = fn2
   110         1        619.0    619.0      0.0                  with open_with(fn2, 'rb') as f:
   111                                                               self._parse_header(f, verify)
   112                                                           fn = fn2
   113         1          3.0      3.0      0.0              except (IOError, OSError):
   114         1        159.0    159.0      0.0                  self.fn = join_path(fn)
   115         1        469.0    469.0      0.0                  with open_with(fn, 'rb') as f:
   116         1  122151018.0 122151018.0    100.0                      self._parse_header(f, verify)
   117         1          3.0      3.0      0.0          self.open = open_with
   118         1          2.0      2.0      0.0          self.sep = sep
   119         1          2.0      2.0      0.0          self._statistics = None

Total time: 122.151 s
File: /home/igor/checkouts/third_party/fastparquet/fastparquet/api.py
Function: _parse_header at line 121

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   121                                               def _parse_header(self, f, verify=True):
   122         1          1.0      1.0      0.0          try:
   123         1          7.0      7.0      0.0              f.seek(0)
   124         1          0.0      0.0      0.0              if verify:
   125                                                           assert f.read(4) == b'PAR1'
   126         1          7.0      7.0      0.0              f.seek(-8, 2)
   127         1     182567.0 182567.0      0.1              head_size = struct.unpack('<i', f.read(4))[0]
   128         1          2.0      2.0      0.0              if verify:
   129                                                           assert f.read() == b'PAR1'
   130                                                   except (AssertionError, struct.error):
   131                                                       raise ParquetException('File parse failed: %s' % self.fn)
   132                                           
   133         1          9.0      9.0      0.0          f.seek(-(head_size+8), 2)
   134         1          1.0      1.0      0.0          try:
   135         1    6953325.0 6953325.0      5.7              fmd = read_thrift(f, parquet_thrift.FileMetaData)
   136                                                   except Exception:
   137                                                       raise ParquetException('Metadata parse failed: %s' %
   138                                                                              self.fn)
   139         1          3.0      3.0      0.0          self.head_size = head_size
   140         1          2.0      2.0      0.0          self.fmd = fmd
   141         1  115015014.0 115015014.0     94.2          self._set_attrs()

Total time: 115.014 s
File: /home/igor/checkouts/third_party/fastparquet/fastparquet/api.py
Function: _set_attrs at line 143

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   143                                               def _set_attrs(self):
   144         1          2.0      2.0      0.0          fmd = self.fmd
   145         1          2.0      2.0      0.0          self.version = fmd.version
   146         1          2.0      2.0      0.0          self._schema = fmd.schema
   147         1          2.0      2.0      0.0          self.row_groups = fmd.row_groups or []
   148         1          2.0      2.0      0.0          self.key_value_metadata = {k.key: k.value
   149         1          5.0      5.0      0.0                                     for k in fmd.key_value_metadata or []}
   150         1          2.0      2.0      0.0          self.created_by = fmd.created_by
   151         1        200.0    200.0      0.0          self.schema = schema.SchemaHelper(self._schema)
   152         1          6.0      6.0      0.0          self.selfmade = self.created_by.split(' ', 1)[0] == "fastparquet-python" if self.created_by is not None else False
   153         1          2.0      2.0      0.0          files = [rg.columns[0].file_path
   154         1      31262.0  31262.0      0.0                   for rg in self.row_groups
   155                                                            if rg.columns]
   156         1     225739.0 225739.0      0.2          self.file_scheme = get_file_scheme(files)
   157         1  113189127.0 113189127.0     98.4          self._read_partitions()
   158         1    1568123.0 1568123.0      1.4          self._dtypes()

Total time: 109.535 s
File: /home/igor/checkouts/third_party/fastparquet/fastparquet/api.py
Function: _read_partitions at line 178

Line #      Hits         Time  Per Hit   % Time  Line Contents
==============================================================
   178                                               def _read_partitions(self):
   179         1          3.0      3.0      0.0          if self.file_scheme in ['simple', 'flat', 'other']:
   180                                                       self.cats = {}
   181                                                       return
   182         1          3.0      3.0      0.0          cats = OrderedDict()
   183         1          2.0      2.0      0.0          raw_cats = OrderedDict()
   184     25335      63982.0      2.5      0.1          for rg in self.row_groups:
   185    177338     428572.0      2.4      0.4              for col in rg.columns:
   186    152004    1150097.0      7.6      1.0                  s = ex_from_sep('/')
   187    152004     409148.0      2.7      0.4                  path = col.file_path or ""
   188    152004     387555.0      2.5      0.4                  if self.file_scheme == 'hive':
   189    152004    1310294.0      8.6      1.2                      partitions = s.findall(path)
   190    304008     808723.0      2.7      0.7                      for key, val in partitions:
   191    152004  104205128.0    685.5     95.1                          cats.setdefault(key, set()).add(val_to_num(val))
   192    152004     770878.0      5.1      0.7                          raw_cats.setdefault(key, set()).add(val)
   193                                                           else:
   194                                                               for i, val in enumerate(col.file_path.split('/')[:-1]):
   195                                                                   key = 'dir%i' % i
   196                                                                   cats.setdefault(key, set()).add(val_to_num(val))
   197                                                                   raw_cats.setdefault(key, set()).add(val)
   198                                           
   199         2          7.0      3.5      0.0          for key, v in cats.items():
   200                                                       # Check that no partition names map to the same value after transformation by val_to_num
   201         1          3.0      3.0      0.0              raw = raw_cats[key]
   202         1          4.0      4.0      0.0              if len(v) != len(raw):
   203                                                           conflicts_by_value = OrderedDict()
   204                                                           for raw_val in raw_cats[key]:
   205                                                               conflicts_by_value.setdefault(val_to_num(raw_val), set()).add(raw_val)
   206                                                           conflicts = [c for k in conflicts_by_value.values() if len(k) > 1 for c in k]
   207                                                           raise ValueError("Partition names map to the same value: %s" % conflicts)
   208         1        905.0    905.0      0.0              vals_by_type = groupby_types(v)
   209                                           
   210                                                       # Check that all partition names map to the same type after transformation by val_to_num
   211         1          2.0      2.0      0.0              if len(vals_by_type) > 1:
   212                                                           examples = [x[0] for x in vals_by_type.values()]
   213                                                           warnings.warn("Partition names coerce to values of different types, e.g. %s" % examples)
   214                                           
   215         1          3.0      3.0      0.0          self.cats = OrderedDict([(key, list(v))
   216         1         22.0     22.0      0.0                                  for key, v in cats.items()])

The vast number of redundant calls to OrderedDict.setdefault().add() is responsible for >90% of the execution time.

Fix

Following the itertools recipe, we drop duplicates on-the-fly before inserting into the OrderedDict. This is done in two steps:

  1. Drop duplicate paths arising from columns in the same row group generally sharing the same file path
  2. Drop duplicate key/value pairs arising from multiple files sharing the same partition

The first step is shared between the hive and drill implementations, while the second is implemented analogously for drill.
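The two steps above can be sketched as follows (a hedged illustration with hypothetical names, not the exact fastparquet implementation):

```python
import re

# Illustrative regex for hive-style key=value path components.
HIVE_PART = re.compile(r"([^/]+)=([^/]+)")

def paths_to_cats_sketch(paths):
    # Step 1: drop duplicate file paths up front -- all columns of a
    # row group generally share the same path.
    unique_paths = set(paths)

    cats = {}
    seen = set()
    for path in unique_paths:
        for key, val in HIVE_PART.findall(path):
            # Step 2: skip key/value pairs already recorded for
            # other files in the same partition.
            if (key, val) in seen:
                continue
            seen.add((key, val))
            cats.setdefault(key, set()).add(val)
    return cats

cats = paths_to_cats_sketch([
    "year=2019/month=01/part-0.parquet",
    "year=2019/month=01/part-0.parquet",  # same path, another column
    "year=2019/month=02/part-0.parquet",
])
```

The expensive setdefault().add() call is now reached only once per distinct key/value pair, rather than once per column chunk.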

Result

On the dataset above, the _metadata read time improves from ~120s down to ~10s (slightly faster without the line profiler). In fact, 60% of the run time is now spent in read_thrift.

Aside

Reading the same _metadata file using pyarrow takes ~20-25s; the performance improvement introduced in this PR thus compares favourably to the alternative engine.

@ig248 ig248 changed the title from "Speed up reading Hive and Dill partitions" to "Speed up reading Hive and Drill partitions" Dec 31, 2019
@ig248 ig248 force-pushed the read_partitions_speed_up branch from c47fc5c to 4e78559 on December 31, 2019 17:18
@ig248
Author

ig248 commented Jan 2, 2020

ig248 added 3 commits January 2, 2020 13:05
Before this change, repeated key-value inserts were the main bottleneck when initializing on large partitioned datasets. After this change, duplication is avoided by filtering repeats over columns sharing file paths, as well as over partitions sharing common parts.
Before this change, Python 2.7 tests were failing, although https://pythonclock.org/ suggests supporting it may not be worth the effort.
Currently, the function is duplicated in `dask.dataframe.io.parquet.fastparquet`. After this change, the optimized implementation can be called from `dask` instead.
@ig248 ig248 force-pushed the read_partitions_speed_up branch from b7235b1 to 9db34ac on January 2, 2020 13:06
@martindurant
Member

@rjzamora , you might find this interesting

@martindurant
Member

At a quick glance, this looks like an excellent change. I have not had a chance to look in detail, so will leave it open a little while longer in case anyone else has something to comment.

@ig248
Author

ig248 commented Jan 22, 2020

@martindurant do you have any thoughts/suggestions? The only thing I can think of is to add atomic unit tests for the functions added in this PR, if you prefer. It's not urgent but I'd like to be able to install from master...

@martindurant
Member

If you have the chance, I would appreciate if you ran dask's parquet-related test suite against this change. I would still like to hear from @rjzamora , but I think we can merge this, assuming it's not a problem for dask.

@rjzamora
Member

This looks great - Thanks @ig248 !

The parquet-related test suite is passing for me on my local machine with these changes. I completely agree that the paths_to_cats changes should definitely be propagated to the copied code in dask :)

@martindurant
Member

Thanks for the approval, @rjzamora - I'll merge now. Hopefully someone has the chance to make the changes in Dask sometime.

@martindurant martindurant merged commit 0402257 into dask:master Jan 22, 2020
@martindurant
Member

Thanks, @ig248 !

@ig248
Author

ig248 commented Jan 22, 2020

Thanks @rjzamora and @martindurant, I can look at propagating the change to dask

@ig248 ig248 deleted the read_partitions_speed_up branch January 22, 2020 15:02
ig248 added a commit to ig248/dask that referenced this pull request Jan 22, 2020
Before this change, code was duplicated from fastparquet.
After this change, an optimized function is imported from fastparquet following
dask/fastparquet#471

Since the fastparquet version range is not pinned explicitly in dask,
the import is made optional for now, falling back to the existing
implementation when an older fastparquet is installed.
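The optional import with fallback described in this commit message could look like the following (a hedged sketch; the exact dask code may differ):

```python
# Hedged sketch of an optional import with a graceful fallback.
try:
    # Newer fastparquet exposes the optimized helper directly.
    from fastparquet.api import paths_to_cats
except ImportError:
    # Older fastparquet: keep using the local (copied) implementation.
    paths_to_cats = None  # placeholder for the pre-existing function
```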
TomAugspurger pushed a commit to dask/dask that referenced this pull request Feb 5, 2020
* Import optimized `fastparquet.api.paths_to_cats`.

Before this change, code was duplicated from fastparquet.
After this change, an optimized function is imported from fastparquet following
dask/fastparquet#471

Since the fastparquet version range is not pinned explicitly in dask,
the import is made optional for now, falling back to the existing
implementation when an older fastparquet is installed.
