Skip to content

Cannot set index to a column with a period dtype #9118

@ian-r-rose

Description

@ian-r-rose

What happened:
dask.dataframe raises a TypeError when calling dd.set_index on a column with a period dtype.

Minimal Complete Verifiable Example:

import dask.dataframe as dd
import pandas

df = pandas.DataFrame({
    "a": range(10),
    "b": pandas.period_range(start="2022-05-23", periods=10, freq="1D")
})
dd.from_pandas(df, npartitions=2).set_index("b")

which produces

Details
---------------------------------------------------------------------------
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/dataframe/core.py:4714, in DataFrame.set_index(***failed resolving arguments***)
   4711 else:
   4712     from dask.dataframe.shuffle import set_index
-> 4714     return set_index(
   4715         self,
   4716         other,
   4717         drop=drop,
   4718         npartitions=npartitions,
   4719         divisions=divisions,
   4720         **kwargs,
   4721     )

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/dataframe/shuffle.py:220, in set_index(df, index, npartitions, shuffle, compute, drop, upsample, divisions, partition_size, **kwargs)
    217     index2 = index
    219 if divisions is None:
--> 220     divisions, mins, maxes = _calculate_divisions(
    221         df, index2, repartition, npartitions, upsample, partition_size
    222     )
    224     if (
    225         mins == sorted(mins)
    226         and maxes == sorted(maxes)
    227         and all(mx < mn for mx, mn in zip(maxes[:-1], mins[1:]))
    228         and npartitions == df.npartitions
    229     ):
    230         divisions = mins + [maxes[-1]]

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/dataframe/shuffle.py:42, in _calculate_divisions(df, partition_col, repartition, npartitions, upsample, partition_size)
     40 mins = partition_col.map_partitions(M.min)
     41 maxes = partition_col.map_partitions(M.max)
---> 42 divisions, sizes, mins, maxes = compute(divisions, sizes, mins, maxes)
     43 divisions = methods.tolist(divisions)
     44 if type(sizes) is not list:

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
    572     keys.append(x.__dask_keys__())
    573     postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
    576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
     78     elif isinstance(pool, multiprocessing.pool.Pool):
     79         pool = MultiprocessingPoolExecutor(pool)
---> 81 results = get_async(
     82     pool.submit,
     83     pool._max_workers,
     84     dsk,
     85     result,
     86     cache=cache,
     87     get_id=_thread_get_id,
     88     pack_exception=pack_exception,
     89     **kwargs,
     90 )
     92 # Cleanup pools associated to dead threads
     93 with pools_lock:

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/local.py:508, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
    506         _execute_task(task, data)  # Re-execute locally
    507     else:
--> 508         raise_exception(exc, tb)
    509 res, worker_id = loads(res_info)
    510 state["cache"][key] = res

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/local.py:316, in reraise(exc, tb)
    314 if exc.__traceback__ is not tb:
    315     raise exc.with_traceback(tb)
--> 316 raise exc

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/local.py:221, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
    219 try:
    220     task, data = loads(task_info)
--> 221     result = _execute_task(task, data)
    222     id = get_id()
    223     result = dumps((result, id))

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
    115     func, args = arg[0], arg[1:]
    116     # Note: Don't assign the subtask results to a variable. numpy detects
    117     # temporaries by their reference count and can execute certain
    118     # operations in-place.
--> 119     return func(*(_execute_task(a, cache) for a in args))
    120 elif not ishashable(arg):
    121     return arg

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/dataframe/partitionquantiles.py:417, in percentiles_summary(df, num_old, num_new, upsample, state)
    415     data = data.cat.codes
    416     interpolation = "nearest"
--> 417 elif isinstance(data.dtype, pd.core.dtypes.dtypes.DatetimeTZDtype) or np.issubdtype(
    418     data.dtype, np.integer
    419 ):
    420     interpolation = "nearest"
    421 vals, n = _percentile(data, qs, interpolation=interpolation)

File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/numpy/core/numerictypes.py:416, in issubdtype(arg1, arg2)
    358 r"""
    359 Returns True if first argument is a typecode lower/equal in type hierarchy.
    360 
   (...)
    413 
    414 """
    415 if not issubclass_(arg1, generic):
--> 416     arg1 = dtype(arg1).type
    417 if not issubclass_(arg2, generic):
    418     arg2 = dtype(arg2).type

TypeError: Cannot interpret 'period[D]' as a data type

Environment:

  • Dask version: 2022.5.0
  • Python version: 3.9
  • Operating System: ubuntu
  • Install method (conda, pip, source): pip

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something is broken; dataframe; needs attention — It's been a while since this was pushed on. Needs attention from the owner or a maintainer.

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions