-
-
Notifications
You must be signed in to change notification settings - Fork 260
Allow dask arrays to be hstacked in ColumnTransformer #436
Copy link
Copy link
Closed
Description
Currently dask Arrays cannot be concatenated in a ColumnTransformer. It would be nice to allow this. This issue is related to #365: "Silence UserWarning in ColumnTransformer._hstack".
Example Transformer
from sklearn.base import BaseEstimator
# Generator ensures the DataFrame shape has NaN in first dim.
def gen_ints(r, c):
    """Yield ``r`` rows; row ``i`` is the integers ``i .. i+c-1`` as a NumPy array.

    Used to build a dask DataFrame via a bag, which leaves the first
    dimension of the resulting collection unknown (NaN).
    """
    yield from (i + np.arange(c) for i in range(r))
# Some basic transformer.
class SumTransformer(BaseEstimator):
    """Minimal sklearn-style transformer: row-wise sum of each partition.

    transform() returns a single-column result of shape (n, 1) per
    partition, which is what later gets hstacked by ColumnTransformer.
    """

    def fit(self, X, y=None):
        # Stateless transformer: nothing to learn; return self per the
        # sklearn estimator contract.
        return self

    def transform(self, X):
        # NOTE(review): assumes X is a dask collection exposing
        # map_partitions (a dask DataFrame in the repro below) — confirm.
        # Each partition is reduced to its row sums, reshaped to one column.
        out = X.map_partitions(lambda x: x.values.sum(axis=-1).reshape(-1, 1))
        return out

Calling Code
import dask.bag as db
import sklearn
import dask_ml
def test_column_transformer_unk_chunksize():
    # Repro: hstacking dask arrays with an unknown (NaN) first dimension
    # inside a dask_ml ColumnTransformer currently raises ValueError.
    cols = 3
    rows = 4
    a = 0x61  # character code for 'a'.
    # Column names 'a', 'b', 'c' built from consecutive character codes.
    names = list(map(chr, a + np.arange(cols)))
    # Bag -> DataFrame conversion leaves the row count unknown, so the
    # transformed arrays have shape (nan, 1).
    x = db.from_sequence(gen_ints(rows, cols)).to_dataframe(columns=names)
    features = sklearn.pipeline.Pipeline([
        ('features', sklearn.pipeline.FeatureUnion([
            ('ratios', dask_ml.compose.ColumnTransformer([
                ('a_b', SumTransformer(), ['a', 'b']),
                ('b_c', SumTransformer(), ['b', 'c'])
            ]))
        ]))
    ])
    # Checks:
    # ValueError: Tried to concatenate arrays with unknown shape (nan, 1).
    # To force concatenation pass allow_unknown_chunksizes=True.
    out = features.fit_transform(x)
    # Expected: per-row sums of columns (a, b) and (b, c) side by side.
    exp = np.array([[1, 3], [3, 5], [5, 7], [7, 9]], dtype=np.int32)
    assert isinstance(out, np.ndarray)
    np.testing.assert_array_equal(out, exp)

This could be fixed by enabling allow_unknown_chunksizes in _hstack, but currently it fails with the following exception:
../../venv/lib/python3.6/site-packages/sklearn/pipeline.py:300: in fit_transform
return last_step.fit_transform(Xt, y, **fit_params)
../../venv/lib/python3.6/site-packages/sklearn/pipeline.py:793: in fit_transform
for name, trans, weight in self._iter())
../../venv/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py:917: in __call__
if self.dispatch_one_batch(iterator):
../../venv/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py:759: in dispatch_one_batch
self._dispatch(tasks)
../../venv/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py:716: in _dispatch
job = self._backend.apply_async(batch, callback=cb)
../../venv/lib/python3.6/site-packages/sklearn/externals/joblib/_parallel_backends.py:182: in apply_async
result = ImmediateResult(func)
../../venv/lib/python3.6/site-packages/sklearn/externals/joblib/_parallel_backends.py:549: in __init__
self.results = batch()
../../venv/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py:225: in __call__
for func, args, kwargs in self.items]
../../venv/lib/python3.6/site-packages/sklearn/externals/joblib/parallel.py:225: in <listcomp>
for func, args, kwargs in self.items]
../../venv/lib/python3.6/site-packages/sklearn/pipeline.py:614: in _fit_transform_one
res = transformer.fit_transform(X, y, **fit_params)
../../venv/lib/python3.6/site-packages/sklearn/compose/_column_transformer.py:471: in fit_transform
return self._hstack(list(Xs))
../../dask_ml/compose/_column_transformer.py:195: in _hstack
return da.hstack(Xs)
../../venv/lib/python3.6/site-packages/dask/array/routines.py:111: in hstack
return concatenate(tup, axis=1)
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
seq = [dask.array<lambda, shape=(nan, 1), dtype=int64, chunksize=(nan, 1)>, dask.array<lambda, shape=(nan, 1), dtype=int64, chunksize=(nan, 1)>]
axis = 1, allow_unknown_chunksizes = False
def concatenate(seq, axis=0, allow_unknown_chunksizes=False):
"""
Concatenate arrays along an existing axis
Given a sequence of dask Arrays form a new dask Array by stacking them
along an existing dimension (axis=0 by default)
Parameters
----------
seq: list of dask.arrays
axis: int
Dimension along which to align all of the arrays
allow_unknown_chunksizes: bool
Allow unknown chunksizes, such as come from converting from dask
dataframes. Dask.array is unable to verify that chunks line up. If
data comes from differently aligned sources then this can cause
unexpected results.
Examples
--------
Create slices
>>> import dask.array as da
>>> import numpy as np
>>> data = [from_array(np.ones((4, 4)), chunks=(2, 2))
... for i in range(3)]
>>> x = da.concatenate(data, axis=0)
>>> x.shape
(12, 4)
>>> da.concatenate(data, axis=1).shape
(4, 12)
Result is a new dask Array
See Also
--------
stack
"""
n = len(seq)
ndim = len(seq[0].shape)
if axis < 0:
axis = ndim + axis
if axis >= ndim:
msg = ("Axis must be less than than number of dimensions"
"\nData has %d dimensions, but got axis=%d")
raise ValueError(msg % (ndim, axis))
if n == 1:
return seq[0]
if (not allow_unknown_chunksizes and
not all(i == axis or all(x.shape[i] == seq[0].shape[i] for x in seq)
for i in range(ndim))):
if any(map(np.isnan, seq[0].shape)):
raise ValueError("Tried to concatenate arrays with unknown"
" shape %s. To force concatenation pass"
" allow_unknown_chunksizes=True."
> % str(seq[0].shape))
E ValueError: Tried to concatenate arrays with unknown shape (nan, 1). To force concatenation pass allow_unknown_chunksizes=True.
../../venv/lib/python3.6/site-packages/dask/array/core.py:2835: ValueError
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
No labels