-
-
Notifications
You must be signed in to change notification settings - Fork 1.9k
Fix meta creation in case of object
#7586
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
04752dc
471a6f1
162de9f
7599725
3d03d46
ffe1973
87b262f
d7c6541
f7d4a6e
f9de319
b117659
ba6d611
99cec3a
737f0d0
cb70811
25e0c28
a40de33
e4a6de9
1bfe464
b703c12
cf72028
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -59,7 +59,6 @@ | |||||||||
| get_parallel_type, | ||||||||||
| group_split_dispatch, | ||||||||||
| hash_object_dispatch, | ||||||||||
| make_meta, | ||||||||||
| meta_nonempty, | ||||||||||
| ) | ||||||||||
| from .optimize import optimize | ||||||||||
|
|
@@ -76,6 +75,7 @@ | |||||||||
| is_dataframe_like, | ||||||||||
| is_index_like, | ||||||||||
| is_series_like, | ||||||||||
| make_meta_util, | ||||||||||
| raise_on_meta_error, | ||||||||||
| valid_divisions, | ||||||||||
| ) | ||||||||||
|
|
@@ -121,7 +121,9 @@ def __init__(self, dsk, name, meta, divisions=None): | |||||||||
| dsk = HighLevelGraph.from_collections(name, dsk, dependencies=[]) | ||||||||||
| self.dask = dsk | ||||||||||
| self._name = name | ||||||||||
| meta = make_meta(meta) | ||||||||||
| self._parent_meta = pd.Series(dtype="float64") | ||||||||||
|
|
||||||||||
| meta = make_meta_util(meta, parent_meta=self._parent_meta) | ||||||||||
| if is_dataframe_like(meta) or is_series_like(meta) or is_index_like(meta): | ||||||||||
| raise TypeError( | ||||||||||
| "Expected meta to specify scalar, got " | ||||||||||
|
|
@@ -265,7 +267,7 @@ def _scalar_binary(op, self, other, inv=False): | |||||||||
| (op, other_key, (self._name, 0)) if inv else (op, (self._name, 0), other_key) | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| other_meta = make_meta(other) | ||||||||||
| other_meta = make_meta_util(other, parent_meta=self._parent_meta) | ||||||||||
| other_meta_nonempty = meta_nonempty(other_meta) | ||||||||||
| if inv: | ||||||||||
| meta = op(other_meta_nonempty, self._meta_nonempty) | ||||||||||
|
|
@@ -301,7 +303,7 @@ def __init__(self, dsk, name, meta, divisions): | |||||||||
| dsk = HighLevelGraph.from_collections(name, dsk, dependencies=[]) | ||||||||||
| self.dask = dsk | ||||||||||
| self._name = name | ||||||||||
| meta = make_meta(meta) | ||||||||||
| meta = make_meta_util(meta) | ||||||||||
| if not self._is_partition_type(meta): | ||||||||||
| raise TypeError( | ||||||||||
| "Expected meta to specify type {0}, got type " | ||||||||||
|
|
@@ -1886,6 +1888,7 @@ def mean(self, axis=None, skipna=True, split_every=False, dtype=None, out=None): | |||||||||
| token=name, | ||||||||||
| meta=meta, | ||||||||||
| enforce_metadata=False, | ||||||||||
| parent_meta=self._meta, | ||||||||||
| ) | ||||||||||
| if isinstance(self, DataFrame): | ||||||||||
| result.divisions = (self.columns.min(), self.columns.max()) | ||||||||||
|
|
@@ -2055,13 +2058,19 @@ def std( | |||||||||
| skipna=skipna, | ||||||||||
| ddof=ddof, | ||||||||||
| enforce_metadata=False, | ||||||||||
| parent_meta=self._meta, | ||||||||||
| ) | ||||||||||
| return handle_out(out, result) | ||||||||||
| else: | ||||||||||
| v = self.var(skipna=skipna, ddof=ddof, split_every=split_every) | ||||||||||
| name = self._token_prefix + "std" | ||||||||||
| result = map_partitions( | ||||||||||
| np.sqrt, v, meta=meta, token=name, enforce_metadata=False | ||||||||||
| np.sqrt, | ||||||||||
| v, | ||||||||||
| meta=meta, | ||||||||||
| token=name, | ||||||||||
| enforce_metadata=False, | ||||||||||
| parent_meta=self._meta, | ||||||||||
| ) | ||||||||||
| return handle_out(out, result) | ||||||||||
|
|
||||||||||
|
|
@@ -2292,14 +2301,20 @@ def sem(self, axis=None, skipna=None, ddof=1, split_every=False): | |||||||||
| axis=axis, | ||||||||||
| skipna=skipna, | ||||||||||
| ddof=ddof, | ||||||||||
| parent_meta=self._meta, | ||||||||||
| ) | ||||||||||
| else: | ||||||||||
| num = self._get_numeric_data() | ||||||||||
| v = num.var(skipna=skipna, ddof=ddof, split_every=split_every) | ||||||||||
| n = num.count(split_every=split_every) | ||||||||||
| name = self._token_prefix + "sem" | ||||||||||
| result = map_partitions( | ||||||||||
| np.sqrt, v / n, meta=meta, token=name, enforce_metadata=False | ||||||||||
| np.sqrt, | ||||||||||
| v / n, | ||||||||||
| meta=meta, | ||||||||||
| token=name, | ||||||||||
| enforce_metadata=False, | ||||||||||
| parent_meta=self._meta, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| if isinstance(self, DataFrame): | ||||||||||
|
|
@@ -2335,6 +2350,7 @@ def quantile(self, q=0.5, axis=0, method="default"): | |||||||||
| token=keyname, | ||||||||||
| enforce_metadata=False, | ||||||||||
| meta=(q, "f8"), | ||||||||||
| parent_meta=self._meta, | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Another case where I'd like to avoid passing
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This case is an example where we have |
||||||||||
| ) | ||||||||||
| else: | ||||||||||
| _raise_if_object_series(self, "quantile") | ||||||||||
|
|
@@ -3357,7 +3373,11 @@ def map(self, arg, na_action=None, meta=no_default): | |||||||||
| if meta is no_default: | ||||||||||
| meta = _emulate(M.map, self, arg, na_action=na_action, udf=True) | ||||||||||
| else: | ||||||||||
| meta = make_meta(meta, index=getattr(make_meta(self), "index", None)) | ||||||||||
| meta = make_meta_util( | ||||||||||
| meta, | ||||||||||
| index=getattr(make_meta_util(self), "index", None), | ||||||||||
| parent_meta=self._meta, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| return type(self)(graph, name, meta, self.divisions) | ||||||||||
|
|
||||||||||
|
|
@@ -4718,7 +4738,7 @@ def apply( | |||||||||
| M.apply, self._meta_nonempty, func, args=args, udf=True, **kwds | ||||||||||
| ) | ||||||||||
| warnings.warn(meta_warning(meta)) | ||||||||||
|
|
||||||||||
| kwds.update({"parent_meta": self._meta}) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It would be nice if we didn't need to pass parent metadata into
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The reason Lines 5591 to 5594 in 640df6b
|
||||||||||
| return map_partitions(M.apply, self, func, args=args, meta=meta, **kwds) | ||||||||||
|
|
||||||||||
| @derived_from(pd.DataFrame) | ||||||||||
|
|
@@ -5503,15 +5523,17 @@ def apply_concat_apply( | |||||||||
| meta = _emulate( | ||||||||||
| aggregate, _concat([meta_chunk], ignore_index), udf=True, **aggregate_kwargs | ||||||||||
| ) | ||||||||||
| meta = make_meta( | ||||||||||
| meta, index=(getattr(make_meta(dfs[0]), "index", None) if dfs else None) | ||||||||||
| meta = make_meta_util( | ||||||||||
| meta, | ||||||||||
| index=(getattr(make_meta_util(dfs[0]), "index", None) if dfs else None), | ||||||||||
| parent_meta=dfs[0]._meta, | ||||||||||
| ) | ||||||||||
|
|
||||||||||
| graph = HighLevelGraph.from_collections(b, dsk, dependencies=dfs) | ||||||||||
|
|
||||||||||
| divisions = [None] * (split_out + 1) | ||||||||||
|
|
||||||||||
| return new_dd_object(graph, b, meta, divisions) | ||||||||||
| return new_dd_object(graph, b, meta, divisions, parent_meta=dfs[0]._meta) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
I don't think we need this, but I may be missing something.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Similar issue here, > /nvme/0/pgali/cudf/dask/dask/dataframe/core.py(5535)apply_concat_apply()
(Pdb) dfs[0]._meta
Series([], dtype: int64)
(Pdb) meta
1 |
||||||||||
|
|
||||||||||
|
|
||||||||||
| aca = apply_concat_apply | ||||||||||
|
|
@@ -5577,6 +5599,7 @@ def map_partitions( | |||||||||
| $META | ||||||||||
| """ | ||||||||||
| name = kwargs.pop("token", None) | ||||||||||
| parent_meta = kwargs.pop("parent_meta", None) | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are there expected cases where a user would need the option of passing this in. Is there an expected reason that the
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would be similar to the above one, when |
||||||||||
|
|
||||||||||
| if has_keyword(func, "partition_info"): | ||||||||||
| kwargs["partition_info"] = {"number": -1, "divisions": None} | ||||||||||
|
|
@@ -5594,14 +5617,16 @@ def map_partitions( | |||||||||
| args = _maybe_from_pandas(args) | ||||||||||
| args = _maybe_align_partitions(args) | ||||||||||
| dfs = [df for df in args if isinstance(df, _Frame)] | ||||||||||
| meta_index = getattr(make_meta(dfs[0]), "index", None) if dfs else None | ||||||||||
| meta_index = getattr(make_meta_util(dfs[0]), "index", None) if dfs else None | ||||||||||
| if parent_meta is None and dfs: | ||||||||||
| parent_meta = dfs[0]._meta | ||||||||||
|
|
||||||||||
| if meta is no_default: | ||||||||||
| # Use non-normalized kwargs here, as we want the real values (not | ||||||||||
| # delayed values) | ||||||||||
| meta = _emulate(func, *args, udf=True, **kwargs) | ||||||||||
| else: | ||||||||||
| meta = make_meta(meta, index=meta_index) | ||||||||||
| meta = make_meta_util(meta, index=meta_index, parent_meta=parent_meta) | ||||||||||
|
|
||||||||||
| if has_keyword(func, "partition_info"): | ||||||||||
| kwargs["partition_info"] = "__dummy__" | ||||||||||
|
|
@@ -5615,10 +5640,10 @@ def map_partitions( | |||||||||
| elif not (has_parallel_type(meta) or is_arraylike(meta) and meta.shape): | ||||||||||
| # If `meta` is not a pandas object, the concatenated results will be a | ||||||||||
| # different type | ||||||||||
| meta = make_meta(_concat([meta]), index=meta_index) | ||||||||||
| meta = make_meta_util(_concat([meta]), index=meta_index) | ||||||||||
|
|
||||||||||
| # Ensure meta is empty series | ||||||||||
| meta = make_meta(meta) | ||||||||||
| meta = make_meta_util(meta, parent_meta=parent_meta) | ||||||||||
|
|
||||||||||
| args2 = [] | ||||||||||
| dependencies = [] | ||||||||||
|
|
@@ -5979,7 +6004,9 @@ def cov_corr(df, min_periods=None, corr=False, scalar=False, split_every=False): | |||||||||
| graph = HighLevelGraph.from_collections(name, dsk, dependencies=[df]) | ||||||||||
| if scalar: | ||||||||||
| return Scalar(graph, name, "f8") | ||||||||||
| meta = make_meta([(c, "f8") for c in df.columns], index=df.columns) | ||||||||||
| meta = make_meta_util( | ||||||||||
| [(c, "f8") for c in df.columns], index=df.columns, parent_meta=df._meta | ||||||||||
| ) | ||||||||||
| return DataFrame(graph, name, meta, (df.columns[0], df.columns[-1])) | ||||||||||
|
|
||||||||||
|
|
||||||||||
|
|
@@ -6727,7 +6754,7 @@ def has_parallel_type(x): | |||||||||
| return get_parallel_type(x) is not Scalar | ||||||||||
|
|
||||||||||
|
|
||||||||||
| def new_dd_object(dsk, name, meta, divisions): | ||||||||||
| def new_dd_object(dsk, name, meta, divisions, parent_meta=None): | ||||||||||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Doesn't look like this change is necessary (I don't think
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needed for this: https://github.com/dask/dask/pull/7586/files#r637579242 |
||||||||||
| """Generic constructor for dask.dataframe objects. | ||||||||||
|
|
||||||||||
| Decides the appropriate output class based on the type of `meta` provided. | ||||||||||
|
|
@@ -7026,7 +7053,7 @@ def series_map(base_series, map_series): | |||||||||
|
|
||||||||||
| meta = map_series._meta.copy() | ||||||||||
| meta.index = base_series._meta.index | ||||||||||
| meta = make_meta(meta) | ||||||||||
| meta = make_meta_util(meta) | ||||||||||
|
|
||||||||||
| dependencies = [base_series, map_series, base_series.index] | ||||||||||
| graph = HighLevelGraph.from_collections( | ||||||||||
|
|
||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,7 +14,12 @@ | |
| from ...utils import M, ensure_dict | ||
| from ..core import DataFrame, Index, Series, has_parallel_type, new_dd_object | ||
| from ..shuffle import set_partition | ||
| from ..utils import check_meta, insert_meta_param_description, is_series_like, make_meta | ||
| from ..utils import ( | ||
| check_meta, | ||
| insert_meta_param_description, | ||
| is_series_like, | ||
| make_meta_util, | ||
| ) | ||
|
|
||
| lock = Lock() | ||
|
|
||
|
|
@@ -588,10 +593,12 @@ def from_delayed( | |
| if not isinstance(df, Delayed): | ||
| raise TypeError("Expected Delayed object, got %s" % type(df).__name__) | ||
|
|
||
| parent_meta = delayed(make_meta_util)(dfs[0]).compute() | ||
|
|
||
| if meta is None: | ||
| meta = delayed(make_meta)(dfs[0]).compute() | ||
| meta = parent_meta | ||
| else: | ||
| meta = make_meta(meta) | ||
| meta = make_meta_util(meta, parent_meta=parent_meta) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there a good reason to pass
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Sadly, nope. For example: > /nvme/0/pgali/cudf/dask/dask/dataframe/io/io.py(603)from_delayed()
(Pdb) meta
[('a', 'f8'), ('b', 'f8'), ('c', 'f8'), ('d', 'f8')]
(Pdb) parent_meta
Empty DataFrame
Columns: [a, b, c, d]
Index: [] |
||
|
|
||
| name = prefix + "-" + tokenize(*dfs) | ||
| dsk = merge(df.dask for df in dfs) | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we not pass in
parent_meta=self._metahere? Is the Scalar meta still too "general"?Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, for some scalars the meta seems to be too generic, like
`int`; hence the need to pass `self._parent_meta` here. For example: