[data] download op fails on invalid/missing URIs (unhandled FileNotFoundError) and then hits ZeroDivisionError in batch-size estimator #58462
Description
What happened
When using Ray Data's `download("url")` expression on a dataset of image URLs:
- If a URL is missing/invalid, the internal helper `load_uri_bytes` calls `fs.open_input_file(uri_path)` without handling `FileNotFoundError`, causing the entire pipeline to fail.
- In cases where a partition yields no successful rows (e.g., all URIs in the partition are invalid), the operator tries to estimate rows per partition from an empty set of sizes and crashes with a `ZeroDivisionError` in `_estimate_nrows_per_partition`.
Stack excerpts (trimmed)
Unhandled FileNotFoundError path:

```
FileNotFoundError: https://allnewsrecorder.com/.../Diplomatic-efforts-additionally-failed-the-method-of-returning-Pakistanis-stranded-in-Dubai-started.jpg
ray.data.exceptions.SystemException
ray.exceptions.RayTaskError(FileNotFoundError): ray::Download(url)()
  ...
  File ".../ray/data/_internal/planner/plan_download_op.py", line 192, in load_uri_bytes
    with fs.open_input_file(uri_path) as f:
  ...
  File "/home/ray/anaconda3/lib/python3.12/site-packages/pyarrow/fs.py", line 419, in open_input_file
    raise FileNotFoundError(path)
FileNotFoundError: http://www.ukbikefactory.com/.../frog-69-red.jpg
(Download(url) ...) Caught exception in transforming worker! [repeated ...]
```
ZeroDivisionError path when all rows in a block/partition fail:

```
ray.exceptions.RayTaskError(ZeroDivisionError): ray::MapWorker(Partition(url)).submit()
  ...
  File ".../ray/data/_internal/planner/plan_download_op.py", line 251, in __call__
    self._batch_size_estimate = self._estimate_nrows_per_partition(block)
  File ".../ray/data/_internal/planner/plan_download_op.py", line 281, in _estimate_nrows_per_partition
    avg_nbytes_per_row = sum(row_sizes) / len(row_sizes)
                         ~~~~~~~~~~~~~~~^~~~~~~~~~~~~~~~
ZeroDivisionError: division by zero
```
What I expected
- Invalid/missing URIs should not crash the job by default; the operator should support an error-tolerance policy so users can:
  - skip bad rows, or
  - attach an error column and continue, or
  - handle errors via a user-provided callback.
- The batch-size estimator should gracefully handle empty samples (e.g., return a conservative default or fall back to a fixed batch size) rather than dividing by zero when a partition produces no successful rows.
Useful details
The failing code path for (1) is in `ray.data._internal.planner.plan_download_op.load_uri_bytes`:
```python
def load_uri_bytes(uri_path_iterator):
    for uri_path in uri_path_iterator:
        with fs.open_input_file(uri_path) as f:
            yield f.read()
```

There's no try/except for `FileNotFoundError` (raised by the PyArrow FS), so a single bad URI aborts the pipeline.
For (2), `_estimate_nrows_per_partition` divides by `len(row_sizes)` without guarding for the case when `row_sizes` is empty (all rows failed), resulting in `ZeroDivisionError`.
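The skip-on-error behavior described above can be sketched as follows. All names here are hypothetical: `load_uri_bytes_skip_missing` is not the real helper, and `FakeFS` is a test double standing in for a PyArrow filesystem so the sketch runs without network access.

```python
import io


def load_uri_bytes_skip_missing(fs, uri_path_iterator):
    """Yield file contents, skipping URIs that raise FileNotFoundError.

    Sketch of an error-tolerant variant of load_uri_bytes; the real
    helper takes its filesystem from the surrounding operator.
    """
    for uri_path in uri_path_iterator:
        try:
            with fs.open_input_file(uri_path) as f:
                yield f.read()
        except FileNotFoundError:
            # A bad URI no longer aborts the whole pipeline.
            continue


class FakeFS:
    """Minimal stand-in for a PyArrow filesystem (illustration only)."""

    def __init__(self, files):
        self._files = files

    def open_input_file(self, path):
        if path not in self._files:
            raise FileNotFoundError(path)
        return io.BytesIO(self._files[path])


fs = FakeFS({"good.jpg": b"\xff\xd8bytes"})
print(list(load_uri_bytes_skip_missing(fs, ["good.jpg", "missing.jpg"])))
# -> [b'\xff\xd8bytes']
```

With the current implementation, the same input would raise on `"missing.jpg"` instead of yielding the one good row.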
Versions / Dependencies
- Ray: 2.51.1
- Python: 3.12
- Platform: Linux (Anyscale / k8s)
- Workload: dataset of mixed-validity image URLs (some 404/missing)
Reproduction script
To reproduce the ZeroDivisionError

This shows current behavior with at least one invalid URL (triggering (1)); if a partition happens to contain only invalid URLs, it also exposes (2), the ZeroDivisionError.
```python
import ray
from ray.data import from_items
from ray.data.expressions import download  # download("url") expression

urls = [
    # valid or invalid URLs (include at least one invalid/missing)
    "https://www.labelsrus.com/images/products/thumbs/lip-97.jpg",  # 404/missing
    "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTc9APxkj0xClmrU3PpMZglHQkx446nQPG6lA&s",
]

ds = from_items([{"url": u} for u in urls])

# This triggers plan_download_op -> load_uri_bytes()
# batch_format/pandas not strictly required, but keeps it minimal.
ds_with_bytes = ds.with_column("bytes", download("url"))

# Force execution
print(ds_with_bytes.take(2))
```

To reproduce the FileNotFoundError
```python
import ray
from ray.data import from_items
from ray.data.expressions import download  # download("url") expression

urls = [
    # valid or invalid URLs (include at least one invalid/missing)
    "https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcTc9APxkj0xClmrU3PpMZglHQkx446nQPG6lA&s",
    "https://www.labelsrus.com/images/products/thumbs/lip-97.jpg",  # 404/missing
]

ds = from_items([{"url": u} for u in urls])

# This triggers plan_download_op -> load_uri_bytes()
# batch_format/pandas not strictly required, but keeps it minimal.
ds_with_bytes = ds.with_column("bytes", download("url"))

# Force execution
print(ds_with_bytes.take(2))
```

Observed: FileNotFoundError surfaced as a RayTaskError; if a partition has zero successful rows, a ZeroDivisionError occurs in `_estimate_nrows_per_partition`.
Proposed fixes / enhancements
- Error policy for `download(...)`: add an `on_error` parameter (`"raise" | "skip" | "return_error"`) and plumb it through to `load_uri_bytes`, which should catch `FileNotFoundError` (and possibly other transient FS/network errors) accordingly. Default to `"raise"` to preserve current behavior.
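That plumbing could look roughly like the sketch below. The function name, the `(payload, error)` tuple shape, and `FakeFS` are assumptions for illustration, not Ray's actual API; the real operator would presumably keep a single payload column and attach the error as a separate column.

```python
import io


class FakeFS:
    """Test double for a PyArrow filesystem (illustration only)."""

    def __init__(self, files):
        self._files = files

    def open_input_file(self, path):
        if path not in self._files:
            raise FileNotFoundError(path)
        return io.BytesIO(self._files[path])


def load_uri_bytes_with_policy(fs, uri_path_iterator, on_error="raise"):
    """Yield (payload, error) pairs according to the on_error policy."""
    for uri_path in uri_path_iterator:
        try:
            with fs.open_input_file(uri_path) as f:
                yield f.read(), None
        except FileNotFoundError as exc:
            if on_error == "raise":
                raise  # current behavior, kept as the default
            if on_error == "skip":
                continue  # drop the bad row entirely
            # "return_error": keep the row and attach a structured error.
            yield None, {"uri": uri_path, "error": str(exc)}


fs = FakeFS({"ok.jpg": b"data"})
uris = ["ok.jpg", "missing.jpg"]
print(list(load_uri_bytes_with_policy(fs, uris, on_error="skip")))
# -> [(b'data', None)]
print(list(load_uri_bytes_with_policy(fs, uris, on_error="return_error")))
# -> [(b'data', None), (None, {'uri': 'missing.jpg', 'error': 'missing.jpg'})]
```

The structured error record is what would let users filter or inspect failed URIs downstream instead of losing the whole job.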
- Estimator guard: in `_estimate_nrows_per_partition`, handle `len(row_sizes) == 0` by:
  - returning a conservative default nrows (e.g., 1 or a fixed small batch size), or
  - bypassing estimation for that partition and using the global/default batch size.
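The guard itself is small. A sketch, under the assumption that the estimator works from sampled `row_sizes` and a target partition byte budget (both names and the default constant are hypothetical; the real method lives on the operator class):

```python
DEFAULT_NROWS_PER_PARTITION = 1  # conservative fallback; actual value is a design choice


def estimate_nrows_per_partition(row_sizes, target_partition_nbytes):
    """Estimate rows per partition from sampled row sizes.

    Returns a conservative default instead of dividing by zero when
    every row in the sample failed (row_sizes is empty).
    """
    if not row_sizes:
        return DEFAULT_NROWS_PER_PARTITION
    avg_nbytes_per_row = sum(row_sizes) / len(row_sizes)
    return max(1, int(target_partition_nbytes // avg_nbytes_per_row))


print(estimate_nrows_per_partition([100, 100], 1000))  # -> 10
print(estimate_nrows_per_partition([], 1000))          # -> 1 (no crash)
```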
- (Optional) Observability: emit counters/metrics for skipped/errored URIs and (if `"return_error"`) include a structured error field (message, URI) to help users filter/inspect failures downstream.
Severity
High — a single bad URL can abort the entire job; additionally, partitions with all-bad rows crash the operator with ZeroDivisionError.