
Parquet reader creates 50MB dask tasks #5357

@jdlesage

Description

I am using dask 0.2.3 and pyarrow 0.14. My files are on HDFS.

Dask's read_parquet method generates huge tasks. If I read several parquet files, it creates a dask graph that is > 16GB, which is very slow to transfer and crashes the scheduler. My issue might be related to #5321.

How to reproduce
import dask.dataframe as dd

filenames = hdfs.ls(...)   # parquet files on HDFS
features = [....]          # columns to read
df = dd.read_parquet(filenames[0], columns=features, engine='pyarrow')

import cloudpickle
fun = next(iter(df.__dask_graph__().values()))  # first task in the graph
print(len(cloudpickle.dumps(fun)), fun)

It returns

38805505 (<function read_parquet_part at 0x7facf7860ae8>, <function ArrowEngine.read_partition at 0x7facab15a0d0>, <fsspec.implementations.hdfs.PyArrowHDFS object at 0x7facaaf87f98>, Empty DataFrame Columns: [....] Index: [], ParquetDatasetPiece('/user/[....].parquet', row_group=None, partition_keys=[]), [...], None, {'partitions': None, 'categories': None})
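For context, a rough way to see how this adds up is to sum the pickled size of every task in the graph. This is only a sketch (it assumes a df built as above, e.g. from all the files rather than just filenames[0]); it is not necessarily how the > 16GB figure was measured:

import cloudpickle

# Sum the serialized size of every task in the graph (assumes `df` from above).
graph = dict(df.__dask_graph__())
total_bytes = sum(len(cloudpickle.dumps(task)) for task in graph.values())
print(f"{len(graph)} tasks, ~{total_bytes / 1e6:.1f} MB serialized")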

Currently my workaround is to read parquet files using the pandas reader:
import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def read_parquet(filename):
    # Read a single parquet file into a pandas DataFrame
    return pd.read_parquet(filename, columns=features, engine='pyarrow')

df = dd.from_delayed([read_parquet(filename) for filename in filenames])

In this case every read_parquet task is about 1KB. Computations become fast and my scheduler consumes less than 1GB of memory.
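For comparison, the same cloudpickle check can be applied to the delayed-based graph to confirm the ~1KB task sizes. A minimal sketch, assuming df is the from_delayed DataFrame built above:

import cloudpickle

# Print the serialized size of the first few tasks in the delayed-based graph.
for key, task in list(df.__dask_graph__().items())[:3]:
    print(key, len(cloudpickle.dumps(task)), "bytes")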
