Skip to content

Commit 227557d

Browse files
committed
read_parquet POC
1 parent 1c3ec9e commit 227557d

File tree

3 files changed

+495
-86
lines changed

3 files changed

+495
-86
lines changed

dask_expr/_collection.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4606,7 +4606,11 @@ def read_parquet(
46064606
engine=None,
46074607
**kwargs,
46084608
):
4609-
from dask_expr.io.parquet import ReadParquet, _set_parquet_engine
4609+
from dask_expr.io.parquet import (
4610+
ReadParquetFSSpec,
4611+
ReadParquetPyarrowFS,
4612+
_set_parquet_engine,
4613+
)
46104614

46114615
if not isinstance(path, str):
46124616
path = stringify_path(path)
@@ -4618,9 +4622,59 @@ def read_parquet(
46184622
col, op, val = filter
46194623
if op == "in" and not isinstance(val, (set, list, tuple)):
46204624
raise TypeError("Value of 'in' filter must be a list, set or tuple.")
4625+
from pyarrow import fs as pa_fs
4626+
4627+
if (
4628+
isinstance(filesystem, pa_fs.FileSystem)
4629+
or isinstance(filesystem, str)
4630+
and filesystem.lower() in ("arrow", "pyarrow")
4631+
):
4632+
if calculate_divisions:
4633+
raise NotImplementedError(
4634+
"calculate_divisions is not supported when using the pyarrow filesystem."
4635+
)
4636+
if metadata_task_size is not None:
4637+
raise NotImplementedError(
4638+
"metadata_task_size is not supported when using the pyarrow filesystem."
4639+
)
4640+
if split_row_groups != "infer":
4641+
raise NotImplementedError(
4642+
"split_row_groups is not supported when using the pyarrow filesystem."
4643+
)
4644+
if blocksize != "default":
4645+
raise NotImplementedError(
4646+
"blocksize is not supported when using the pyarrow filesystem."
4647+
)
4648+
if aggregate_files is not None:
4649+
raise NotImplementedError(
4650+
"aggregate_files is not supported when using the pyarrow filesystem."
4651+
)
4652+
if parquet_file_extension != (".parq", ".parquet", ".pq"):
4653+
raise NotImplementedError(
4654+
"parquet_file_extension is not supported when using the pyarrow filesystem."
4655+
)
4656+
if engine is not None:
4657+
raise NotImplementedError(
4658+
"engine is not supported when using the pyarrow filesystem."
4659+
)
4660+
4661+
return new_collection(
4662+
ReadParquetPyarrowFS(
4663+
path,
4664+
columns=_convert_to_list(columns),
4665+
filters=filters,
4666+
categories=categories,
4667+
index=index,
4668+
storage_options=storage_options,
4669+
filesystem=filesystem,
4670+
ignore_metadata_file=ignore_metadata_file,
4671+
kwargs=kwargs,
4672+
_series=isinstance(columns, str),
4673+
)
4674+
)
46214675

46224676
return new_collection(
4623-
ReadParquet(
4677+
ReadParquetFSSpec(
46244678
path,
46254679
columns=_convert_to_list(columns),
46264680
filters=filters,

0 commit comments

Comments
 (0)