[data] support generator udf for map_groups #58039
richardliaw merged 4 commits into ray-project:master
Code Review
This pull request introduces a valuable enhancement by adding support for generator UDFs in map_groups. This change can significantly reduce memory consumption for UDFs that produce large outputs. The implementation in _apply_udf_to_groups is clean and correctly handles both single DataBatch and Iterator[DataBatch] return types. The new test case test_map_groups_generator_udf is comprehensive and effectively validates the new functionality. I have one minor suggestion to improve code clarity by updating a type hint to align with this new capability.
Bug: UDF Type Checking Fails on Older Python Versions
Bug: Runtime Error with Typing Generics
jectpro7 left a comment
Thanks for the PR; overall it looks good.
@@ -1,3 +1,4 @@
from collections.abc import Iterator as IteratorABC
collections.abc.Iterator is the same as typing.Iterator, so just reuse the existing import.
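To illustrate the reviewer's point, here is a minimal sketch (not code from the PR) showing that the unsubscripted `typing.Iterator` is an alias whose origin is `collections.abc.Iterator`, so either works in an `isinstance` check. The helper `numbers` and the variable names are hypothetical. The sketch also shows that a subscripted generic such as `typing.Iterator[int]` cannot be used with `isinstance`.

```python
import collections.abc
import typing


def numbers():
    # Hypothetical generator used only for this demonstration.
    yield 1


gen = numbers()

# typing.Iterator is a generic alias over collections.abc.Iterator.
same_origin = typing.get_origin(typing.Iterator) is collections.abc.Iterator

# Both spellings accept a generator object in an isinstance check.
is_iter_typing = isinstance(gen, typing.Iterator)
is_iter_abc = isinstance(gen, collections.abc.Iterator)

# A subscripted generic raises TypeError when used with isinstance.
try:
    isinstance(gen, typing.Iterator[int])
    subscripted_ok = True
except TypeError:
    subscripted_ok = False
```

So a second import under a different name should not be needed, as long as the check uses the bare `Iterator` rather than a parameterized form.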
Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Head branch was pushed to by a user without write access
18854f5 to 85df154
Description
This PR adds support for returning a generator object from a map_groups UDF. If a UDF produces a large output, it can return an iterator instead of a single batch to reduce memory cost.
Related issues
Close #57935
Additional information
This change centers on the `_apply_udf_to_groups` helper function in ray/data/grouped_data.py. `map_groups` internally calls map_batches, providing a wrapper function (wrapped_fn) that in turn calls `_apply_udf_to_groups` to apply the user's UDF to each group.

The key modification is that instead of directly yielding the UDF's return value, the logic now inspects the result first. If the result is an Iterator, it is consumed with `yield from` to produce each data batch individually. If it is not an iterator, the single data batch is yielded directly, preserving the original behavior.
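The dispatch described above can be sketched in plain Python. This is an illustration under stated assumptions, not the actual Ray implementation: `apply_udf_to_groups_sketch`, `single_batch_udf`, and `generator_udf` are hypothetical names, and batches are modeled as plain dicts of lists rather than Ray's batch types.

```python
from collections.abc import Iterator


def apply_udf_to_groups_sketch(udf, groups):
    """Hypothetical stand-in for the helper: apply `udf` to each group."""
    for group in groups:
        result = udf(group)
        if isinstance(result, Iterator):
            # Generator UDF: forward each batch as it is produced.
            yield from result
        else:
            # Regular UDF: forward the single returned batch unchanged.
            yield result


def single_batch_udf(group):
    # Returns one batch per group (the original behavior).
    return {"total": [sum(group["x"])]}


def generator_udf(group):
    # Yields one small batch per value instead of one large batch.
    for value in group["x"]:
        yield {"x": [value]}


groups = [{"x": [1, 2]}, {"x": [3]}]
single_out = list(apply_udf_to_groups_sketch(single_batch_udf, groups))
generator_out = list(apply_udf_to_groups_sketch(generator_udf, groups))
```

Note that the check is against `Iterator`, not `Iterable`: a dict batch is iterable but is not an iterator, so it still takes the single-batch branch.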