[Data] Enable expressions for grouped with_column in Ray Data#58231
[Data] Enable expressions for grouped with_column in Ray Data#58231bveeramani merged 12 commits intoray-project:masterfrom
Conversation
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
There was a problem hiding this comment.
Code Review
This pull request introduces GroupedData.with_column, which allows applying expressions to grouped data. The implementation is clean, reusing existing components like map_groups and eval_projection. The added tests cover both UDF-based and arithmetic expressions, ensuring the new feature is well-tested. My feedback includes a minor suggestion to improve code clarity in the tests by removing a redundant dataset creation.
| ds = ray.data.from_items( | ||
| [ | ||
| {"group": 1, "value": 1}, | ||
| {"group": 1, "value": 2}, | ||
| {"group": 2, "value": 3}, | ||
| {"group": 2, "value": 4}, | ||
| ] | ||
| ) |
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
|
Hi @alexeykudinkin do you have any further suggestions? Thanks! |
|
Hi @goutamvenkat-anyscale can you please give this a review? Thanks |
| raise TypeError( | ||
| "expr must be a Ray Data expression created via the expression API." | ||
| ) | ||
| if isinstance(expr, DownloadExpr): |
There was a problem hiding this comment.
Thanks for the contribution!
Curious why enforce this restriction?
I can imagine a case where we have ds.groupby('uri_prefix').with_column('bytes', download('uri')).. (Say we want to handle separate handling per group)
There was a problem hiding this comment.
Thanks for the review, @goutamvenkat-anyscale! As far as I understand, in the current path, grouped with_column goes through eval_projection, which delegates to NativeExpressionEvaluator. That evaluator doesn’t support DownloadExpr (from calling download("uri")), so allowing it right now just leads to a late TypeError. I surfaced the restriction here to make things more clear. wdyt?
There was a problem hiding this comment.
Ah we need to unify the expression machinery... Can you please paste the full TypeError here? Just curious whether the error is descriptive or not
There was a problem hiding this comment.
Sure! The TypeError in the expression evaluator is as follows:
"DownloadExpr evaluation is not yet implemented in NativeExpressionEvaluator."
There was a problem hiding this comment.
Do you think the expression evaluator error is descriptive enough (i.e., we can drop the restriction in grouped_data)? Or we keep it for further clarity? wdyt @goutamvenkat-anyscale ?
There was a problem hiding this comment.
Let's leave out this check for now. We're in the process of adding Actor support for UDFExpr and DownloadExpr will be refactored to through the eval_projection flow.
There was a problem hiding this comment.
Done! Can you please check if it's good to go? Thanks.
847771d to
6625d9f
Compare
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
6625d9f to
ea4a08f
Compare
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
|
@goutamvenkat-anyscale PTAL. Thanks! |
goutamvenkat-anyscale
left a comment
There was a problem hiding this comment.
LGTM. Thanks for your contribution
|
Hi @bveeramani, just bumping this. It's approved with the 'go' tag and ready to merge. Thanks! |
| from ray.data._internal.planner.plan_expression.expression_evaluator import ( | ||
| eval_projection, | ||
| ) | ||
|
|
There was a problem hiding this comment.
Nit: Is this import prone to circular import errors? If not, move to top of file for consistency with PEP8/Google convention?
There was a problem hiding this comment.
It causes the doc build to fail, since the evaluator requires pyarrow. wdyt? Keep the import inside for a green doc build or move it to top?
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
| fn: UserDefinedFunction[DataBatch, DataBatch], | ||
| *, | ||
| zero_copy_batch: bool = False, | ||
| zero_copy_batch: bool = True, |
There was a problem hiding this comment.
Bug: Breaking Change: Batches Become Read-Only
Changing the default value of zero_copy_batch from False to True in map_groups breaks existing code that mutates input batches. For example, the train_test_split method's add_train_flag function mutates the batch by adding a column, which fails with read-only errors when zero_copy_batch=True. This is a breaking API change that affects all existing map_groups calls that modify their input.
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
bveeramani
left a comment
There was a problem hiding this comment.
LGTM with some minor comments.
For the doc failure -- if moving the import inside the function resolves the failure, I think that's okay to unblock this PR
| memory=memory, | ||
| concurrency=concurrency, | ||
| udf_modifying_row_count=True, | ||
| udf_modifying_row_count=False, |
There was a problem hiding this comment.
Map groups can change the row count, right?
python/ray/data/grouped_data.py
Outdated
| >>> ds.groupby("group").with_column("value_twice", col("value") * 2).sort(["group", "value"]).take_all() # doctest: +SKIP | ||
| [{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}] |
There was a problem hiding this comment.
What happens if you test this (i.e., remove # docetest +SKIP)?
There was a problem hiding this comment.
Ideally, I think we should avoid skipping these to prevent them from breaking
python/ray/data/grouped_data.py
Outdated
| >>> ds = ray.data.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}]) | ||
| >>> ds.groupby("group").with_column("value_twice", col("value") * 2).sort(["group", "value"]).take_all() # doctest: +SKIP | ||
| [{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}] |
There was a problem hiding this comment.
Could you format this snippet? I think it might be hard to read in the rendered documentation
| >>> ds = ray.data.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}]) | |
| >>> ds.groupby("group").with_column("value_twice", col("value") * 2).sort(["group", "value"]).take_all() # doctest: +SKIP | |
| [{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}] | |
| >>> ds = ( | |
| ... ray.data.from_items([{"group": 1, "value": 1}, {"group": 1, "value": 2}]) | |
| ... .groupby("group") | |
| ... .with_column("value_twice", col("value") * 2) | |
| ... .sort(["group", "value"]) | |
| ... ) | |
| >>> ds.take_all() # doctest: +SKIP | |
| [{'group': 1, 'value': 1, 'value_twice': 2}, {'group': 1, 'value': 2, 'value_twice': 4}] |
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
bd7225e to
2ab94f2
Compare
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
|
Hi @bveeramani any idea how to unblock this CI error: |
Huh, that's weird. Just updated the branch. Let's see if that fixes it |
|
Seems that did the trick. Thanks @bveeramani |
…oject#58231) ### Description This PR will: 1- Introduce `GroupedData.with_column`, allowing grouped datasets to evaluate Ray Data expressions per group while preserving existing columns. 2- Validate the supplied expression type (reject non‑Expr and DownloadExpr since the expression evaluator can’t visit downloads as far as I understand) and reuse the projection engine so grouping flows stay aligned with the dataset-level expression API. 3- Add tests for grouped expression usage through udf-based and arithmetic expressions. ### Related issues Closes ray-project#57907 --------- Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com> Co-authored-by: Balaji Veeramani <balaji@anyscale.com> Signed-off-by: Aydin Abiar <aydin@anyscale.com>
…oject#58231) ### Description This PR will: 1- Introduce `GroupedData.with_column`, allowing grouped datasets to evaluate Ray Data expressions per group while preserving existing columns. 2- Validate the supplied expression type (reject non‑Expr and DownloadExpr since the expression evaluator can’t visit downloads as far as I understand) and reuse the projection engine so grouping flows stay aligned with the dataset-level expression API. 3- Add tests for grouped expression usage through udf-based and arithmetic expressions. ### Related issues Closes ray-project#57907 --------- Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com> Co-authored-by: Balaji Veeramani <balaji@anyscale.com> Signed-off-by: YK <1811651+ykdojo@users.noreply.github.com>
…oject#58231) ### Description This PR will: 1- Introduce `GroupedData.with_column`, allowing grouped datasets to evaluate Ray Data expressions per group while preserving existing columns. 2- Validate the supplied expression type (reject non‑Expr and DownloadExpr since the expression evaluator can’t visit downloads as far as I understand) and reuse the projection engine so grouping flows stay aligned with the dataset-level expression API. 3- Add tests for grouped expression usage through udf-based and arithmetic expressions. ### Related issues Closes ray-project#57907 --------- Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com> Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
…oject#58231) ### Description This PR will: 1- Introduce `GroupedData.with_column`, allowing grouped datasets to evaluate Ray Data expressions per group while preserving existing columns. 2- Validate the supplied expression type (reject non‑Expr and DownloadExpr since the expression evaluator can’t visit downloads as far as I understand) and reuse the projection engine so grouping flows stay aligned with the dataset-level expression API. 3- Add tests for grouped expression usage through udf-based and arithmetic expressions. ### Related issues Closes ray-project#57907 --------- Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com> Co-authored-by: Balaji Veeramani <balaji@anyscale.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
…oject#58231) ### Description This PR will: 1- Introduce `GroupedData.with_column`, allowing grouped datasets to evaluate Ray Data expressions per group while preserving existing columns. 2- Validate the supplied expression type (reject non‑Expr and DownloadExpr since the expression evaluator can’t visit downloads as far as I understand) and reuse the projection engine so grouping flows stay aligned with the dataset-level expression API. 3- Add tests for grouped expression usage through udf-based and arithmetic expressions. ### Related issues Closes ray-project#57907 --------- Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com> Co-authored-by: Balaji Veeramani <balaji@anyscale.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
Description
This PR will:
1- Introduce
GroupedData.with_column, allowing grouped datasets to evaluate Ray Data expressions per group while preserving existing columns.2- Validate the supplied expression type (reject non‑Expr and DownloadExpr since the expression evaluator can’t visit downloads as far as I understand) and reuse the projection engine so grouping flows stay aligned with the dataset-level expression API.
3- Add tests for grouped expression usage through udf-based and arithmetic expressions.
Related issues
Closes #57907