Commit c57115d

GH-40142: [Python] Allow FileInfo instances to be passed to dataset init (#40143)
### Rationale for this change

Closes #40142. I'm developing a new Dask integration with the pyarrow parquet reader (see dask/dask-expr#882) and want to rely more on the pyarrow filesystem. Right now we perform a list operation ourselves to collect all touched files, and I would like to pass the retrieved `FileInfo` objects directly to the dataset constructor. This API is already exposed in C++; this PR adds the necessary Python bindings. The benefit is that it cuts out the additional HEAD requests to remote storage that path-based construction needs. This came up in #38389 (comment), and there has already been related work in #37857.

### What changes are included in this PR?

Python bindings for the `DatasetFactory` constructor that accepts a list/vector of `FileInfo` objects.

### Are these changes tested?

~~I slightly modified the minio test setup so that the prometheus endpoint is exposed. This can be used to assert that there haven't been any HEAD requests.~~ I ended up removing this again since parsing the response is a bit brittle.

### Are there any user-facing changes?

* Closes: #40142

Lead-authored-by: fjetter <fjetter@users.noreply.github.com>
Co-authored-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
Signed-off-by: Joris Van den Bossche <jorisvandenbossche@gmail.com>
1 parent 06d841e commit c57115d
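
For context, here is a minimal sketch of the usage this change enables, mirroring the new test added below (the bucket name and S3 setup are illustrative):

```python
import pyarrow.dataset as ds
from pyarrow import fs

# One LIST request retrieves FileInfo objects that already carry the
# size/type metadata the dataset constructor would otherwise fetch
# via per-file HEAD requests.
filesystem = fs.S3FileSystem()  # assumes credentials are configured
finfos = filesystem.get_file_info(fs.FileSelector("mybucket"))

# New in this PR: pass the FileInfo objects instead of string paths.
dataset = ds.dataset(finfos, format="parquet", filesystem=filesystem)
table = dataset.to_table()
```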

4 files changed: 58 additions & 10 deletions

python/pyarrow/_dataset.pyx

Lines changed: 26 additions & 8 deletions
```diff
@@ -3139,6 +3139,13 @@ cdef class FileSystemFactoryOptions(_Weakrefable):
         self.options.selector_ignore_prefixes = [tobytes(v) for v in values]
 
 
+cdef vector[CFileInfo] unwrap_finfos(finfos):
+    cdef vector[CFileInfo] o_vect
+    for fi in finfos:
+        o_vect.push_back((<FileInfo> fi).unwrap())
+    return o_vect
+
+
 cdef class FileSystemDatasetFactory(DatasetFactory):
     """
     Create a DatasetFactory from a list of paths with schema inspection.
@@ -3163,6 +3170,7 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
                  FileSystemFactoryOptions options=None):
         cdef:
             vector[c_string] paths
+            vector[CFileInfo] finfos
             CFileSelector c_selector
             CResult[shared_ptr[CDatasetFactory]] result
             shared_ptr[CFileSystem] c_filesystem
@@ -3184,14 +3192,24 @@ cdef class FileSystemDatasetFactory(DatasetFactory):
                     c_options
                 )
         elif isinstance(paths_or_selector, (list, tuple)):
-            paths = [tobytes(s) for s in paths_or_selector]
-            with nogil:
-                result = CFileSystemDatasetFactory.MakeFromPaths(
-                    c_filesystem,
-                    paths,
-                    c_format,
-                    c_options
-                )
+            if len(paths_or_selector) > 0 and isinstance(paths_or_selector[0], FileInfo):
+                finfos = unwrap_finfos(paths_or_selector)
+                with nogil:
+                    result = CFileSystemDatasetFactory.MakeFromFileInfos(
+                        c_filesystem,
+                        finfos,
+                        c_format,
+                        c_options
+                    )
+            else:
+                paths = [tobytes(s) for s in paths_or_selector]
+                with nogil:
+                    result = CFileSystemDatasetFactory.MakeFromPaths(
+                        c_filesystem,
+                        paths,
+                        c_format,
+                        c_options
+                    )
         else:
             raise TypeError('Must pass either paths or a FileSelector, but '
                             'passed {}'.format(type(paths_or_selector)))
```
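
Note that the new branch dispatches on the type of the first element only, so a list passed here is expected to be homogeneous: either all `FileInfo` objects (routed to `MakeFromFileInfos`) or all path-like values (routed to `MakeFromPaths`).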

python/pyarrow/dataset.py

Lines changed: 14 additions & 2 deletions
```diff
@@ -456,11 +456,22 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
     -------
     FileSystemDataset
     """
+    from pyarrow.fs import LocalFileSystem, _ensure_filesystem, FileInfo
+
     format = _ensure_format(format or 'parquet')
     partitioning = _ensure_partitioning(partitioning)
 
     if isinstance(source, (list, tuple)):
-        fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
+        if source and isinstance(source[0], FileInfo):
+            if filesystem is None:
+                # fall back to local file system as the default
+                fs = LocalFileSystem()
+            else:
+                # construct a filesystem if it is a valid URI
+                fs = _ensure_filesystem(filesystem)
+            paths_or_selector = source
+        else:
+            fs, paths_or_selector = _ensure_multiple_sources(source, filesystem)
     else:
         fs, paths_or_selector = _ensure_single_source(source, filesystem)
 
@@ -767,6 +778,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
     ...     dataset("local/path/to/data", format="ipc")
     ... ])  # doctest: +SKIP
     """
+    from pyarrow.fs import FileInfo
     # collect the keyword arguments for later reuse
     kwargs = dict(
         schema=schema,
@@ -781,7 +793,7 @@ def dataset(source, schema=None, format=None, filesystem=None,
     if _is_path_like(source):
         return _filesystem_dataset(source, **kwargs)
     elif isinstance(source, (tuple, list)):
-        if all(_is_path_like(elem) for elem in source):
+        if all(_is_path_like(elem) or isinstance(elem, FileInfo) for elem in source):
             return _filesystem_dataset(source, **kwargs)
         elif all(isinstance(elem, Dataset) for elem in source):
             return _union_dataset(source, **kwargs)
```
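
Because the `FileInfo` branch falls back to `LocalFileSystem` when no filesystem is given, the new form also works locally. A minimal sketch, assuming the parquet files below already exist (the paths are hypothetical):

```python
import pyarrow.dataset as ds
from pyarrow import fs

# get_file_info also accepts a list of paths and returns FileInfo objects.
local = fs.LocalFileSystem()
finfos = local.get_file_info(["data/part-0.parquet", "data/part-1.parquet"])

# No filesystem argument: _filesystem_dataset falls back to LocalFileSystem.
dataset = ds.dataset(finfos, format="parquet")
```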

python/pyarrow/includes/libarrow_dataset.pxd

Lines changed: 8 additions & 0 deletions
```diff
@@ -403,3 +403,11 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
             shared_ptr[CFileFormat] format,
             CFileSystemFactoryOptions options
         )
+
+        @staticmethod
+        CResult[shared_ptr[CDatasetFactory]] MakeFromFileInfos "Make"(
+            shared_ptr[CFileSystem] filesystem,
+            vector[CFileInfo] files,
+            shared_ptr[CFileFormat] format,
+            CFileSystemFactoryOptions options
+        )
```
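
The trailing string literal in `MakeFromFileInfos "Make"(...)` is Cython's C-name aliasing: the C++ `FileSystemDatasetFactory` exposes a single overloaded static `Make`, and the alias gives the `vector[CFileInfo]` overload a distinct name on the Cython side, matching the existing `MakeFromPaths` declaration above it.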

python/pyarrow/tests/test_dataset.py

Lines changed: 10 additions & 0 deletions
```diff
@@ -2725,6 +2725,16 @@ def test_open_dataset_from_uri_s3(s3_example_simple, dataset_reader):
     assert dataset_reader.to_table(dataset).equals(table)
 
 
+@pytest.mark.parquet
+@pytest.mark.s3
+def test_open_dataset_from_fileinfos(s3_example_simple, dataset_reader):
+    table, path, filesystem, uri, _, _, _, _ = s3_example_simple
+    selector = fs.FileSelector("mybucket")
+    finfos = filesystem.get_file_info(selector)
+    dataset = ds.dataset(finfos, format="parquet", filesystem=filesystem)
+    assert dataset_reader.to_table(dataset).equals(table)
+
+
 @pytest.mark.parquet
 @pytest.mark.s3  # still needed to create the data
 def test_open_dataset_from_uri_s3_fsspec(s3_example_simple):
```
