Skip to content

Commit 3a6fc1f

Browse files
GH-33976: [Python] Add scan ExecNode options (#34530)
Continuing GH-34102, this adds the exec node options classes defined in the dataset module (scan, not yet write).

* Issue: #33976

Authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
1 parent f1eece9 commit 3a6fc1f

File tree

2 files changed

+88
-3
lines changed

2 files changed

+88
-3
lines changed

python/pyarrow/_dataset.pyx

Lines changed: 51 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import pyarrow as pa
2929
from pyarrow.lib cimport *
3030
from pyarrow.lib import ArrowTypeError, frombytes, tobytes, _pc
3131
from pyarrow.includes.libarrow_dataset cimport *
32+
from pyarrow._acero cimport ExecNodeOptions
3233
from pyarrow._compute cimport Expression, _bind
3334
from pyarrow._compute import _forbid_instantiation
3435
from pyarrow._fs cimport FileSystem, FileSelector
@@ -37,14 +38,19 @@ from pyarrow._csv cimport (
3738
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
3839

3940

40-
_orc_fileformat = None
41-
_orc_imported = False
42-
4341
_DEFAULT_BATCH_SIZE = 2**17
4442
_DEFAULT_BATCH_READAHEAD = 16
4543
_DEFAULT_FRAGMENT_READAHEAD = 4
4644

4745

46+
# Initialise support for Datasets in ExecPlan
47+
Initialize()
48+
49+
50+
_orc_fileformat = None
51+
_orc_imported = False
52+
53+
4854
def _get_orc_fileformat():
4955
"""
5056
Import OrcFileFormat on first usage (to avoid circular import issue
@@ -3634,3 +3640,45 @@ def _filesystemdataset_write(
36343640
c_scanner = data.unwrap()
36353641
with nogil:
36363642
check_status(CFileSystemDataset.Write(c_options, c_scanner))
3643+
3644+
3645+
cdef class _ScanNodeOptions(ExecNodeOptions):
    # Cython-level wrapper around the C++ CScanNodeOptions; the public
    # Python-facing API lives in the ScanNodeOptions subclass.

    def _set_options(self, Dataset dataset, dict scan_options):
        """
        Build a CScanNodeOptions for *dataset* and store it in ``self.wrapped``.

        The Python-level ``scan_options`` dict is converted to a
        ``CScanOptions`` by reusing ``Scanner._make_scan_options`` (so the
        accepted keys match ``Scanner.from_dataset``).
        """
        cdef:
            shared_ptr[CScanOptions] c_scan_options

        # Delegate validation/conversion of the options dict to the Scanner
        # machinery instead of duplicating it here.
        c_scan_options = Scanner._make_scan_options(dataset, scan_options)

        self.wrapped.reset(
            new CScanNodeOptions(dataset.unwrap(), c_scan_options)
        )
3658+
class ScanNodeOptions(_ScanNodeOptions):
    """
    A Source node which yields batches from a Dataset scan.

    This is the option class for the "scan" node factory.

    This node is capable of applying pushdown projections or filters
    to the file readers which reduce the amount of data that needs to
    be read (if supported by the file format). But note that this does not
    construct associated filter or project nodes to perform the final
    filtering or projection. Rather, you may supply the same filter
    expression or projection to the scan node that you also supply
    to the filter or project node.

    Yielded batches will be augmented with fragment/batch indices to
    enable stable ordering for simple ExecPlans.

    Parameters
    ----------
    dataset : pyarrow.dataset.Dataset
        The dataset which acts as the data source.
    **kwargs : dict, optional
        Scan options. See `Scanner.from_dataset` for possible arguments.
    """

    def __init__(self, Dataset dataset, **kwargs):
        # All keyword arguments are forwarded as the scan-options dict;
        # conversion to C++ options happens in _ScanNodeOptions._set_options.
        self._set_options(dataset, kwargs)

python/pyarrow/tests/test_acero.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import pyarrow as pa
2121
import pyarrow.compute as pc
2222
from pyarrow.compute import field
23+
import pyarrow.dataset as ds
2324

2425
from pyarrow._acero import (
2526
TableSourceNodeOptions,
@@ -29,6 +30,7 @@
2930
HashJoinNodeOptions,
3031
Declaration,
3132
)
33+
from pyarrow._dataset import ScanNodeOptions
3234

3335

3436
@pytest.fixture
@@ -291,3 +293,38 @@ def test_hash_join():
291293
names=["key", "a", "b"]
292294
)
293295
assert result.sort_by("a").equals(expected)
296+
297+
298+
def test_scan(tempdir):
    """Exercise the "scan" exec node factory against a parquet dataset."""
    expected = pa.table({'a': [1, 2, 3], 'b': [4, 5, 6]})
    path = tempdir / "dataset"
    ds.write_dataset(expected, path, format="parquet")
    dataset = ds.dataset(path, format="parquet")

    result = Declaration("scan", ScanNodeOptions(dataset)).to_table()
    # scan augments each batch with fragment/batch provenance columns
    assert result.schema.names == [
        "a", "b", "__fragment_index", "__batch_index",
        "__last_in_fragment", "__filename"
    ]
    assert result.select(["a", "b"]).equals(expected)

    # a filter only enables pushdown (depending on file format); it does
    # not remove individual rows

    # fragment not filtered based on min/max statistics -> all rows kept
    pushdown = Declaration(
        "scan", ScanNodeOptions(dataset, filter=field('a') > 1)
    )
    assert pushdown.to_table().num_rows == 3

    # full fragment filtered based on min/max statistics -> nothing read
    pushdown = Declaration(
        "scan", ScanNodeOptions(dataset, filter=field('a') > 4)
    )
    assert pushdown.to_table().num_rows == 0

    # projection scan option
    projected = Declaration(
        "scan",
        ScanNodeOptions(dataset, columns={"a2": pc.multiply(field("a"), 2)})
    ).to_table()
    # "a" is included in the result (needed later on for the actual projection)
    assert projected["a"].to_pylist() == [1, 2, 3]
    # "b" is still included, but without data as it will be removed by the projection
    assert pc.all(projected["b"].is_null()).as_py()

0 commit comments

Comments
 (0)