-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Closed
Labels
Labels: bug (Something is broken) · dataframe · needs attention (It's been a while since this was pushed on. Needs attention from the owner or a maintainer.)
Description
What happened:
Dask.dataframe errors when trying to call dd.set_index with a period dtype.
Minimal Complete Verifiable Example:
import dask.dataframe as dd
import pandas
df = pandas.DataFrame({
"a": range(10),
"b": pandas.period_range(start="2022-05-23", periods=10, freq="1D")
})
dd.from_pandas(df, npartitions=2).set_index("b")

which produces
Details
---------------------------------------------------------------------------
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/dataframe/core.py:4714, in DataFrame.set_index(***failed resolving arguments***)
4711 else:
4712 from dask.dataframe.shuffle import set_index
-> 4714 return set_index(
4715 self,
4716 other,
4717 drop=drop,
4718 npartitions=npartitions,
4719 divisions=divisions,
4720 **kwargs,
4721 )
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/dataframe/shuffle.py:220, in set_index(df, index, npartitions, shuffle, compute, drop, upsample, divisions, partition_size, **kwargs)
217 index2 = index
219 if divisions is None:
--> 220 divisions, mins, maxes = _calculate_divisions(
221 df, index2, repartition, npartitions, upsample, partition_size
222 )
224 if (
225 mins == sorted(mins)
226 and maxes == sorted(maxes)
227 and all(mx < mn for mx, mn in zip(maxes[:-1], mins[1:]))
228 and npartitions == df.npartitions
229 ):
230 divisions = mins + [maxes[-1]]
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/dataframe/shuffle.py:42, in _calculate_divisions(df, partition_col, repartition, npartitions, upsample, partition_size)
40 mins = partition_col.map_partitions(M.min)
41 maxes = partition_col.map_partitions(M.max)
---> 42 divisions, sizes, mins, maxes = compute(divisions, sizes, mins, maxes)
43 divisions = methods.tolist(divisions)
44 if type(sizes) is not list:
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/base.py:575, in compute(traverse, optimize_graph, scheduler, get, *args, **kwargs)
572 keys.append(x.__dask_keys__())
573 postcomputes.append(x.__dask_postcompute__())
--> 575 results = schedule(dsk, keys, **kwargs)
576 return repack([f(r, *a) for r, (f, a) in zip(results, postcomputes)])
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/threaded.py:81, in get(dsk, result, cache, num_workers, pool, **kwargs)
78 elif isinstance(pool, multiprocessing.pool.Pool):
79 pool = MultiprocessingPoolExecutor(pool)
---> 81 results = get_async(
82 pool.submit,
83 pool._max_workers,
84 dsk,
85 result,
86 cache=cache,
87 get_id=_thread_get_id,
88 pack_exception=pack_exception,
89 **kwargs,
90 )
92 # Cleanup pools associated to dead threads
93 with pools_lock:
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/local.py:508, in get_async(submit, num_workers, dsk, result, cache, get_id, rerun_exceptions_locally, pack_exception, raise_exception, callbacks, dumps, loads, chunksize, **kwargs)
506 _execute_task(task, data) # Re-execute locally
507 else:
--> 508 raise_exception(exc, tb)
509 res, worker_id = loads(res_info)
510 state["cache"][key] = res
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/local.py:316, in reraise(exc, tb)
314 if exc.__traceback__ is not tb:
315 raise exc.with_traceback(tb)
--> 316 raise exc
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/local.py:221, in execute_task(key, task_info, dumps, loads, get_id, pack_exception)
219 try:
220 task, data = loads(task_info)
--> 221 result = _execute_task(task, data)
222 id = get_id()
223 result = dumps((result, id))
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/core.py:119, in _execute_task(arg, cache, dsk)
115 func, args = arg[0], arg[1:]
116 # Note: Don't assign the subtask results to a variable. numpy detects
117 # temporaries by their reference count and can execute certain
118 # operations in-place.
--> 119 return func(*(_execute_task(a, cache) for a in args))
120 elif not ishashable(arg):
121 return arg
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/dask/dataframe/partitionquantiles.py:417, in percentiles_summary(df, num_old, num_new, upsample, state)
415 data = data.cat.codes
416 interpolation = "nearest"
--> 417 elif isinstance(data.dtype, pd.core.dtypes.dtypes.DatetimeTZDtype) or np.issubdtype(
418 data.dtype, np.integer
419 ):
420 interpolation = "nearest"
421 vals, n = _percentile(data, qs, interpolation=interpolation)
File ~/miniconda3/envs/parquet/lib/python3.9/site-packages/numpy/core/numerictypes.py:416, in issubdtype(arg1, arg2)
358 r"""
359 Returns True if first argument is a typecode lower/equal in type hierarchy.
360
(...)
413
414 """
415 if not issubclass_(arg1, generic):
--> 416 arg1 = dtype(arg1).type
417 if not issubclass_(arg2, generic):
418 arg2 = dtype(arg2).type
TypeError: Cannot interpret 'period[D]' as a data type

Environment:
- Dask version: 2022.5.0
- Python version: 3.9
- Operating System: ubuntu
- Install method (conda, pip, source): pip
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
Labels: bug (Something is broken) · dataframe · needs attention (It's been a while since this was pushed on. Needs attention from the owner or a maintainer.)