
Move timeseries and daily-stock to Blockwise #7615

Merged
jakirkham merged 94 commits into dask:main from rjzamora:blockwise-io-demo
Apr 30, 2021
Conversation

@rjzamora
Member

@rjzamora rjzamora commented Apr 27, 2021

Supersedes #7237
Blocked by #7415

  • Refactors make_timeseries to use Blockwise.
  • Refactors daily_stock to use Blockwise.
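
The core idea behind a Blockwise-style IO layer can be illustrated with a minimal, simplified sketch (this is not dask's actual implementation; the names SimpleBlockwiseIO and make_partition are purely illustrative): instead of eagerly materializing one task per partition in a plain dict, the layer stores a single shared function plus per-partition arguments and only generates low-level tasks on demand.

```python
class SimpleBlockwiseIO:
    """Toy stand-in for a Blockwise IO layer (illustrative only)."""

    def __init__(self, name, func, part_args):
        self.name = name            # output collection name
        self.func = func            # one function shared by all partitions
        self.part_args = part_args  # per-partition argument tuples

    def __len__(self):
        # Number of partitions, known without building any tasks
        return len(self.part_args)

    def materialize(self):
        # Only called when a concrete low-level graph is actually needed
        return {
            (self.name, i): (self.func,) + tuple(args)
            for i, args in enumerate(self.part_args)
        }


def make_partition(start, stop):
    # Stand-in for a partition-producing IO function
    return list(range(start, stop))


layer = SimpleBlockwiseIO("timeseries", make_partition, [(0, 4), (4, 8)])
print(len(layer))  # partition count is available before materialization
dsk = layer.materialize()
func, *args = dsk[("timeseries", 1)]
print(func(*args))
```

Because the layer stays symbolic until materialization, graph construction and high-level optimizations can operate on metadata alone, which is where the scheduling benefit for large graphs comes from.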

jrbourbeau and others added 30 commits March 11, 2021 19:16
Co-authored-by: James Bourbeau <jrbourbeau@users.noreply.github.com>
@rjzamora
Member Author

rjzamora commented Apr 27, 2021

@quasiben @jakirkham - This PR should make it a bit easier to benchmark scheduler performance for large graphs without any significant IO or communication overhead. For example, something like the following is a reasonable way to produce a large graph that will also benefit from Blockwise-IO optimizations:

import dask
from dask.distributed import LocalCluster, wait, Client

cluster = LocalCluster()
client = Client(cluster)

ddf = dask.datasets.timeseries(
    start='2000-01-01',
    end='2003-12-31',
    freq='120s',
    partition_freq='1h',
)
s = ddf["id"] + 10
mean = s.mean()

with dask.config.set({"optimization.fuse.active": False}):
    wait(mean.persist())

This code seems to run ~20% faster for me with this PR (compared to main).
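
For measuring a speedup like this, a small timing helper around the persist-and-wait step is convenient. A minimal sketch (the `timed` context manager is an illustrative helper, not part of dask):

```python
import time
from contextlib import contextmanager


@contextmanager
def timed(label):
    # Measure wall-clock time for the enclosed block and report it
    t0 = time.perf_counter()
    yield
    print(f"{label}: {time.perf_counter() - t0:.2f}s")
```

Usage against the snippet above would look like `with timed("persist"): wait(mean.persist())`, run once on main and once on this branch to compare.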

@rjzamora
Member Author

rjzamora commented Apr 28, 2021

It seems that test_parquet.py::test_partition_on has become flaky for some reason (I've seen it fail both here and in #7415, for apparently different reasons). Unfortunately, I cannot reproduce it locally, no matter how many times I run the test (or even the full set of parquet tests) in a loop.
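
One way to hammer on a flaky test locally is a small retry loop that stops on the first failure. A sketch (the `repeat_until_fail` helper name is illustrative, and the commented pytest invocation assumes a dask source checkout):

```shell
# Re-run a command N times, stopping as soon as it fails.
repeat_until_fail() {
  n="$1"; shift
  for i in $(seq 1 "$n"); do
    echo "run $i"
    "$@" || { echo "failed on run $i"; return 1; }
  done
}

# Example (assumes a dask checkout in the current directory):
# repeat_until_fail 50 python -m pytest \
#   dask/dataframe/io/tests/test_parquet.py::test_partition_on -x
```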

For reference, here is the latest failure:
=================================== FAILURES ===================================
________________________ test_partition_on[fastparquet] ________________________
[gw0] linux -- Python 3.8.8 /usr/share/miniconda3/envs/test-environment/bin/python

tmpdir = '/tmp/pytest-of-runner/pytest-0/popen-gw0/test_partition_on_fastparquet_0'
engine = 'fastparquet'

    def test_partition_on(tmpdir, engine):
        tmpdir = str(tmpdir)
        df = pd.DataFrame(
            {
                "a1": np.random.choice(["A", "B", "C"], size=100),
                "a2": np.random.choice(["X", "Y", "Z"], size=100),
                "b": np.random.random(size=100),
                "c": np.random.randint(1, 5, size=100),
                "d": np.arange(0, 100),
            }
        )
        d = dd.from_pandas(df, npartitions=2)
        d.to_parquet(tmpdir, partition_on=["a1", "a2"], engine=engine)
        # Note #1: Cross-engine functionality is missing
        # Note #2: The index is not preserved in pyarrow when partition_on is used
        out = dd.read_parquet(
            tmpdir, engine=engine, index=False, gather_statistics=False
        ).compute()
        for val in df.a1.unique():
            assert set(df.b[df.a1 == val]) == set(out.b[out.a1 == val])
    
        # Now specify the columns and allow auto-index detection
>       out = dd.read_parquet(tmpdir, engine=engine, columns=["b", "a2"]).compute()

dask/dataframe/io/tests/test_parquet.py:1257: 
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
dask/base.py:285: in compute
    (result,) = compute(self, traverse=False, **kwargs)
dask/base.py:567: in compute
    results = schedule(dsk, keys, **kwargs)
dask/threaded.py:79: in get
    results = get_async(
dask/local.py:514: in get_async
    raise_exception(exc, tb)
dask/local.py:325: in reraise
    raise exc
dask/local.py:223: in execute_task
    result = _execute_task(task, data)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/optimization.py:963: in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
dask/core.py:151: in get
    result = _execute_task(task, cache)
dask/core.py:121: in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
dask/dataframe/io/parquet/core.py:82: in __call__
    return read_parquet_part(
dask/dataframe/io/parquet/core.py:345: in read_parquet_part
    dfs = [
dask/dataframe/io/parquet/core.py:346: in <listcomp>
    func(fs, rg, columns.copy(), index, **toolz.merge(kwargs, kw))
dask/dataframe/io/parquet/fastparquet.py:801: in read_partition
    parquet_file.read_row_group_file(
/usr/share/miniconda3/envs/test-environment/lib/python3.8/site-packages/fastparquet/api.py:210: in read_row_group_file
    core.read_row_group_file(
/usr/share/miniconda3/envs/test-environment/lib/python3.8/site-packages/fastparquet/core.py:303: in read_row_group_file
    return read_row_group(f, rg, columns, categories, schema_helper, cats,
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

file = <_io.BufferedReader name='/tmp/pytest-of-runner/pytest-0/popen-gw0/test_partition_on_fastparquet_0/a1=B/a2=Y/part.0.parquet'>
rg = <class 'fastparquet.parquet_thrift.parquet.ttypes.RowGroup'>
columns: [<class 'fastparquet.parquet_thrift.parquet.ttyp...ompressed_size: 55
  total_uncompressed_size: None
  type: 2

]
num_rows: 4
sorting_columns: None
total_byte_size: 252

columns = ['b', 'a2', '__null_dask_index__'], categories = {}
schema_helper = <Parquet Schema with 5 entries>
cats = OrderedDict([('a1', ['C']), ('a2', ['X'])]), selfmade = True
index = ['__null_dask_index__']
assign = {'__null_dask_index__': array([18, 29, 34, 49]), 'a1': array([0, 0, 0, 0], dtype=int8), 'a1-catdef': ['B', 'B', 'B', 'B']
Categories (3, object): ['B', 'C', 'A'], 'a2': array([0, 0, 0, 0], dtype=int8), ...}
scheme = 'hive', partition_meta = {}

    def read_row_group(file, rg, columns, categories, schema_helper, cats,
                       selfmade=False, index=None, assign=None,
                       scheme='hive', partition_meta=None):
        """
        Access row-group in a file and read some columns into a data-frame.
        """
        partition_meta = partition_meta or {}
        if assign is None:
            raise RuntimeError('Going with pre-allocation!')
        read_row_group_arrays(file, rg, columns, categories, schema_helper,
                              cats, selfmade, assign=assign)
    
        for cat in cats:
            if scheme == 'hive':
                s = ex_from_sep('/')
                partitions = s.findall(rg.columns[0].file_path)
            else:
                partitions = [('dir%i' % i, v) for (i, v) in enumerate(
                    rg.columns[0].file_path.split('/')[:-1])]
            key, val = [p for p in partitions if p[0] == cat][0]
            val = val_to_num(val, meta=partition_meta.get(key))
>           assign[cat][:] = cats[cat].index(val)
E           ValueError: 'B' is not in list

/usr/share/miniconda3/envs/test-environment/lib/python3.8/site-packages/fastparquet/core.py:366: ValueError

@jrbourbeau
Member

Re: the test_parquet.py::test_partition_on failure, we've seen similar things in other tests (xref #7369)

@jakirkham
Member

Guessing this needs a rebase now that PR #7415 has gone in.

@rjzamora rjzamora marked this pull request as ready for review April 29, 2021 16:27
@jakirkham jakirkham merged commit 709bd45 into dask:main Apr 30, 2021
@jakirkham
Member

Thank you Rick! 😄
