Skip to content

[data] support generator udf for map_groups#58039

Merged
richardliaw merged 4 commits intoray-project:masterfrom
my-vegetable-has-exploded:map-group-iter
Nov 15, 2025
Merged

[data] support generator udf for map_groups#58039
richardliaw merged 4 commits intoray-project:masterfrom
my-vegetable-has-exploded:map-group-iter

Conversation

@my-vegetable-has-exploded
Copy link
Copy Markdown
Contributor

Description

This pr support return a generator object from map_groups UDF. if UDF have a large output , we return iterator to reduce memory cost.

Related issues

Close #57935

Additional information

This change centers on the _apply_udf_to_groups helper function within the file ray/data/grouped_data.py.

map_groups internally calls map_batches, providing a wrapper function (wrapped_fn) that in turn calls _apply_udf_to_groups to apply the user's UDF to each group.

The key modification is that instead of directly yielding the UDF's return value, the logic now inspects the result first. If the result is an Iterator, it is consumed with yield from to produce each data batch individually. If it is not an iterator, the single data batch is yielded directly, preserving the original behavior.

@my-vegetable-has-exploded my-vegetable-has-exploded requested a review from a team as a code owner October 23, 2025 08:34
Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces a valuable enhancement by adding support for generator UDFs in map_groups. This change can significantly reduce memory consumption for UDFs that produce large outputs. The implementation in _apply_udf_to_groups is clean and correctly handles both single DataBatch and Iterator[DataBatch] return types. The new test case test_map_groups_generator_udf is comprehensive and effectively validates the new functionality. I have one minor suggestion to improve code clarity by updating a type hint to align with this new capability.

@cursor
Copy link
Copy Markdown

cursor bot commented Oct 23, 2025

Bug: UDF Type Checking Fails on Older Python Versions

The isinstance check for UDF results uses typing.Iterator, which is a generic alias not suitable for runtime type checking. This causes a TypeError on Python versions prior to 3.10, preventing the correct detection of iterator or generator objects.

Fix in Cursor Fix in Web

@cursor
Copy link
Copy Markdown

cursor bot commented Oct 23, 2025

Bug: Runtime Error with Typing Generics

The isinstance(result, Iterator) check uses typing.Iterator, which is a generic type alias. On Python 3.7-3.9, this raises a TypeError because typing generics aren't designed for runtime checks. This prevents UDFs returning generators from being correctly unpacked, causing map_groups to fail or produce incorrect results.

Fix in Cursor Fix in Web

@ray-gardener ray-gardener bot added data Ray Data-related issues community-contribution Contributed by the community labels Oct 23, 2025
cursor[bot]

This comment was marked as outdated.

Copy link
Copy Markdown
Contributor

@jectpro7 jectpro7 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks for the PR, overall looks good.

@@ -1,3 +1,4 @@
from collections.abc import Iterator as IteratorABC
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The collection.abc.Iterator is same with the typing.Iterator, just reuse it.

@raulchen raulchen enabled auto-merge (squash) November 12, 2025 18:54
@github-actions github-actions bot added the go add ONLY when ready to merge, run all tests label Nov 12, 2025
Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
auto-merge was automatically disabled November 13, 2025 03:45

Head branch was pushed to by a user without write access

@richardliaw richardliaw merged commit eede46f into ray-project:master Nov 15, 2025
6 checks passed
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
## Description

This pr support return a generator object from map_groups UDF. if UDF
have a large output , we return iterator to reduce memory cost.

## Related issues

Close ray-project#57935

## Additional information

This change centers on the `_apply_udf_to_groups` helper function within
the file ray/data/grouped_data.py.

`map_groups` internally calls map_batches, providing a wrapper function
(wrapped_fn) that in turn calls `_apply_udf_to_groups` to apply the
user's UDF to each group.

The key modification is that instead of directly yielding the UDF's
return value, the logic now inspects the result first. If the result is
an Iterator, it is consumed with `yield from` to produce each data batch
individually. If it is not an iterator, the single data batch is yielded
directly, preserving the original behavior.

---------

Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
## Description

This pr support return a generator object from map_groups UDF. if UDF
have a large output , we return iterator to reduce memory cost.

## Related issues

Close ray-project#57935

## Additional information

This change centers on the `_apply_udf_to_groups` helper function within
the file ray/data/grouped_data.py.

`map_groups` internally calls map_batches, providing a wrapper function
(wrapped_fn) that in turn calls `_apply_udf_to_groups` to apply the
user's UDF to each group.

The key modification is that instead of directly yielding the UDF's
return value, the logic now inspects the result first. If the result is
an Iterator, it is consumed with `yield from` to produce each data batch
individually. If it is not an iterator, the single data batch is yielded
directly, preserving the original behavior.

---------

Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
## Description

This pr support return a generator object from map_groups UDF. if UDF
have a large output , we return iterator to reduce memory cost.

## Related issues

Close ray-project#57935

## Additional information

This change centers on the `_apply_udf_to_groups` helper function within
the file ray/data/grouped_data.py.

`map_groups` internally calls map_batches, providing a wrapper function
(wrapped_fn) that in turn calls `_apply_udf_to_groups` to apply the
user's UDF to each group.

The key modification is that instead of directly yielding the UDF's
return value, the logic now inspects the result first. If the result is
an Iterator, it is consumed with `yield from` to produce each data batch
individually. If it is not an iterator, the single data batch is yielded
directly, preserving the original behavior.

---------

Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
## Description

This pr support return a generator object from map_groups UDF. if UDF
have a large output , we return iterator to reduce memory cost.

## Related issues

Close ray-project#57935

## Additional information

This change centers on the `_apply_udf_to_groups` helper function within
the file ray/data/grouped_data.py.

`map_groups` internally calls map_batches, providing a wrapper function
(wrapped_fn) that in turn calls `_apply_udf_to_groups` to apply the
user's UDF to each group.

The key modification is that instead of directly yielding the UDF's
return value, the logic now inspects the result first. If the result is
an Iterator, it is consumed with `yield from` to produce each data batch
individually. If it is not an iterator, the single data batch is yielded
directly, preserving the original behavior.

---------

Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
## Description

This pr support return a generator object from map_groups UDF. if UDF
have a large output , we return iterator to reduce memory cost.

## Related issues

Close ray-project#57935

## Additional information

This change centers on the `_apply_udf_to_groups` helper function within
the file ray/data/grouped_data.py.

`map_groups` internally calls map_batches, providing a wrapper function
(wrapped_fn) that in turn calls `_apply_udf_to_groups` to apply the
user's UDF to each group.

The key modification is that instead of directly yielding the UDF's
return value, the logic now inspects the result first. If the result is
an Iterator, it is consumed with `yield from` to produce each data batch
individually. If it is not an iterator, the single data batch is yielded
directly, preserving the original behavior.

---------

Signed-off-by: my-vegetable-has-exploded <wy1109468038@gmail.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-contribution Contributed by the community data Ray Data-related issues go add ONLY when ready to merge, run all tests

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[data] returning iterators from map_groups

5 participants