Skip to content

Commit 2ea7f79

Browse files
eeroel and bkietz authored
GH-37857: [Python][Dataset] Expose file size to python dataset (#37868)
### Rationale for this change

Allow passing known file sizes to `make_fragment`, to avoid potential network requests.

### What changes are included in this PR?

### Are these changes tested?

Yes, tests with S3 that file size gets used.

### Are there any user-facing changes?

Yes, new function arguments.

* Closes: #37857

Lead-authored-by: Eero Lihavainen <eero.lihavainen@nitor.com>
Co-authored-by: Benjamin Kietzman <bengilgit@gmail.com>
Co-authored-by: Eero Lihavainen <eeelliha@gmail.com>
Signed-off-by: Antoine Pitrou <antoine@python.org>
1 parent 1b5e26d commit 2ea7f79

5 files changed

Lines changed: 84 additions & 16 deletions

File tree

python/pyarrow/_dataset.pxd

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,10 @@
2222
from pyarrow.includes.common cimport *
2323
from pyarrow.includes.libarrow_dataset cimport *
2424
from pyarrow.lib cimport *
25-
from pyarrow._fs cimport FileSystem
25+
from pyarrow._fs cimport FileSystem, FileInfo
2626

2727

28-
cdef CFileSource _make_file_source(object file, FileSystem filesystem=*)
29-
28+
cdef CFileSource _make_file_source(object file, FileSystem filesystem=*, object file_size=*)
3029

3130
cdef class DatasetFactory(_Weakrefable):
3231

python/pyarrow/_dataset.pyx

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ from pyarrow.includes.libarrow_dataset cimport *
3232
from pyarrow._acero cimport ExecNodeOptions
3333
from pyarrow._compute cimport Expression, _bind
3434
from pyarrow._compute import _forbid_instantiation
35-
from pyarrow._fs cimport FileSystem, FileSelector
35+
from pyarrow._fs cimport FileSystem, FileSelector, FileInfo
3636
from pyarrow._csv cimport (
3737
ConvertOptions, ParseOptions, ReadOptions, WriteOptions)
3838
from pyarrow.util import _is_iterable, _is_path_like, _stringify_path
@@ -96,27 +96,33 @@ def _get_parquet_symbol(name):
9696
return _dataset_pq and getattr(_dataset_pq, name)
9797

9898

99-
cdef CFileSource _make_file_source(object file, FileSystem filesystem=None):
99+
cdef CFileSource _make_file_source(object file, FileSystem filesystem=None, object file_size=None):
100100

101101
cdef:
102102
CFileSource c_source
103103
shared_ptr[CFileSystem] c_filesystem
104+
CFileInfo c_info
104105
c_string c_path
105106
shared_ptr[CRandomAccessFile] c_file
106107
shared_ptr[CBuffer] c_buffer
108+
int64_t c_size
107109

108110
if isinstance(file, Buffer):
109111
c_buffer = pyarrow_unwrap_buffer(file)
110112
c_source = CFileSource(move(c_buffer))
111-
112113
elif _is_path_like(file):
113114
if filesystem is None:
114115
raise ValueError("cannot construct a FileSource from "
115116
"a path without a FileSystem")
116117
c_filesystem = filesystem.unwrap()
117118
c_path = tobytes(_stringify_path(file))
118-
c_source = CFileSource(move(c_path), move(c_filesystem))
119119

120+
if file_size is not None:
121+
c_size = file_size
122+
c_info = FileInfo(c_path, size=c_size).unwrap()
123+
c_source = CFileSource(move(c_info), move(c_filesystem))
124+
else:
125+
c_source = CFileSource(move(c_path), move(c_filesystem))
120126
elif hasattr(file, 'read'):
121127
# Optimistically hope this is file-like
122128
c_file = get_native_file(file, False).get_random_access_file()
@@ -1230,15 +1236,16 @@ cdef class FileFormat(_Weakrefable):
12301236
The schema inferred from the file
12311237
"""
12321238
cdef:
1233-
CFileSource c_source = _make_file_source(file, filesystem)
1239+
CFileSource c_source = _make_file_source(file, filesystem, file_size=None)
12341240
CResult[shared_ptr[CSchema]] c_result
12351241
with nogil:
12361242
c_result = self.format.Inspect(c_source)
12371243
c_schema = GetResultValue(c_result)
12381244
return pyarrow_wrap_schema(move(c_schema))
12391245

12401246
def make_fragment(self, file, filesystem=None,
1241-
Expression partition_expression=None):
1247+
Expression partition_expression=None,
1248+
*, file_size=None):
12421249
"""
12431250
Make a FileFragment from a given file.
12441251
@@ -1252,6 +1259,9 @@ cdef class FileFormat(_Weakrefable):
12521259
partition_expression : Expression, optional
12531260
An expression that is guaranteed true for all rows in the fragment. Allows
12541261
fragment to be potentially skipped while scanning with a filter.
1262+
file_size : int, optional
1263+
The size of the file in bytes. Can improve performance with high-latency filesystems
1264+
when file size needs to be known before reading.
12551265
12561266
Returns
12571267
-------
@@ -1260,8 +1270,7 @@ cdef class FileFormat(_Weakrefable):
12601270
"""
12611271
if partition_expression is None:
12621272
partition_expression = _true
1263-
1264-
c_source = _make_file_source(file, filesystem)
1273+
c_source = _make_file_source(file, filesystem, file_size)
12651274
c_fragment = <shared_ptr[CFragment]> GetResultValue(
12661275
self.format.MakeFragment(move(c_source),
12671276
partition_expression.unwrap(),

python/pyarrow/_dataset_parquet.pyx

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -235,7 +235,7 @@ cdef class ParquetFileFormat(FileFormat):
235235
return f"<ParquetFileFormat read_options={self.read_options}>"
236236

237237
def make_fragment(self, file, filesystem=None,
238-
Expression partition_expression=None, row_groups=None):
238+
Expression partition_expression=None, row_groups=None, *, file_size=None):
239239
"""
240240
Make a FileFragment from a given file.
241241
@@ -251,6 +251,9 @@ cdef class ParquetFileFormat(FileFormat):
251251
fragment to be potentially skipped while scanning with a filter.
252252
row_groups : Iterable, optional
253253
The indices of the row groups to include
254+
file_size : int, optional
255+
The size of the file in bytes. Can improve performance with high-latency filesystems
256+
when file size needs to be known before reading.
254257
255258
Returns
256259
-------
@@ -259,15 +262,13 @@ cdef class ParquetFileFormat(FileFormat):
259262
"""
260263
cdef:
261264
vector[int] c_row_groups
262-
263265
if partition_expression is None:
264266
partition_expression = _true
265-
266267
if row_groups is None:
267268
return super().make_fragment(file, filesystem,
268-
partition_expression)
269+
partition_expression, file_size=file_size)
269270

270-
c_source = _make_file_source(file, filesystem)
271+
c_source = _make_file_source(file, filesystem, file_size)
271272
c_row_groups = [<int> row_group for row_group in set(row_groups)]
272273

273274
c_fragment = <shared_ptr[CFragment]> GetResultValue(

python/pyarrow/includes/libarrow_dataset.pxd

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
178178
const c_string& path() const
179179
const shared_ptr[CFileSystem]& filesystem() const
180180
const shared_ptr[CBuffer]& buffer() const
181+
const int64_t size() const
181182
# HACK: Cython can't handle all the overloads so don't declare them.
182183
# This means invalid construction of CFileSource won't be caught in
183184
# the C++ generation phase (though it will still be caught when

python/pyarrow/tests/test_dataset.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -988,6 +988,64 @@ def test_make_fragment(multisourcefs):
988988
assert row_group_fragment.row_groups == [0]
989989

990990

991+
@pytest.mark.parquet
992+
@pytest.mark.s3
993+
def test_make_fragment_with_size(s3_example_simple):
994+
"""
995+
Test passing file_size to make_fragment. Not all FS implementations make use
996+
of the file size (by implementing an OpenInputFile that takes a FileInfo), but
997+
s3 does, which is why it's used here.
998+
"""
999+
table, path, fs, uri, host, port, access_key, secret_key = s3_example_simple
1000+
1001+
file_format = ds.ParquetFileFormat()
1002+
paths = [path]
1003+
1004+
fragments = [file_format.make_fragment(path, fs)
1005+
for path in paths]
1006+
dataset = ds.FileSystemDataset(
1007+
fragments, format=file_format, schema=table.schema, filesystem=fs
1008+
)
1009+
1010+
tbl = dataset.to_table()
1011+
assert tbl.equals(table)
1012+
1013+
# true sizes -> works
1014+
sizes_true = [dataset.filesystem.get_file_info(x).size for x in dataset.files]
1015+
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
1016+
for path, size in zip(paths, sizes_true)]
1017+
dataset_with_size = ds.FileSystemDataset(
1018+
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
1019+
)
1020+
tbl = dataset.to_table()
1021+
assert tbl.equals(table)
1022+
1023+
# too small sizes -> error
1024+
sizes_toosmall = [1 for path in paths]
1025+
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
1026+
for path, size in zip(paths, sizes_toosmall)]
1027+
1028+
dataset_with_size = ds.FileSystemDataset(
1029+
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
1030+
)
1031+
1032+
with pytest.raises(pyarrow.lib.ArrowInvalid, match='Parquet file size is 1 bytes'):
1033+
table = dataset_with_size.to_table()
1034+
1035+
# too large sizes -> error
1036+
sizes_toolarge = [1000000 for path in paths]
1037+
fragments_with_size = [file_format.make_fragment(path, fs, file_size=size)
1038+
for path, size in zip(paths, sizes_toolarge)]
1039+
1040+
dataset_with_size = ds.FileSystemDataset(
1041+
fragments_with_size, format=file_format, schema=table.schema, filesystem=fs
1042+
)
1043+
1044+
# invalid range
1045+
with pytest.raises(OSError, match='HTTP status 416'):
1046+
table = dataset_with_size.to_table()
1047+
1048+
9911049
def test_make_csv_fragment_from_buffer(dataset_reader, pickle_module):
9921050
content = textwrap.dedent("""
9931051
alpha,num,animal

0 commit comments

Comments (0)