Description
I am using dask 0.2.3 and pyarrow 0.14, and my files are on HDFS.
When using the dask method read_parquet, it generates huge dask tasks: if I read several parquet files, the resulting dask graph is > 16 GB. It is very slow to transfer and crashes the scheduler. My issue might be related to #5321.
How to reproduce
import cloudpickle
import dask.dataframe as dd

filenames = hdfs.ls(...)
features = [....]
df = dd.read_parquet(filenames[0], columns=features, engine='pyarrow')

# Inspect the first task in the graph and its serialized size
fun = list(df.__dask_graph__().values())[0]
print(len(cloudpickle.dumps(fun)), fun)
It returns
38805505 (<function read_parquet_part at 0x7facf7860ae8>, <function ArrowEngine.read_partition at 0x7facab15a0d0>, <fsspec.implementations.hdfs.PyArrowHDFS object at 0x7facaaf87f98>, Empty DataFrame Columns: [....] Index: [], ParquetDatasetPiece('/user/[....].parquet', row_group=None, partition_keys=[]), [...], None, {'partitions': None, 'categories': None})
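To see where the bytes go, the same measurement can be run over every task in the graph, not just the first. Since the real graph requires HDFS access, here is a minimal stand-in sketch: a plain dict of task tuples (the shape dask graphs use internally), measured with the stdlib pickle. The `load` function and the paths are illustrative, not part of the real reproduction.

```python
import pickle

def load(path):
    # Illustrative stand-in for an expensive reader function
    return path

# Stand-in for a dask graph: a dict mapping task keys to task tuples.
# A real graph would come from df.__dask_graph__().
graph = {
    ("read-parquet", i): (load, f"/data/part-{i}.parquet")
    for i in range(4)
}

# Serialized size of each task, roughly what the scheduler must ship
sizes = {key: len(pickle.dumps(task)) for key, task in graph.items()}
total = sum(sizes.values())
print(f"{len(sizes)} tasks, {total} bytes total, largest {max(sizes.values())} bytes")
```

On the real graph, replacing `graph` with `df.__dask_graph__()` (and pickle with cloudpickle) shows whether the bloat is uniform across tasks or concentrated in a few.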
Currently my workaround is to read the parquet files using the pandas reader:

import dask
import dask.dataframe as dd
import pandas as pd

@dask.delayed
def read_parquet(filename):
    return pd.read_parquet(filename, columns=features, engine='pyarrow')

df = dd.from_delayed([read_parquet(filename) for filename in filenames])
In this case every 'read_parquet' task is about 1 KB. Computations become fast and my scheduler consumes less than 1 GB.
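The size gap comes down to what each task tuple carries. A minimal sketch with the stdlib pickle (the `read` function and paths are illustrative): a task holding only a path serializes to a handful of bytes, while one embedding a large object serializes to megabytes, which matches the observation above that each read_parquet task carries dataset metadata such as the ParquetDatasetPiece and meta DataFrame.

```python
import pickle

def read(filename):
    # Illustrative stand-in for a reader function
    return filename

# Task carrying only a path, like the dask.delayed workaround
small_task = (read, "/data/part-0.parquet")

# Task embedding a large object, standing in for bulky per-task metadata
big_task = (read, "/data/part-0.parquet", list(range(1_000_000)))

print(len(pickle.dumps(small_task)))  # small: a function reference plus a short string
print(len(pickle.dumps(big_task)))    # large: dominated by the embedded list
```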