Move timeseries and daily-stock to Blockwise #7615
Conversation
…shuffle-avoid-pd-import
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
…ask_distributed_unpack__
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
|
@quasiben @jakirkham - This PR should make it a bit easier to benchmark scheduler performance for large graphs without any significant IO or communication overhead. For example, something like the following is a reasonable way to produce a large graph that will also benefit from Blockwise-IO optimizations: import dask
from dask.distributed import LocalCluster, wait, Client
cluster = LocalCluster()
client = Client(cluster)
ddf = dask.datasets.timeseries(
start='2000-01-01',
end='2003-12-31',
freq='120s',
partition_freq='1h',
)
s = ddf["id"] + 10
mean = s.mean()
with dask.config.set({"optimization.fuse.active": False}):
wait(mean.persist())

This code seems to run ~20% faster for me with this PR (compared to the version without it).
|
It seems that a `fastparquet` test is failing. For reference, here is the latest failure:

=================================== FAILURES ===================================
________________________ test_partition_on[fastparquet] ________________________
[gw0] linux -- Python 3.8.8 /usr/share/miniconda3/envs/test-environment/bin/python
tmpdir = '/tmp/pytest-of-runner/pytest-0/popen-gw0/test_partition_on_fastparquet_0'
engine = 'fastparquet'
def test_partition_on(tmpdir, engine):
tmpdir = str(tmpdir)
df = pd.DataFrame(
{
"a1": np.random.choice(["A", "B", "C"], size=100),
"a2": np.random.choice(["X", "Y", "Z"], size=100),
"b": np.random.random(size=100),
"c": np.random.randint(1, 5, size=100),
"d": np.arange(0, 100),
}
)
d = dd.from_pandas(df, npartitions=2)
d.to_parquet(tmpdir, partition_on=["a1", "a2"], engine=engine)
# Note #1: Cross-engine functionality is missing
# Note #2: The index is not preserved in pyarrow when partition_on is used
out = dd.read_parquet(
tmpdir, engine=engine, index=False, gather_statistics=False
).compute()
for val in df.a1.unique():
assert set(df.b[df.a1 == val]) == set(out.b[out.a1 == val])
# Now specify the columns and allow auto-index detection
> out = dd.read_parquet(tmpdir, engine=engine, columns=["b", "a2"]).compute()
dask/dataframe/io/tests/test_parquet.py:1257:
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
dask/base.py:285: in compute
(result,) = compute(self, traverse=False, **kwargs)
dask/base.py:567: in compute
results = schedule(dsk, keys, **kwargs)
dask/threaded.py:79: in get
results = get_async(
dask/local.py:514: in get_async
raise_exception(exc, tb)
dask/local.py:325: in reraise
raise exc
dask/local.py:223: in execute_task
result = _execute_task(task, data)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
return func(*(_execute_task(a, cache) for a in args))
dask/dataframe/io/parquet/core.py:82: in __call__
return read_parquet_part(
dask/dataframe/io/parquet/core.py:345: in read_parquet_part
dfs = [
dask/dataframe/io/parquet/core.py:346: in <listcomp>
func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
dask/dataframe/io/parquet/fastparquet.py:801: in read_partition
parquet_file.read_row_group_file(
/usr/share/miniconda3/envs/test-environment/lib/python3.8/site-packages/fastparquet/api.py:210: in read_row_group_file
core.read_row_group_file(
/usr/share/miniconda3/envs/test-environment/lib/python3.8/site-packages/fastparquet/core.py:303: in read_row_group_file
return read_row_group(f, rg, columns, categories, schema_helper, cats,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
file = <_io.BufferedReader name='/tmp/pytest-of-runner/pytest-0/popen-gw0/test_partition_on_fastparquet_0/a1=B/a2=Y/part.0.parquet'>
rg = <class 'fastparquet.parquet_thrift.parquet.ttypes.RowGroup'>
columns: [<class 'fastparquet.parquet_thrift.parquet.ttyp...ompressed_size: 55
total_uncompressed_size: None
type: 2
]
num_rows: 4
sorting_columns: None
total_byte_size: 252
columns = ['b', 'a2', '__null_dask_index__'], categories = {}
schema_helper = <Parquet Schema with 5 entries>
cats = OrderedDict([('a1', ['C']), ('a2', ['X'])]), selfmade = True
index = ['__null_dask_index__']
assign = {'__null_dask_index__': array([18, 29, 34, 49]), 'a1': array([0, 0, 0, 0], dtype=int8), 'a1-catdef': ['B', 'B', 'B', 'B']
Categories (3, object): ['B', 'C', 'A'], 'a2': array([0, 0, 0, 0], dtype=int8), ...}
scheme = 'hive', partition_meta = {}
def read_row_group(file, rg, columns, categories, schema_helper, cats,
selfmade=False, index=None, assign=None,
scheme='hive', partition_meta=None):
"""
Access row-group in a file and read some columns into a data-frame.
"""
partition_meta = partition_meta or {}
if assign is None:
raise RuntimeError('Going with pre-allocation!')
read_row_group_arrays(file, rg, columns, categories, schema_helper,
cats, selfmade, assign=assign)
for cat in cats:
if scheme == 'hive':
s = ex_from_sep('/')
partitions = s.findall(rg.columns[0].file_path)
else:
partitions = [('dir%i' % i, v) for (i, v) in enumerate(
rg.columns[0].file_path.split('/')[:-1])]
key, val = [p for p in partitions if p[0] == cat][0]
val = val_to_num(val, meta=partition_meta.get(key))
> assign[cat][:] = cats[cat].index(val)
E ValueError: 'B' is not in list
/usr/share/miniconda3/envs/test-environment/lib/python3.8/site-packages/fastparquet/core.py:366: ValueError |
|
Re: the |
|
Guessing this needs a rebase now that PR #7415 has gone in.
|
Thank you Rick! 😄 |
Supersedes #7237
Blocked by #7415

- `make_timeseries` to use `Blockwise`.
- `daily_stock` to use `Blockwise`.