Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion dask/bag/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ def to_dataframe(self, meta=None, columns=None):
elif columns is not None:
raise ValueError("Can't specify both `meta` and `columns`")
else:
meta = dd.utils.make_meta(meta)
meta = dd.utils.make_meta_util(meta, parent_meta=pd.DataFrame())
# Serializing the columns and dtypes is much smaller than serializing
# the empty frame
cols = list(meta.columns)
Expand Down
18 changes: 14 additions & 4 deletions dask/dataframe/backends.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
hash_object_dispatch,
is_categorical_dtype_dispatch,
make_meta,
make_meta_obj,
meta_nonempty,
tolist_dispatch,
union_categoricals_dispatch,
Expand Down Expand Up @@ -66,7 +67,16 @@ def make_meta_index(x, index=None):
return x[0:0]


@make_meta.register(object)
meta_object_types = (pd.Series, pd.DataFrame, pd.Index, pd.MultiIndex)
try:
import scipy.sparse as sp

meta_object_types += (sp.spmatrix,)
except ImportError:
pass


@make_meta_obj.register(meta_object_types)
def make_meta_object(x, index=None):
"""Create an empty pandas object containing the desired metadata.

Expand Down Expand Up @@ -94,9 +104,8 @@ def make_meta_object(x, index=None):
>>> make_meta('i8') # doctest: +SKIP
1
"""
if hasattr(x, "_meta"):
return x._meta
elif is_arraylike(x) and x.shape:

if is_arraylike(x) and x.shape:
return x[:0]

if index is not None:
Expand Down Expand Up @@ -516,5 +525,6 @@ def is_categorical_dtype_pandas(obj):
@get_parallel_type.register_lazy("cudf")
@meta_nonempty.register_lazy("cudf")
@make_meta.register_lazy("cudf")
@make_meta_obj.register_lazy("cudf")
def _register_cudf():
import dask_cudf # noqa: F401
2 changes: 1 addition & 1 deletion dask/dataframe/categorical.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def _get_categories(df, columns, index):
for col in columns:
x = df[col]
if is_categorical_dtype(x):
res[col] = pd.Series(x.cat.categories)
res[col] = x._constructor(x.cat.categories)
else:
res[col] = x.dropna().drop_duplicates()
if index:
Expand Down
63 changes: 45 additions & 18 deletions dask/dataframe/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@
get_parallel_type,
group_split_dispatch,
hash_object_dispatch,
make_meta,
meta_nonempty,
)
from .optimize import optimize
Expand All @@ -76,6 +75,7 @@
is_dataframe_like,
is_index_like,
is_series_like,
make_meta_util,
raise_on_meta_error,
valid_divisions,
)
Expand Down Expand Up @@ -121,7 +121,9 @@ def __init__(self, dsk, name, meta, divisions=None):
dsk = HighLevelGraph.from_collections(name, dsk, dependencies=[])
self.dask = dsk
self._name = name
meta = make_meta(meta)
self._parent_meta = pd.Series(dtype="float64")

meta = make_meta_util(meta, parent_meta=self._parent_meta)
if is_dataframe_like(meta) or is_series_like(meta) or is_index_like(meta):
raise TypeError(
"Expected meta to specify scalar, got "
Expand Down Expand Up @@ -265,7 +267,7 @@ def _scalar_binary(op, self, other, inv=False):
(op, other_key, (self._name, 0)) if inv else (op, (self._name, 0), other_key)
)

other_meta = make_meta(other)
other_meta = make_meta_util(other, parent_meta=self._parent_meta)
Copy link
Member

@rjzamora rjzamora May 20, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we not pass in parent_meta=self._meta here? Is the Scalar meta still too "general"?

Copy link
Contributor Author

@galipremsagar galipremsagar May 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, for some scalars the meta is too generic (e.g. a plain int), hence the need to pass self._parent_meta here; for example:

for example

> /nvme/0/pgali/cudf/dask/dask/dataframe/core.py(269)_scalar_binary()
(Pdb) self._meta
1
(Pdb) self._parent_meta
Series([], dtype: float64)

other_meta_nonempty = meta_nonempty(other_meta)
if inv:
meta = op(other_meta_nonempty, self._meta_nonempty)
Expand Down Expand Up @@ -301,7 +303,7 @@ def __init__(self, dsk, name, meta, divisions):
dsk = HighLevelGraph.from_collections(name, dsk, dependencies=[])
self.dask = dsk
self._name = name
meta = make_meta(meta)
meta = make_meta_util(meta)
if not self._is_partition_type(meta):
raise TypeError(
"Expected meta to specify type {0}, got type "
Expand Down Expand Up @@ -1886,6 +1888,7 @@ def mean(self, axis=None, skipna=True, split_every=False, dtype=None, out=None):
token=name,
meta=meta,
enforce_metadata=False,
parent_meta=self._meta,
)
if isinstance(self, DataFrame):
result.divisions = (self.columns.min(), self.columns.max())
Expand Down Expand Up @@ -2055,13 +2058,19 @@ def std(
skipna=skipna,
ddof=ddof,
enforce_metadata=False,
parent_meta=self._meta,
)
return handle_out(out, result)
else:
v = self.var(skipna=skipna, ddof=ddof, split_every=split_every)
name = self._token_prefix + "std"
result = map_partitions(
np.sqrt, v, meta=meta, token=name, enforce_metadata=False
np.sqrt,
v,
meta=meta,
token=name,
enforce_metadata=False,
parent_meta=self._meta,
)
return handle_out(out, result)

Expand Down Expand Up @@ -2292,14 +2301,20 @@ def sem(self, axis=None, skipna=None, ddof=1, split_every=False):
axis=axis,
skipna=skipna,
ddof=ddof,
parent_meta=self._meta,
)
else:
num = self._get_numeric_data()
v = num.var(skipna=skipna, ddof=ddof, split_every=split_every)
n = num.count(split_every=split_every)
name = self._token_prefix + "sem"
result = map_partitions(
np.sqrt, v / n, meta=meta, token=name, enforce_metadata=False
np.sqrt,
v / n,
meta=meta,
token=name,
enforce_metadata=False,
parent_meta=self._meta,
)

if isinstance(self, DataFrame):
Expand Down Expand Up @@ -2335,6 +2350,7 @@ def quantile(self, q=0.5, axis=0, method="default"):
token=keyname,
enforce_metadata=False,
meta=(q, "f8"),
parent_meta=self._meta,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another case where I'd like to avoid passing parent_meta if we can avoid it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is an example where we have meta is (q, 'f8') and we cannot really know what the parents meta is.

)
else:
_raise_if_object_series(self, "quantile")
Expand Down Expand Up @@ -3357,7 +3373,11 @@ def map(self, arg, na_action=None, meta=no_default):
if meta is no_default:
meta = _emulate(M.map, self, arg, na_action=na_action, udf=True)
else:
meta = make_meta(meta, index=getattr(make_meta(self), "index", None))
meta = make_meta_util(
meta,
index=getattr(make_meta_util(self), "index", None),
parent_meta=self._meta,
)

return type(self)(graph, name, meta, self.divisions)

Expand Down Expand Up @@ -4718,7 +4738,7 @@ def apply(
M.apply, self._meta_nonempty, func, args=args, udf=True, **kwds
)
warnings.warn(meta_warning(meta))

kwds.update({"parent_meta": self._meta})
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be nice if we didn't need to pass parent metadata into map_partitions since we could easily use the _meta on the first _Frame object within that function.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reason map_partitions needs a parent_meta is that the dfs list of frame objects can be empty:

dask/dask/dataframe/core.py

Lines 5591 to 5594 in 640df6b

args = _maybe_from_pandas(args)
args = _maybe_align_partitions(args)
dfs = [df for df in args if isinstance(df, _Frame)]
meta_index = getattr(make_meta(dfs[0]), "index", None) if dfs else None

return map_partitions(M.apply, self, func, args=args, meta=meta, **kwds)

@derived_from(pd.DataFrame)
Expand Down Expand Up @@ -5503,15 +5523,17 @@ def apply_concat_apply(
meta = _emulate(
aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs
)
meta = make_meta(
meta, index=(getattr(make_meta(dfs[0]), "index", None) if dfs else None)
meta = make_meta_util(
meta,
index=(getattr(make_meta_util(dfs[0]), "index", None) if dfs else None),
parent_meta=dfs[0]._meta,
)

graph = HighLevelGraph.from_collections(b, dsk, dependencies=dfs)

divisions = [None] * (split_out + 1)

return new_dd_object(graph, b, meta, divisions)
return new_dd_object(graph, b, meta, divisions, parent_meta=dfs[0]._meta)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return new_dd_object(graph, b, meta, divisions, parent_meta=dfs[0]._meta)
return new_dd_object(graph, b, meta, divisions)

I don't think we need this, but I may be missing something.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar issue here, meta can be too generic :

> /nvme/0/pgali/cudf/dask/dask/dataframe/core.py(5535)apply_concat_apply()
(Pdb) dfs[0]._meta
Series([], dtype: int64)
(Pdb) meta
1



aca = apply_concat_apply
Expand Down Expand Up @@ -5577,6 +5599,7 @@ def map_partitions(
$META
"""
name = kwargs.pop("token", None)
parent_meta = kwargs.pop("parent_meta", None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there expected cases where a user would need the option of passing this in. Is there an expected reason that the _meta of the first _Frame in args is not a good assumption?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is similar to the case above: the _Frame list can be empty, or args may contain no _Frame at all, while the meta at hand is far too generic.


if has_keyword(func, "partition_info"):
kwargs["partition_info"] = {"number": -1, "divisions": None}
Expand All @@ -5594,14 +5617,16 @@ def map_partitions(
args = _maybe_from_pandas(args)
args = _maybe_align_partitions(args)
dfs = [df for df in args if isinstance(df, _Frame)]
meta_index = getattr(make_meta(dfs[0]), "index", None) if dfs else None
meta_index = getattr(make_meta_util(dfs[0]), "index", None) if dfs else None
if parent_meta is None and dfs:
parent_meta = dfs[0]._meta

if meta is no_default:
# Use non-normalized kwargs here, as we want the real values (not
# delayed values)
meta = _emulate(func, *args, udf=True, **kwargs)
else:
meta = make_meta(meta, index=meta_index)
meta = make_meta_util(meta, index=meta_index, parent_meta=parent_meta)

if has_keyword(func, "partition_info"):
kwargs["partition_info"] = "__dummy__"
Expand All @@ -5615,10 +5640,10 @@ def map_partitions(
elif not (has_parallel_type(meta) or is_arraylike(meta) and meta.shape):
# If `meta` is not a pandas object, the concatenated results will be a
# different type
meta = make_meta(_concat([meta]), index=meta_index)
meta = make_meta_util(_concat([meta]), index=meta_index)

# Ensure meta is empty series
meta = make_meta(meta)
meta = make_meta_util(meta, parent_meta=parent_meta)

args2 = []
dependencies = []
Expand Down Expand Up @@ -5979,7 +6004,9 @@ def cov_corr(df, min_periods=None, corr=False, scalar=False, split_every=False):
graph = HighLevelGraph.from_collections(name, dsk, dependencies=[df])
if scalar:
return Scalar(graph, name, "f8")
meta = make_meta([(c, "f8") for c in df.columns], index=df.columns)
meta = make_meta_util(
[(c, "f8") for c in df.columns], index=df.columns, parent_meta=df._meta
)
return DataFrame(graph, name, meta, (df.columns[0], df.columns[-1]))


Expand Down Expand Up @@ -6727,7 +6754,7 @@ def has_parallel_type(x):
return get_parallel_type(x) is not Scalar


def new_dd_object(dsk, name, meta, divisions):
def new_dd_object(dsk, name, meta, divisions, parent_meta=None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def new_dd_object(dsk, name, meta, divisions, parent_meta=None):
def new_dd_object(dsk, name, meta, divisions):

Doesn't look like this change is necessary (I don't think parent_meta is used in new_dd_object)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"""Generic constructor for dask.dataframe objects.

Decides the appropriate output class based on the type of `meta` provided.
Expand Down Expand Up @@ -7026,7 +7053,7 @@ def series_map(base_series, map_series):

meta = map_series._meta.copy()
meta.index = base_series._meta.index
meta = make_meta(meta)
meta = make_meta_util(meta)

dependencies = [base_series, map_series, base_series.index]
graph = HighLevelGraph.from_collections(
Expand Down
28 changes: 28 additions & 0 deletions dask/dataframe/dispatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,13 @@
Also see extension.py
"""

import dask.array as da
import dask.dataframe as dd

from ..utils import Dispatch

make_meta = Dispatch("make_meta")
make_meta_obj = Dispatch("make_meta_obj")
meta_nonempty = Dispatch("meta_nonempty")
hash_object_dispatch = Dispatch("hash_object_dispatch")
group_split_dispatch = Dispatch("group_split_dispatch")
Expand Down Expand Up @@ -79,6 +83,30 @@ def tolist(obj):
return func(obj)


def make_meta_util(x, index=None, parent_meta=None):
    """Return empty metadata for ``x``, with an object-dispatch fallback.

    Dask collections already carry their metadata, so their ``_meta`` is
    returned directly.  Otherwise the type-registered ``make_meta``
    dispatch is tried first; when it has no registration for ``type(x)``
    (signalled by ``TypeError``), the object-level ``make_meta_obj``
    dispatch is consulted instead — keyed on ``parent_meta``'s type when
    one is supplied (useful when ``x`` itself is too generic, e.g. a
    plain scalar), and on ``x``'s own type otherwise.
    """
    dask_collection_types = (
        dd._Frame,
        dd.core.Scalar,
        dd.groupby._GroupBy,
        dd.accessor.Accessor,
        da.Array,
    )
    if isinstance(x, dask_collection_types):
        # Dask collections know their own metadata.
        return x._meta

    try:
        return make_meta(x, index=index)
    except TypeError:
        # No make_meta registration for this type: fall back to the
        # object-level dispatch, preferring the parent's type when known.
        dispatch_on = x if parent_meta is None else parent_meta
        handler = make_meta_obj.dispatch(type(dispatch_on))
        return handler(x, index=index)


def union_categoricals(to_union, sort_categories=False, ignore_order=False):
    """Combine a sequence of categoricals via the backend dispatch.

    The concrete implementation is selected from the type of the first
    element of ``to_union``; both keyword options are forwarded to it
    unchanged.
    """
    impl = union_categoricals_dispatch.dispatch(type(to_union[0]))
    return impl(
        to_union,
        sort_categories=sort_categories,
        ignore_order=ignore_order,
    )
6 changes: 3 additions & 3 deletions dask/dataframe/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
insert_meta_param_description,
is_dataframe_like,
is_series_like,
make_meta,
make_meta_util,
raise_on_meta_error,
)

Expand Down Expand Up @@ -1673,7 +1673,7 @@ def apply(self, func, *args, **kwargs):
)
warnings.warn(msg, stacklevel=2)

meta = make_meta(meta)
meta = make_meta_util(meta, parent_meta=self._meta.obj)

# Validate self.index
if isinstance(self.index, list) and any(
Expand Down Expand Up @@ -1762,7 +1762,7 @@ def transform(self, func, *args, **kwargs):
)
warnings.warn(msg, stacklevel=2)

meta = make_meta(meta)
meta = make_meta_util(meta, parent_meta=self._meta.obj)

# Validate self.index
if isinstance(self.index, list) and any(
Expand Down
13 changes: 10 additions & 3 deletions dask/dataframe/io/io.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,12 @@
from ...utils import M, ensure_dict
from ..core import DataFrame, Index, Series, has_parallel_type, new_dd_object
from ..shuffle import set_partition
from ..utils import check_meta, insert_meta_param_description, is_series_like, make_meta
from ..utils import (
check_meta,
insert_meta_param_description,
is_series_like,
make_meta_util,
)

lock = Lock()

Expand Down Expand Up @@ -588,10 +593,12 @@ def from_delayed(
if not isinstance(df, Delayed):
raise TypeError("Expected Delayed object, got %s" % type(df).__name__)

parent_meta = delayed(make_meta_util)(dfs[0]).compute()

if meta is None:
meta = delayed(make_meta)(dfs[0]).compute()
meta = parent_meta
else:
meta = make_meta(meta)
meta = make_meta_util(meta, parent_meta=parent_meta)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a good reason to pass parent_meta if a meta object is already provided? That is, shouldn't meta already be an appropriate DataFrame type? My intuition tells me that the above changes should just be swapping make_meta with make_meta_util.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That is, shouldn't meta already be an appropriate DataFrame type?

Sadly, nope. For example:

> /nvme/0/pgali/cudf/dask/dask/dataframe/io/io.py(603)from_delayed()
(Pdb) meta
[('a', 'f8'), ('b', 'f8'), ('c', 'f8'), ('d', 'f8')]
(Pdb) parent_meta
Empty DataFrame
Columns: [a, b, c, d]
Index: []


name = prefix + "-" + tokenize(*dfs)
dsk = merge(df.dask for df in dfs)
Expand Down
4 changes: 2 additions & 2 deletions dask/dataframe/io/json.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ...bytes import read_bytes
from ...core import flatten
from ...delayed import delayed
from ..utils import insert_meta_param_description, make_meta
from ..utils import insert_meta_param_description, make_meta_util
from .io import from_delayed


Expand Down Expand Up @@ -198,7 +198,7 @@ def read_json(
chunks = list(flatten(chunks))
if meta is None:
meta = read_json_chunk(first, encoding, errors, engine, kwargs)
meta = make_meta(meta)
meta = make_meta_util(meta)
parts = [
delayed(read_json_chunk)(chunk, encoding, errors, engine, kwargs, meta=meta)
for chunk in chunks
Expand Down
Loading