Skip to content

[parquet] Is `filters` supposed to work in conjunction with `partition_on`? #7079

@rubenvdg

Description

@rubenvdg

The following toy example often (but not always!) raises `ValueError: Categorical categories must be unique`:

# Minimal reproduction: write a small random integer DataFrame to parquet,
# partitioned on columns "a" and "b", then read it back with a filter on the
# partition column "a" using the "pyarrow-dataset" engine.
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
import numpy as np
from dask.distributed import LocalCluster, Client

# Run on a local distributed cluster with two workers.
# NOTE(review): the failure is reported as intermittent; whether the
# distributed scheduler contributes to that is unconfirmed.
cluster = LocalCluster(n_workers=2)
client = Client(cluster)

# 1000 rows x 3 integer columns, values from np.random.randint(1, 10)
# (upper bound exclusive), split into 3 dask partitions. No seed is set,
# so every run writes different data.
ddf = dd.from_pandas(
    pd.DataFrame(
        columns=["a", "b", "c"],
        data=np.random.randint(1, 10, size=(1000, 3))
    ),
    npartitions=3
)

# Write to "test/", replacing any previous output (overwrite=True), with the
# dataset partitioned on "a" and "b" and an explicit int64 schema for all
# three columns.
ddf.to_parquet(
    "test/",
    overwrite=True,
    partition_on=["a", "b"],
    engine="pyarrow-dataset",
    schema={"a": pa.int64(), "b": pa.int64(), "c": pa.int64()}
)

# Read back, keeping only rows where partition column "a" equals 1.
# Per the traceback in this report, this fails inside the pyarrow-dataset
# engine's read_partition while converting the arrow table to pandas, with
# "ValueError: Categorical categories must be unique".
dd.read_parquet(
    "test/",
    engine="pyarrow-dataset",
    filters=[
        ("a", "==", 1),  # also tried with "="
    ]
).compute()
  • Without `partition_on` in `to_parquet`, it works without any problems.
  • If the engine in `dd.read_parquet` is changed from `pyarrow-dataset` to `pyarrow`, it works without any problems.

Dask: 2020.12.0
PyArrow: 2.0.0
Python: 3.8.6
pandas: 1.2.0
OS: macOS Big Sur

Full stack trace:

---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
<ipython-input-1-e5f4bae2eb86> in <module>
     24 )
     25 
---> 26 dd.read_parquet(
     27     "test/",
     28     engine="pyarrow-dataset",

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
    277         dask.base.compute
    278         """
--> 279         (result,) = compute(self, traverse=False, **kwargs)
    280         return result
    281 

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
    565         postcomputes.append(x.__dask_postcompute__())
    566 
--> 567     results = schedule(dsk, keys, **kwargs)
    568     return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
    569 

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
   2674                     should_rejoin = False
   2675             try:
-> 2676                 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
   2677             finally:
   2678                 for f in futures.values():

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
   1983             else:
   1984                 local_worker = None
-> 1985             return self.sync(
   1986                 self._gather,
   1987                 futures,

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
    829             return future
    830         else:
--> 831             return sync(
    832                 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
    833             )

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
    338     if error[0]:
    339         typ, exc, tb = error[0]
--> 340         raise exc.with_traceback(tb)
    341     else:
    342         return result[0]

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/utils.py in f()
    322             if callback_timeout is not None:
    323                 future = asyncio.wait_for(future, callback_timeout)
--> 324             result[0] = yield future
    325         except Exception as exc:
    326             error[0] = sys.exc_info()

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/tornado/gen.py in run(self)
    760 
    761                     try:
--> 762                         value = future.result()
    763                     except Exception:
    764                         exc_info = sys.exc_info()

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
   1848                             exc = CancelledError(key)
   1849                         else:
-> 1850                             raise exception.with_traceback(traceback)
   1851                         raise exc
   1852                     if errors == "skip":

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/optimization.py in __call__()
    961         if not len(args) == len(self.inkeys):
    962             raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963         return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
    964 
    965     def __reduce__(self):

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/core.py in get()
    149     for key in toposort(dsk):
    150         task = dsk[key]
--> 151         result = _execute_task(task, cache)
    152         cache[key] = result
    153     result = _execute_task(out, cache)

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/core.py in _execute_task()
    119         # temporaries by their reference count and can execute certain
    120         # operations in-place.
--> 121         return func(*(_execute_task(a, cache) for a in args))
    122     elif not ishashable(arg):
    123         return arg

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/blockwise.py in __call__()
     29             return None
     30         if isinstance(args, (tuple, list)):
---> 31             return self.func(*args)
     32         else:
     33             return self.func(args)

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in read_parquet_part()
    381 
    382     if isinstance(part, list):
--> 383         dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
    384         df = concat(dfs, axis=0)
    385     else:

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in <listcomp>()
    381 
    382     if isinstance(part, list):
--> 383         dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
    384         df = concat(dfs, axis=0)
    385     else:

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in read_partition()
    612                     arrow_table = arrow_table.append_column(partition.name, arr)
    613 
--> 614         df = cls._arrow_table_to_pandas(arrow_table, categories, **kwargs)
    615 
    616         # For pyarrow.dataset api, need to convert partition columns

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in _arrow_table_to_pandas()
   1554         _kwargs.update({"use_threads": False, "ignore_metadata": False})
   1555 
-> 1556         return arrow_table.to_pandas(categories=categories, **_kwargs)
   1557 
   1558     @classmethod

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/array.pxi in pyarrow.lib._PandasConvertible.to_pandas()
    740             self_destruct=self_destruct
    741         )
--> 742         return self._to_pandas(options, categories=categories,
    743                                ignore_metadata=ignore_metadata,
    744                                types_mapper=types_mapper)

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/table.pxi in pyarrow.lib.Table._to_pandas()
   1581                    types_mapper=None):
   1582         from pyarrow.pandas_compat import table_to_blockmanager
-> 1583         mgr = table_to_blockmanager(
   1584             options, self, categories,
   1585             ignore_metadata=ignore_metadata,

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/pandas_compat.py in table_to_blockmanager()
    786     _check_data_column_metadata_consistency(all_columns)
    787     columns = _deserialize_column_index(table, all_columns, column_indexes)
--> 788     blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
    789 
    790     axes = [columns, index]

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/pandas_compat.py in _table_to_blocks()
   1127     result = pa.lib.table_to_blocks(options, block_table, categories,
   1128                                     list(extension_columns.keys()))
-> 1129     return [_reconstruct_block(item, columns, extension_columns)
   1130             for item in result]
   1131 

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/pandas_compat.py in <listcomp>()
   1127     result = pa.lib.table_to_blocks(options, block_table, categories,
   1128                                     list(extension_columns.keys()))
-> 1129     return [_reconstruct_block(item, columns, extension_columns)
   1130             for item in result]
   1131 

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/pandas_compat.py in _reconstruct_block()
    723     placement = item['placement']
    724     if 'dictionary' in item:
--> 725         cat = _pandas_api.categorical_type.from_codes(
    726             block_arr, categories=item['dictionary'],
    727             ordered=item['ordered'])

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/arrays/categorical.py in from_codes()
    567         Categories (2, object): ['a' < 'b']
    568         """
--> 569         dtype = CategoricalDtype._from_values_or_dtype(
    570             categories=categories, ordered=ordered, dtype=dtype
    571         )

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/dtypes/dtypes.py in _from_values_or_dtype()
    271             # Note: This could potentially have categories=None and
    272             # ordered=None.
--> 273             dtype = CategoricalDtype(categories, ordered)
    274 
    275         return dtype

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/dtypes/dtypes.py in __init__()
    158 
    159     def __init__(self, categories=None, ordered: Ordered = False):
--> 160         self._finalize(categories, ordered, fastpath=False)
    161 
    162     @classmethod

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/dtypes/dtypes.py in _finalize()
    312 
    313         if categories is not None:
--> 314             categories = self.validate_categories(categories, fastpath=fastpath)
    315 
    316         self._categories = categories

~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/dtypes/dtypes.py in validate_categories()
    509 
    510             if not categories.is_unique:
--> 511                 raise ValueError("Categorical categories must be unique")
    512 
    513         if isinstance(categories, ABCCategoricalIndex):

ValueError: Categorical categories must be unique

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions