[Data] Fix limit operator#34800
Conversation
|
It seems like the limit tests are still broken: |
Signed-off-by: Hao Chen <chenh1024@gmail.com>
| BlockMetadata( | ||
| num_rows=None, | ||
| size_bytes=None, | ||
| size_bytes=0, |
There was a problem hiding this comment.
If I don't change this. I get the following error.
I believe it is a bug of the test itself.
The previous implemented streaming execution for "limit" and just triggered this bug.
size_bytes isn't supposed to be None in the first place.
@ericl @c21 could you help confirm?
python/ray/data/datastream.py:2276: in take
for row in self.iter_rows():
python/ray/data/iterator.py:234: in iter_rows
for batch in self.iter_batches(**iter_batch_args):
python/ray/data/iterator.py:167: in iter_batches
block_iterator, stats, blocks_owned_by_consumer = self._to_block_iterator()
python/ray/data/_internal/iterator/iterator_impl.py:33: in _to_block_iterator
block_iterator, stats, executor = ds._plan.execute_to_iterator()
python/ray/data/_internal/plan.py:512: in execute_to_iterator
block_iter = itertools.chain([next(gen)], gen)
python/ray/data/_internal/execution/legacy_compat.py:51: in execute_to_legacy_block_iterator
bundle_iter = execute_to_legacy_bundle_iterator(
python/ray/data/_internal/execution/legacy_compat.py:80: in execute_to_legacy_bundle_iterator
dag, stats = _get_execution_dag(
python/ray/data/_internal/execution/legacy_compat.py:141: in _get_execution_dag
dag, stats = _to_operator_dag(plan, allow_clear_input_blocks)
python/ray/data/_internal/execution/legacy_compat.py:174: in _to_operator_dag
operator = _blocks_to_input_buffer(blocks, owns_blocks)
python/ray/data/_internal/execution/legacy_compat.py:235: in _blocks_to_input_buffer
output = _block_list_to_bundles(blocks, owns_blocks=owns_blocks)
python/ray/data/_internal/execution/legacy_compat.py:359: in _block_list_to_bundles
RefBundle(
<string>:7: in __init__
???
_ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
self = RefBundle(blocks=[(ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001000000), BlockMetadata(num_rows=None, s...None, schema=None, input_files=[], exec_stats=None))], owns_blocks=False, output_split_idx=None, _cached_location=None)
def __post_init__(self):
for b in self.blocks:
assert isinstance(b, tuple), b
assert len(b) == 2, b
assert isinstance(b[0], ray.ObjectRef), b
assert isinstance(b[1], BlockMetadata), b
if b[1].size_bytes is None:
> raise ValueError(
"The size in bytes of the block must be known: {}".format(b)
)
E ValueError: The size in bytes of the block must be known: (ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001000000), BlockMetadata(num_rows=None, size_bytes=None, schema=None, input_files=[], exec_stats=None))
There was a problem hiding this comment.
Do we know why the limit operator would trigger the bug? Is it because the previous is not using streaming executor, and right now we are using it?
There was a problem hiding this comment.
I think so - https://github.com/ray-project/ray/blob/master/python/ray/data/_internal/plan.py#L764 . So the theory makes sense.
Can we put a meaningful number here? such as 4 * n?
|
I think I can skip this test for now and re-enable it after implementing the optimization for streaming executor? |
|
This looks a regresson now with new limit operator impl. |
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Sure. Will prioritize it. |
|
Just one lint error. |
|
CI failures should be unrelated. Merging. |
## Why are these changes needed? Fixes a bug introduced by ray-project#34705 ## Related issue number ray-project#34234 Signed-off-by: Jack He <jackhe2345@gmail.com>
## Why are these changes needed? Fixes a bug introduced by ray-project#34705 ## Related issue number ray-project#34234
Why are these changes needed?
Fixes a bug introduced by #34705
Related issue number
#34234
Checks
git commit -s) in this PR.scripts/format.shto lint the changes in this PR.method in Tune, I've added it in
doc/source/tune/api/under thecorresponding
.rstfile.