[Data] Enable expressions for grouped with_column in Ray Data #58231

Merged
bveeramani merged 12 commits into ray-project:master from YoussefEssDS:grouped-with-column on Nov 18, 2025

@YoussefEssDS
Contributor

Description

This PR will:
1- Introduce GroupedData.with_column, allowing grouped datasets to evaluate Ray Data expressions per group while preserving existing columns.
2- Validate the supplied expression type (reject non‑Expr and DownloadExpr since the expression evaluator can’t visit downloads as far as I understand) and reuse the projection engine so grouping flows stay aligned with the dataset-level expression API.
3- Add tests for grouped expression usage through udf-based and arithmetic expressions.
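
As a rough illustration of the semantics described in item 1 (not Ray's implementation), the pure-Python sketch below groups rows by a key, evaluates a per-row expression within each group, and appends the result as a new column while keeping the existing ones. The helper name `grouped_with_column` and the use of a plain callable in place of a Ray Data expression are assumptions made for this example.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Row = Dict[str, Any]


def grouped_with_column(
    rows: List[Row],
    key: str,
    column_name: str,
    expr: Callable[[Row], Any],
) -> List[Row]:
    """Hypothetical stand-in for GroupedData.with_column on a list of dict rows."""
    # Bucket rows by the grouping key, preserving first-seen group order.
    groups: Dict[Any, List[Row]] = defaultdict(list)
    for row in rows:
        groups[row[key]].append(row)

    out: List[Row] = []
    for group_rows in groups.values():
        for row in group_rows:
            # Build a new row so existing columns are kept and inputs stay untouched.
            out.append({**row, column_name: expr(row)})
    return out


rows = [
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
]
result = grouped_with_column(rows, "group", "value_twice", lambda r: r["value"] * 2)
```

The output has one row per input row, with the original `group` and `value` columns intact and `value_twice` added.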

Related issues

Closes #57907

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
@YoussefEssDS YoussefEssDS requested a review from a team as a code owner October 28, 2025 01:20
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces GroupedData.with_column, which allows applying expressions to grouped data. The implementation is clean, reusing existing components like map_groups and eval_projection. The added tests cover both UDF-based and arithmetic expressions, ensuring the new feature is well-tested. My feedback includes a minor suggestion to improve code clarity in the tests by removing a redundant dataset creation.

Comment on lines +152 to +159
ds = ray.data.from_items(
    [
        {"group": 1, "value": 1},
        {"group": 1, "value": 2},
        {"group": 2, "value": 3},
        {"group": 2, "value": 4},
    ]
)
Contributor

medium

To improve code clarity and avoid redundancy, you can remove this duplicate dataset creation. The ds variable is already defined with the same data earlier in the test and can be reused for the second assertion.

@ray-gardener ray-gardener bot added the data (Ray Data-related issues) and community-contribution (Contributed by the community) labels Oct 28, 2025
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
@YoussefEssDS
Contributor Author

Hi @alexeykudinkin do you have any further suggestions? Thanks!

@YoussefEssDS
Contributor Author

Hi @goutamvenkat-anyscale can you please give this a review? Thanks

    raise TypeError(
        "expr must be a Ray Data expression created via the expression API."
    )
if isinstance(expr, DownloadExpr):
Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale Oct 29, 2025

Thanks for the contribution!

Curious why this restriction is enforced?

I can imagine a case where we have ds.groupby('uri_prefix').with_column('bytes', download('uri')) (say we want separate handling per group).

Contributor Author

Thanks for the review, @goutamvenkat-anyscale! As far as I understand, in the current path, grouped with_column goes through eval_projection, which delegates to NativeExpressionEvaluator. That evaluator doesn’t support DownloadExpr (created by calling download("uri")), so allowing it right now just leads to a late TypeError. I surfaced the restriction here to make the failure clearer. wdyt?
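
To make the trade-off in this comment concrete, here is a minimal sketch of failing fast versus failing late. The classes `Expr`, `ColumnExpr`, and `DownloadExpr`, the toy `evaluate` function, and `with_column_checked` are hypothetical stand-ins written for this example; they are not Ray Data's actual classes or evaluator, and the error messages are illustrative.

```python
class Expr:
    """Hypothetical base class standing in for a Ray Data expression."""


class ColumnExpr(Expr):
    def __init__(self, name: str) -> None:
        self.name = name


class DownloadExpr(Expr):
    def __init__(self, uri_column: str) -> None:
        self.uri_column = uri_column


def evaluate(expr: Expr, row: dict):
    """Toy evaluator: fails late, only when it actually visits the expression."""
    if isinstance(expr, ColumnExpr):
        return row[expr.name]
    raise TypeError(f"{type(expr).__name__} evaluation is not yet implemented.")


def with_column_checked(rows: list, name: str, expr) -> list:
    """Fail fast: validate the expression before touching any data."""
    if not isinstance(expr, Expr):
        raise TypeError("expr must be an expression object.")
    if isinstance(expr, DownloadExpr):
        raise TypeError("DownloadExpr is not supported in grouped with_column.")
    return [{**row, name: evaluate(expr, row)} for row in rows]


ok = with_column_checked([{"a": 1}], "b", ColumnExpr("a"))

try:
    # Rejected up front, even though there are no rows to evaluate.
    with_column_checked([], "bytes", DownloadExpr("uri"))
    early_error = None
except TypeError as exc:
    early_error = str(exc)
```

With the up-front check, the unsupported expression is rejected before any data is processed; without it, the same mistake would only surface once the evaluator reaches the first row.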

Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale Oct 30, 2025

Ah we need to unify the expression machinery... Can you please paste the full TypeError here? Just curious whether the error is descriptive or not

Contributor Author

Sure! The TypeError in the expression evaluator is as follows:
"DownloadExpr evaluation is not yet implemented in NativeExpressionEvaluator."

Contributor Author

Do you think the expression evaluator error is descriptive enough (i.e., we can drop the restriction in grouped_data)? Or should we keep it for further clarity? wdyt @goutamvenkat-anyscale?

Contributor

Let's leave out this check for now. We're in the process of adding Actor support for UDFExpr, and DownloadExpr will be refactored to go through the eval_projection flow.

Contributor Author

Done! Can you please check if it's good to go? Thanks.

@YoussefEssDS YoussefEssDS force-pushed the grouped-with-column branch 2 times, most recently from 847771d to 6625d9f October 31, 2025 00:28
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
@YoussefEssDS
Contributor Author

@goutamvenkat-anyscale PTAL. Thanks!

Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale left a comment

LGTM. Thanks for your contribution

@goutamvenkat-anyscale goutamvenkat-anyscale added the go (add ONLY when ready to merge, run all tests) label Nov 3, 2025
@YoussefEssDS
Contributor Author

Hi @bveeramani, just bumping this. It's approved with the 'go' tag and ready to merge. Thanks!

Comment on lines +345 to +348
from ray.data._internal.planner.plan_expression.expression_evaluator import (
    eval_projection,
)

Member

Nit: Is this import prone to circular import errors? If not, move to top of file for consistency with PEP8/Google convention?

Contributor Author

Moving it to the top causes the doc build to fail, since the evaluator requires pyarrow. wdyt? Keep the import inside the function for a green doc build, or move it to the top?
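
For readers unfamiliar with the pattern under discussion, a function-local (deferred) import looks roughly like the sketch below. The standard-library `statistics` module is used purely as a stand-in for a heavier optional dependency such as pyarrow, and `mean_of` is a hypothetical function name.

```python
def mean_of(values):
    # Deferred import: resolved on the first call, not when this module is imported.
    # `statistics` is only a stand-in for an optional dependency such as pyarrow.
    import statistics

    return statistics.mean(values)


average = mean_of([1, 2, 3])
```

Importing the module that defines `mean_of` does not require the dependency; only calling the function does, which is why tools that merely import the module (such as a doc build) are unaffected.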

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
  fn: UserDefinedFunction[DataBatch, DataBatch],
  *,
- zero_copy_batch: bool = False,
+ zero_copy_batch: bool = True,

Bug: Breaking Change: Batches Become Read-Only

Changing the default value of zero_copy_batch from False to True in map_groups breaks existing code that mutates input batches. For example, the train_test_split method's add_train_flag function mutates the batch by adding a column, which fails with read-only errors when zero_copy_batch=True. This is a breaking API change that affects all existing map_groups calls that modify their input.
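
The failure mode described here can be sketched without Ray. In the example below, `types.MappingProxyType` is an assumed stand-in for a read-only, zero-copy batch, and the column name `is_train` is made up for illustration; only the function name `add_train_flag` and the fact that it adds a column come from the comment above.

```python
from types import MappingProxyType


def add_train_flag(batch):
    """Mutates its input in place by adding a column."""
    batch["is_train"] = [True] * len(batch["value"])
    return batch


def add_train_flag_safe(batch):
    """Copies first, so it also works when the input is read-only."""
    out = dict(batch)
    out["is_train"] = [True] * len(out["value"])
    return out


writable = {"value": [1, 2, 3]}
read_only = MappingProxyType({"value": [1, 2, 3]})  # stand-in for a zero-copy batch

add_train_flag(writable)  # fine: the batch is writable

try:
    add_train_flag(read_only)  # item assignment on a read-only mapping is rejected
    mutated = True
except TypeError:
    mutated = False

safe = add_train_flag_safe(read_only)
```

A UDF that copies before mutating works with either default, whereas one that mutates in place depends on receiving a writable batch.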

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Member

@bveeramani bveeramani left a comment

LGTM with some minor comments.

For the doc failure -- if moving the import inside the function resolves the failure, I think that's okay to unblock this PR

  memory=memory,
  concurrency=concurrency,
- udf_modifying_row_count=True,
+ udf_modifying_row_count=False,
Member

Map groups can change the row count, right?
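
A small pure-Python sketch of why the answer is yes in general: a per-group function may return any number of rows. The `map_groups` helper here is a toy model written for this example, not Ray Data's `map_groups`.

```python
from itertools import groupby
from operator import itemgetter


def map_groups(rows, key, fn):
    """Toy map_groups: apply fn to each group's rows and concatenate the outputs."""
    ordered = sorted(rows, key=itemgetter(key))
    out = []
    for _, group in groupby(ordered, key=itemgetter(key)):
        out.extend(fn(list(group)))
    return out


rows = [
    {"group": 1, "value": 1},
    {"group": 1, "value": 2},
    {"group": 2, "value": 3},
    {"group": 2, "value": 4},
]

# A with_column-style function emits exactly one output row per input row.
doubled = map_groups(
    rows, "group", lambda g: [{**r, "value_twice": r["value"] * 2} for r in g]
)

# An aggregating function collapses each group to one row, changing the row count.
summed = map_groups(
    rows,
    "group",
    lambda g: [{"group": g[0]["group"], "total": sum(r["value"] for r in g)}],
)
```

Only the with_column-style function is guaranteed to preserve the row count; an arbitrary per-group function is not.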

Comment on lines +330 to +331
>>> ds.groupby("group").with_column("value_twice", col("value") * 2).sort(["group", "value"]).take_all() # doctest: +SKIP
[{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}]
Member

What happens if you test this (i.e., remove # doctest: +SKIP)?

Member

Ideally, I think we should avoid skipping these to prevent them from breaking

Comment on lines +329 to +331
>>> ds = ray.data.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}])
>>> ds.groupby("group").with_column("value_twice", col("value") * 2).sort(["group", "value"]).take_all() # doctest: +SKIP
[{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}]
Member

Could you format this snippet? I think it might be hard to read in the rendered documentation

Suggested change
- >>> ds = ray.data.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}])
- >>> ds.groupby("group").with_column("value_twice", col("value") * 2).sort(["group", "value"]).take_all() # doctest: +SKIP
- [{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}]
+ >>> ds = (
+ ...     ray.data.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}])
+ ...     .groupby("group")
+ ...     .with_column("value_twice", col("value") * 2)
+ ...     .sort(["group", "value"])
+ ... )
+ >>> ds.take_all() # doctest: +SKIP
+ [{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}]

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
@YoussefEssDS
Contributor Author

Hi @bveeramani any idea how to unblock this CI error:
Error response from daemon: client version 1.52 is too new. Maximum supported API version is 1.43

@bveeramani
Member

> Hi @bveeramani any idea how to unblock this CI error: Error response from daemon: client version 1.52 is too new. Maximum supported API version is 1.43

Huh, that's weird. Just updated the branch. Let's see if that fixes it

@YoussefEssDS
Copy link
Copy Markdown
Contributor Author

Seems that did the trick. Thanks @bveeramani

@bveeramani bveeramani merged commit 303d366 into ray-project:master Nov 18, 2025
6 checks passed
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
…oject#58231)

### Description
This PR will:
1- Introduce `GroupedData.with_column`, allowing grouped datasets to
evaluate Ray Data expressions per group while preserving existing
columns.
2- Validate the supplied expression type (reject non‑Expr and
DownloadExpr since the expression evaluator can’t visit downloads as far
as I understand) and reuse the projection engine so grouping flows stay
aligned with the dataset-level expression API.
3- Add tests for grouped expression usage through udf-based and
arithmetic expressions.

### Related issues
Closes ray-project#57907

---------

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
…oject#58231)

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
…oject#58231)

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
…oject#58231)

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
…oject#58231)

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>
Labels

community-contribution (Contributed by the community), data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

Development

Successfully merging this pull request may close these issues.

[Ray Data] Support expressions on GroupedData.map_groups

3 participants