-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Closed
Description
The following toy example often (not always!) throws a ValueError: Categorical categories must be unique:
import dask.dataframe as dd
import pandas as pd
import pyarrow as pa
import numpy as np
from dask.distributed import LocalCluster, Client
# Minimal reproducer for the intermittent
# "ValueError: Categorical categories must be unique" described in this report.

# Start a local distributed cluster with two workers and attach a client to it.
cluster = LocalCluster(n_workers=2)
client = Client(cluster)
# Build a 1000-row, 3-column ("a", "b", "c") pandas frame filled by
# np.random.randint(1, 10, ...) and split it into 3 Dask partitions.
# The data is random on every run (no seed is set).
ddf = dd.from_pandas(
pd.DataFrame(
columns=["a", "b", "c"],
data=np.random.randint(1, 10, size=(1000, 3))
),
npartitions=3
)
# Write the frame to "test/" partitioned on columns "a" and "b", replacing any
# existing output, with an explicit pyarrow int64 type given for each column.
ddf.to_parquet(
"test/",
overwrite=True,
partition_on=["a", "b"],
engine="pyarrow-dataset",
schema={"a": pa.int64(), "b": pa.int64(), "c": pa.int64()}
)
# Read the dataset back with the same engine, filtering on partition column "a".
# Per the report, this compute() call is what often (not always) raises the
# ValueError.
dd.read_parquet(
"test/",
engine="pyarrow-dataset",
filters=[
("a", "==", 1), # also tried with "="
]
).compute()
- Without the `partition_on` in `to_parquet`, it works without any problems.
- If you change the engine from `pyarrow-dataset` to `pyarrow` in `dd.read_parquet`, it works without any problems.
Dask: 2020.12.0
PyArrow: 2.0.0
Python: 3.8.6
pandas: 1.2.0
OS: MacOS Big Sur
Full stacktrace:
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-1-e5f4bae2eb86> in <module>
24 )
25
---> 26 dd.read_parquet(
27 "test/",
28 engine="pyarrow-dataset",
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/base.py in compute(self, **kwargs)
277 dask.base.compute
278 """
--> 279 (result,) = compute(self, traverse=False, **kwargs)
280 return result
281
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/base.py in compute(*args, **kwargs)
565 postcomputes.append(x.__dask_postcompute__())
566
--> 567 results = schedule(dsk, keys, **kwargs)
568 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
569
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py in get(self, dsk, keys, restrictions, loose_restrictions, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2674 should_rejoin = False
2675 try:
-> 2676 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2677 finally:
2678 for f in futures.values():
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1983 else:
1984 local_worker = None
-> 1985 return self.sync(
1986 self._gather,
1987 futures,
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
829 return future
830 else:
--> 831 return sync(
832 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
833 )
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
338 if error[0]:
339 typ, exc, tb = error[0]
--> 340 raise exc.with_traceback(tb)
341 else:
342 return result[0]
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/utils.py in f()
322 if callback_timeout is not None:
323 future = asyncio.wait_for(future, callback_timeout)
--> 324 result[0] = yield future
325 except Exception as exc:
326 error[0] = sys.exc_info()
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1848 exc = CancelledError(key)
1849 else:
-> 1850 raise exception.with_traceback(traceback)
1851 raise exc
1852 if errors == "skip":
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/optimization.py in __call__()
961 if not len(args) == len(self.inkeys):
962 raise ValueError("Expected %d args, got %d" % (len(self.inkeys), len(args)))
--> 963 return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
964
965 def __reduce__(self):
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/core.py in get()
149 for key in toposort(dsk):
150 task = dsk[key]
--> 151 result = _execute_task(task, cache)
152 cache[key] = result
153 result = _execute_task(out, cache)
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/core.py in _execute_task()
119 # temporaries by their reference count and can execute certain
120 # operations in-place.
--> 121 return func(*(_execute_task(a, cache) for a in args))
122 elif not ishashable(arg):
123 return arg
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/blockwise.py in __call__()
29 return None
30 if isinstance(args, (tuple, list)):
---> 31 return self.func(*args)
32 else:
33 return self.func(args)
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in read_parquet_part()
381
382 if isinstance(part, list):
--> 383 dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
384 df = concat(dfs, axis=0)
385 else:
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/dataframe/io/parquet/core.py in <listcomp>()
381
382 if isinstance(part, list):
--> 383 dfs = [func(fs, rg, columns.copy(), index, **kwargs) for rg in part]
384 df = concat(dfs, axis=0)
385 else:
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in read_partition()
612 arrow_table = arrow_table.append_column(partition.name, arr)
613
--> 614 df = cls._arrow_table_to_pandas(arrow_table, categories, **kwargs)
615
616 # For pyarrow.dataset api, need to convert partition columns
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/dask/dataframe/io/parquet/arrow.py in _arrow_table_to_pandas()
1554 _kwargs.update({"use_threads": False, "ignore_metadata": False})
1555
-> 1556 return arrow_table.to_pandas(categories=categories, **_kwargs)
1557
1558 @classmethod
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/array.pxi in pyarrow.lib._PandasConvertible.to_pandas()
740 self_destruct=self_destruct
741 )
--> 742 return self._to_pandas(options, categories=categories,
743 ignore_metadata=ignore_metadata,
744 types_mapper=types_mapper)
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/table.pxi in pyarrow.lib.Table._to_pandas()
1581 types_mapper=None):
1582 from pyarrow.pandas_compat import table_to_blockmanager
-> 1583 mgr = table_to_blockmanager(
1584 options, self, categories,
1585 ignore_metadata=ignore_metadata,
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/pandas_compat.py in table_to_blockmanager()
786 _check_data_column_metadata_consistency(all_columns)
787 columns = _deserialize_column_index(table, all_columns, column_indexes)
--> 788 blocks = _table_to_blocks(options, table, categories, ext_columns_dtypes)
789
790 axes = [columns, index]
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/pandas_compat.py in _table_to_blocks()
1127 result = pa.lib.table_to_blocks(options, block_table, categories,
1128 list(extension_columns.keys()))
-> 1129 return [_reconstruct_block(item, columns, extension_columns)
1130 for item in result]
1131
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/pandas_compat.py in <listcomp>()
1127 result = pa.lib.table_to_blocks(options, block_table, categories,
1128 list(extension_columns.keys()))
-> 1129 return [_reconstruct_block(item, columns, extension_columns)
1130 for item in result]
1131
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pyarrow/pandas_compat.py in _reconstruct_block()
723 placement = item['placement']
724 if 'dictionary' in item:
--> 725 cat = _pandas_api.categorical_type.from_codes(
726 block_arr, categories=item['dictionary'],
727 ordered=item['ordered'])
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/arrays/categorical.py in from_codes()
567 Categories (2, object): ['a' < 'b']
568 """
--> 569 dtype = CategoricalDtype._from_values_or_dtype(
570 categories=categories, ordered=ordered, dtype=dtype
571 )
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/dtypes/dtypes.py in _from_values_or_dtype()
271 # Note: This could potentially have categories=None and
272 # ordered=None.
--> 273 dtype = CategoricalDtype(categories, ordered)
274
275 return dtype
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/dtypes/dtypes.py in __init__()
158
159 def __init__(self, categories=None, ordered: Ordered = False):
--> 160 self._finalize(categories, ordered, fastpath=False)
161
162 @classmethod
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/dtypes/dtypes.py in _finalize()
312
313 if categories is not None:
--> 314 categories = self.validate_categories(categories, fastpath=fastpath)
315
316 self._categories = categories
~/.pyenv/versions/3.8.6/envs/ezra/lib/python3.8/site-packages/pandas/core/dtypes/dtypes.py in validate_categories()
509
510 if not categories.is_unique:
--> 511 raise ValueError("Categorical categories must be unique")
512
513 if isinstance(categories, ABCCategoricalIndex):
ValueError: Categorical categories must be unique
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels