[Data] Implement limit physical operator#34705
Conversation
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Hmm this is a blocking call right? I think we want to ensure all operators are streaming compatible. Maybe there should be an op that we insert before the Limit that ensures all bundles have num_rows set, fetching it if needed.
Actually, can we assume num_rows is always available? After the read, I think this should always be populated.
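The streaming-friendly shape being discussed can be sketched in plain Python: a limit operator that consumes bundles carrying a known `num_rows` and truncates the last block, without any blocking metadata fetch. All names here are illustrative stand-ins, not the actual Ray Data API.

```python
# Sketch of a streaming limit over (block, num_rows) bundles.
# Assumes num_rows is already populated on each bundle's metadata,
# as the comment above suggests it should be after the read.
from typing import Iterator, List, Tuple

Block = List[int]  # stand-in for a real block type

def apply_limit(bundles: Iterator[Tuple[Block, int]], limit: int) -> Iterator[Block]:
    taken = 0
    for block, num_rows in bundles:
        if taken >= limit:
            break  # ideally upstream also stops producing (follow-up PR)
        remaining = limit - taken
        if num_rows <= remaining:
            yield block
            taken += num_rows
        else:
            # Truncate the block that crosses the limit.
            yield block[:remaining]
            taken = limit

bundles = [([1, 2, 3], 3), ([4, 5, 6], 3), ([7, 8], 2)]
out = list(apply_limit(iter(bundles), 5))
# out == [[1, 2, 3], [4, 5]]
```

Because the operator only inspects per-bundle metadata, it stays streaming compatible: no bundle is fetched just to count its rows.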
Not sure if this is the correct way to handle stats; I just followed map_operator. Are there any docs?
Seems reasonable; we should probably improve the docstring in interfaces.py.
We should still run it in a remote task to avoid needing to fetch the data block locally. This avoids the ray.put() later on.
Is the purpose of doing this to avoid putting too much data on the driver side? Or is there another reason we shouldn't call ray.put()?
Yeah, avoiding fetching large blocks to the driver is important, and it also avoids an extra data copy. If the data is split in a remote task, then only one put() happens, instead of two.
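The trade-off described above can be illustrated with a toy object store that counts data movement (everything here is a hypothetical model, not the Ray object store API): splitting on the driver means fetching the whole block and re-putting the slice, while splitting inside a remote task reads the block where it lives and puts only the slice.

```python
class ToyObjectStore:
    """Counts puts/gets to make data movement visible; not a real Ray API."""
    def __init__(self):
        self.objects = {}
        self.driver_gets = 0
        self.puts = 0

    def put(self, obj):
        self.puts += 1
        ref = len(self.objects)
        self.objects[ref] = obj
        return ref

    def get(self, ref):
        self.driver_gets += 1  # models fetching the block to the driver
        return self.objects[ref]

def split_on_driver(store, ref, n):
    block = store.get(ref)       # full block pulled to the driver
    return store.put(block[:n])  # extra copy + extra put

def split_in_remote_task(store, ref, n):
    # Models a remote task: reads the block locally where it lives,
    # so the driver never fetches the data and only the slice is put.
    block = store.objects[ref]
    return store.put(block[:n])

store = ToyObjectStore()
ref = store.put(list(range(1000)))  # produced by an upstream task
split_in_remote_task(store, ref, 10)
# driver_gets == 0, puts == 2 (original block + slice); the driver-side
# variant would add a full-block fetch on top of the same two puts.
```

This mirrors the point in the comment: doing the split remotely avoids both the driver-side fetch of a potentially large block and the extra data copy.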
Should this extend AbstractOneToOne instead of AbstractAllToAll?
This is the logical operator. Not used right now, but I added this for completeness of the Datastream.limit API.
I think limit is also a kind of all-to-all logical operator, so this should make sense? Or would you prefer to have it directly extend LogicalOperator?
Don't have a strong opinion, but it makes sense that it should extend LogicalOperator to me and not AllToAll, since there's technically not any all:all data dependency between rows.
Makes sense. Fixed.
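The resolution of this thread can be sketched as a minimal class hierarchy: `Limit` extends the base `LogicalOperator` directly rather than the all-to-all abstraction, since no output row depends on all input rows. Class shapes below are illustrative only, not the actual Ray Data classes.

```python
# Minimal sketch of the logical-operator hierarchy discussed above.
class LogicalOperator:
    def __init__(self, name, input_dependencies=None):
        self.name = name
        self.input_dependencies = input_dependencies or []

class AbstractAllToAll(LogicalOperator):
    """For ops like sort/shuffle, where every output row may depend on
    every input row. Limit deliberately does NOT extend this."""

class Limit(LogicalOperator):
    def __init__(self, input_op, limit):
        super().__init__(f"limit={limit}", [input_op])
        self.limit = limit

read = LogicalOperator("read")
op = Limit(read, 5)
# op.name == "limit=5"; its only dependency is the read operator.
```

Extending the base class keeps the plan structure honest: an optimizer inspecting the DAG won't mistakenly treat limit as a shuffle boundary.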
python/ray/data/datastream.py
ericl left a comment:
LGTM assuming you want to do the early shutdown in the second PR. Existing tests will suffice for this refactor.
python/ray/data/_internal/execution/operators/limit_operator.py
Yep, I plan to do that in a second PR. There are multiple ways to do this. I'll comment on the original issue. Let's continue discussing there.
Signed-off-by: Hao Chen <chenh1024@gmail.com>
Some tests seem to be failing with "ValueError: The size in bytes of the block must be known: (ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000056010000), BlockMetadata(num_rows=10, size_bytes=None, schema=None, input_files=[], exec_stats=None))", but I think this is a bug in the test. The new streaming executor assumes that at least `size_bytes` is known. Previously the tests passed because we bypassed operator execution entirely with the old limit() impl.
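The failing assumption can be expressed as a small metadata check. Field names follow the `BlockMetadata` shape shown in the error message; the validation helper itself is illustrative, not executor code.

```python
# Sketch of the invariant the streaming executor relies on: after a read,
# size_bytes must be populated on each block's metadata.
from dataclasses import dataclass
from typing import Optional

@dataclass
class BlockMetadata:
    num_rows: Optional[int]
    size_bytes: Optional[int]

def validate_for_streaming(meta: BlockMetadata) -> None:
    if meta.size_bytes is None:
        # Mirrors the error seen in the failing tests.
        raise ValueError("The size in bytes of the block must be known")

validate_for_streaming(BlockMetadata(num_rows=10, size_bytes=1024))  # ok
# BlockMetadata(num_rows=10, size_bytes=None) would raise ValueError.
```

A test fixture that hand-builds metadata with `size_bytes=None` would trip this check even though real reads populate the field, which matches the "bug in the test" diagnosis above.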
## Why are these changes needed?

- Implemented the Limit physical operator for streaming execution.
- Added the `LimitStage` for legacy compatibility.

Note, currently when the limit operator reaches the limit, the upstream operators still won't stop producing data. This will be optimized in a follow-up PR.

## Related issue number

ray-project#34234

Signed-off-by: Jack He <jackhe2345@gmail.com>
## Why are these changes needed?

Fixes a bug introduced by ray-project#34705

## Related issue number

ray-project#34234

Signed-off-by: Jack He <jackhe2345@gmail.com>



Why are these changes needed?

- Implemented the Limit physical operator for streaming execution.
- Added the `LimitStage` for legacy compatibility.

Note, currently when the limit operator reaches the limit, the upstream operators still won't stop producing data. This will be optimized in a follow-up PR.

Related issue number

#34234

Checks

- I've signed off every commit (`git commit -s`) in this PR.
- I've run `scripts/format.sh` to lint the changes in this PR.
- If I've added a new method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.