[Data] Operator receives empty input blocks (BlockMetadata with num_rows=0) #56879

@owenowenisme

Description

What happened + What you expected to happen

I noticed that operators receive many input blocks with num_rows=0 (metadata only, no rows) from upstream. This happens even with simple datasets such as ray.data.range(10).

Input blocks to the operators should contain rows unless there is a specific reason for upstream operators to emit empty blocks. If empty blocks are intentional, documentation on their purpose and downstream impact would be helpful.

Observed:
Many input blocks have num_rows=0 and contain only metadata. This slightly hurts efficiency and forces operators to special-case empty inputs.

This affects many operators, for example the zip operator and the union operator.
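
The special-casing mentioned above could look roughly like the sketch below. It is only an illustration: FakeMetadata and drop_empty_blocks are hypothetical names, not Ray APIs, and FakeMetadata merely stands in for the two BlockMetadata fields visible in the logs. Treating an unknown row count (None) as non-empty is also an assumption.

```python
from dataclasses import dataclass
from typing import Any, Iterable, List, Optional, Tuple


@dataclass
class FakeMetadata:
    """Hypothetical stand-in for BlockMetadata; only the fields used here."""
    num_rows: Optional[int]
    size_bytes: Optional[int]


def drop_empty_blocks(
    blocks: Iterable[Tuple[Any, FakeMetadata]],
) -> List[Tuple[Any, FakeMetadata]]:
    """Keep (ref, metadata) pairs unless the metadata reports exactly zero rows."""
    # Assumption: an unknown row count (None) is kept rather than dropped.
    return [
        (ref, meta)
        for ref, meta in blocks
        if meta.num_rows is None or meta.num_rows > 0
    ]


# Plain strings stand in for object refs in this sketch.
blocks = [
    ("ref-a", FakeMetadata(num_rows=1, size_bytes=8)),
    ("ref-b", FakeMetadata(num_rows=0, size_bytes=0)),
    ("ref-c", FakeMetadata(num_rows=0, size_bytes=0)),
]
print([ref for ref, _ in drop_empty_blocks(blocks)])  # prints ['ref-a']
```

Every operator that consumes bundles would need a guard of this kind, which is the overhead this issue is about.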

Versions / Dependencies

python: 3.12.3
ray: master branch

Reproduction script

  1. Add logging to _add_input_inner in an operator, taking zip_operator as an example.
  2. Run a simple zip script:
import ray

ray.init()
ds1 = ray.data.range(10)
ds2 = ray.data.range(10)
# With logging added to _add_input_inner of zip_operator, this shows the empty input blocks.
print(ds1.zip(ds2).take_all())
  3. Check the output; there are many empty blocks:
2025-09-24 04:26:49,514 INFO worker.py:1828 -- Connecting to existing Ray cluster at address: 103.122.117.152:6379...
2025-09-24 04:26:49,529 INFO worker.py:1999 -- Connected to Ray cluster. View the dashboard at http://127.0.0.1:8265/ 
/home/ubuntu/ray/.venv/lib/python3.12/site-packages/ray/_private/worker.py:2047: FutureWarning: Tip: In future versions of Ray, Ray will no longer override accelerator visible devices env var if num_gpus=0 or num_gpus=None (default). To enable this behavior and turn off this error message, set RAY_ACCEL_ENV_VAR_OVERRIDE_ON_ZERO=0
  warnings.warn(
2025-09-24 04:26:50,246 INFO logging.py:293 -- Registered dataset logger for dataset dataset_236_0
2025-09-24 04:26:50,265 INFO streaming_executor.py:159 -- Starting execution of Dataset dataset_236_0. Full logs are in /tmp/ray/session_2025-09-23_06-22-05_433649_86967/logs/ray-data
2025-09-24 04:26:50,265 INFO streaming_executor.py:160 -- Execution plan of Dataset dataset_236_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange], InputDataBuffer[Input] -> TaskPoolMapOperator[ReadRange] -> ZipOperator[Zip]
Running 0: 0.00 row [00:00, ? row/2025-09-24 04:26:50,276       WARNING resource_manager.py:134 -- ⚠️  Ray's object store is configured to use only 42.9% of available memory (14.8GiB out of 34.5GiB total). For optimal Ray Data performance, we recommend setting the object store to at least 50% of available memory. You can do this by setting the 'object_store_memory' parameter when calling ray.init() or by setting the RAY_DEFAULT_OBJECT_STORE_MEMORY_PROPORTION environment variable.
start printing input ref bundle                                                                                                
((ObjectRef(220c79498c0b3791ffffffffffffffffffffffff4f00000002000000), BlockMetadata(num_rows=1, size_bytes=8, exec_stats={'wall_time_s': 0.006623486056923866, 'cpu_time_s': 0.006766827000000086, 'udf_time_s': 0, 'node_id': 'adf7ac8735e9a71fd33c6451d608cebd31278eaa5fccea71d25da2f8'}, input_files=[])),)                                                                                       
end printing input ref bundle                                                                                                             
start printing input ref bundle                                                                                                           
((ObjectRef(220c79498c0b3791ffffffffffffffffffffffff4f00000004000000), BlockMetadata(num_rows=0, size_bytes=0, exec_stats={'wall_time_s': 6.6353939473629e-05, 'cpu_time_s': 6.575000000008657e-05, 'udf_time_s': 0, 'node_id': 'adf7ac8735e9a71fd33c6451d608cebd31278eaa5fccea71d25da2f8'}, input_files=[])),)                                                                                       
end printing input ref bundle                                                                                                             
start printing input ref bundle                                                                                                           
((ObjectRef(220c79498c0b3791ffffffffffffffffffffffff4f00000006000000), BlockMetadata(num_rows=0, size_bytes=0, exec_stats={'wall_time_s': 0.00013178912922739983, 'cpu_time_s': 0.0001305099999999726, 'udf_time_s': 0, 'node_id': 'adf7ac8735e9a71fd33c6451d608cebd31278eaa5fccea71d25da2f8'}, input_files=[])),)      
... and more
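
The logging code from step 1 is not reproduced in the text above. A helper along the following lines would produce output of the same shape. This is a sketch under assumptions: log_input_bundle is a hypothetical name, and the only thing it relies on is that the incoming bundle exposes a blocks attribute holding (ref, metadata) pairs, as the printed tuples suggest. The zero-row count it returns is an addition for convenience and is not part of the original logging.

```python
from types import SimpleNamespace


def log_input_bundle(bundle, emit=print):
    """Print a bundle's (ref, metadata) pairs between two marker lines.

    Returns the number of blocks whose metadata reports zero rows.
    """
    emit("start printing input ref bundle")
    emit(bundle.blocks)
    emit("end printing input ref bundle")
    return sum(1 for _, meta in bundle.blocks if meta.num_rows == 0)


# Stand-in bundle for demonstration; in the operator this would be the
# bundle passed to _add_input_inner.
demo = SimpleNamespace(
    blocks=(("ref-1", SimpleNamespace(num_rows=0, size_bytes=0)),)
)
empty_count = log_input_bundle(demo)
```

Calling such a helper at the top of the operator's _add_input_inner with the incoming bundle is enough to see how many of the inputs are empty.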

Issue Severity

Medium: It is a significant difficulty but I can work around it.
