
[Data] - Add support for callable classes for UDFExpr #56725

Merged
bveeramani merged 33 commits into ray-project:master from
goutamvenkat-anyscale:goutam/callable_class_udf
Dec 30, 2025

Conversation

@goutamvenkat-anyscale
Contributor

@goutamvenkat-anyscale goutamvenkat-anyscale commented Sep 18, 2025

Why are these changes needed?

With this change, the @udf decorator can be applied to callable classes, which allows expressions to be used in conjunction with callable classes.
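As a rough illustration of the idea (not Ray's implementation), a decorator can accept either a function or a callable class and return something that builds an expression node when called. Everything below is hypothetical and dependency-free: `toy_udf`, `ToyUDFExpr`, and `evaluate` are made-up names, and the real `@udf` decorator takes arguments and produces a `UDFExpr` that this sketch does not attempt to reproduce.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Tuple


@dataclass(frozen=True)
class ToyUDFExpr:
    """Hypothetical stand-in for an expression node; not Ray's UDFExpr."""

    fn: Callable[..., Any]       # a plain function, or the class to construct later
    ctor_args: Tuple[Any, ...]   # constructor args (empty for plain functions)
    args: Tuple[Any, ...]        # call-time arguments, here just column names


def toy_udf(target):
    """Wrap a function or a callable class so that calling it builds an expression."""
    if inspect.isclass(target):

        class _Wrapped:
            def __init__(self, *ctor_args):
                # Record the constructor args only; `target` is not built here.
                self._ctor_args = ctor_args

            def __call__(self, *args):
                return ToyUDFExpr(target, self._ctor_args, args)

        return _Wrapped

    def _wrapped(*args):
        return ToyUDFExpr(target, (), args)

    return _wrapped


@toy_udf
class AddOffset:
    def __init__(self, offset):
        self.offset = offset

    def __call__(self, x):
        return x + self.offset


def evaluate(expr: ToyUDFExpr, row: dict) -> Any:
    """Resolve column names against a row, build the UDF if needed, and call it."""
    values = [row[name] for name in expr.args]
    fn = expr.fn(*expr.ctor_args) if inspect.isclass(expr.fn) else expr.fn
    return fn(*values)


expr = AddOffset(5)("x")           # builds an expression; nothing is computed yet
result = evaluate(expr, {"x": 10})
```

In this toy version the instance is rebuilt on every `evaluate` call; the PR instead keeps instances alive in an actor context so that state persists across batches.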

Related issue number

Closes #56529

Checks

  • I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • I've run scripts/format.sh to lint the changes in this PR.
  • I've included any doc changes needed for https://docs.ray.io/en/master/.
    • I've added any new APIs to the API Reference. For example, if I added a
      method in Tune, I've added it in doc/source/tune/api/ under the
      corresponding .rst file.
  • I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • Unit tests
    • Release tests
    • This PR is not tested :(

Signed-off-by: Goutam V. <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale requested a review from a team as a code owner September 18, 2025 23:05
Contributor

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request adds support for using callable classes as User-Defined Functions (UDFs) with the @udf decorator in Ray Data expressions. This is a great enhancement as it allows stateful UDFs to be used with expressions, for example in conjunction with actors. The changes primarily involve modifying the udf decorator to wrap callable classes with logic to handle expression-based calls. The implementation is solid and includes good documentation updates and a comprehensive test case. I have a couple of suggestions to improve the robustness and maintainability of the implementation.

Signed-off-by: Goutam V. <goutam@anyscale.com>
@goutamvenkat-anyscale goutamvenkat-anyscale added the data (Ray Data-related issues) and go (add ONLY when ready to merge, run all tests) labels Sep 18, 2025
Contributor

@omatthew98 omatthew98 left a comment

Seems super reasonable, thanks for getting this up so quickly. OOC have you tested this in a multinode cluster? It seems like this should work, but we should verify that the instances are getting instantiated in the correct places (I think that is part of why we have this code here).

@goutamvenkat-anyscale
Contributor Author

Seems super reasonable, thanks for getting this up so quickly. OOC have you tested this in a multinode cluster? It seems like this should work, but we should verify that the instances are getting instantiated in the correct places (I think that is part of why we have this code here).

I haven't. I'll test it out. If bugs arise I'll tackle that as a separate PR, if that's alright

@omatthew98
Contributor

I haven't. I'll test it out. If bugs arise I'll tackle that as a separate PR, if that's alright

Yeah I think that makes sense to just keep iterating on this. Might be good to add a warning or TODO that we need to add support / do more testing for multinode cases? Will defer to you what to do.

Signed-off-by: Goutam V. <goutam@anyscale.com>
@github-actions

github-actions bot commented Oct 8, 2025

This pull request has been automatically marked as stale because it has not had
any activity for 14 days. It will be closed in another 14 days if no further activity occurs.
Thank you for your contributions.

You can always ask for help on our discussion forum or Ray's public slack channel.

If you'd like to keep this open, just leave any comment, and the stale label will be removed.

@github-actions github-actions bot added the stale label ("The issue is stale. It will be closed within 7 days unless there are further conversation") Oct 8, 2025
@github-actions

This pull request has been automatically closed because there has been no more activity in the 14 days
since being marked stale.

Please feel free to reopen or open a new pull request if you'd still like this to be addressed.

Again, you can always ask for help on our discussion forum or Ray's public slack channel.

Thanks again for your contribution!

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
cursor[bot]

This comment was marked as outdated.

@github-actions github-actions bot added the unstale label ("A PR that has been marked unstale. It will not get marked stale again if this label is on it.") and removed the stale label ("The issue is stale. It will be closed within 7 days unless there are further conversation") Oct 25, 2025
@goutamvenkat-anyscale goutamvenkat-anyscale removed the unstale label ("A PR that has been marked unstale. It will not get marked stale again if this label is on it.") Oct 29, 2025
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
cursor[bot]

This comment was marked as outdated.

Signed-off-by: Goutam <goutam@anyscale.com>
result = expr.fn(*args, **kwargs)

# Try to call from actor context (handles both sync and async UDFs)
result = call_udf_from_actor_context(expr.fn_constructor_class, args, kwargs)
Contributor

is expr.fn_constructor_class here because of the scheduling on worker nodes?

Contributor Author

Yes we need to evaluate the UDF expression here (which happens on the worker)

def __call__(self, *call_args, **call_kwargs):
    # Build a lazy callable that instantiates on the worker
    # the first time it's invoked during expression evaluation.
    def _lazy_impl(*args, **kwargs):
Contributor

Why is the lazy instantiation needed here? I feel like it should be unified with how we normally create actors, right? Will this work with autoscaling actor pools?

Contributor Author

So we don't want to do the instantiation on the driver (it could be an expensive model that's being loaded for example). The lazy_impl is for executing with task pool strategy.
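The concern described above (keeping an expensive constructor off the driver) can be sketched with a small wrapper that defers construction until the first call. This is only an illustration of the pattern under assumed names (`LazyCallable`, `ExpensiveModel`); it is not the `_lazy_impl` code from the diff, which the thread goes on to say could be dropped.

```python
from typing import Any, Callable, List, Optional


class LazyCallable:
    """Hypothetical helper: builds the wrapped class on first call, then reuses it."""

    def __init__(self, cls: type, *ctor_args: Any, **ctor_kwargs: Any) -> None:
        self._cls = cls
        self._ctor_args = ctor_args
        self._ctor_kwargs = ctor_kwargs
        self._instance: Optional[Callable[..., Any]] = None

    def __call__(self, *args: Any, **kwargs: Any) -> Any:
        if self._instance is None:
            # First invocation: pay the construction cost here, not at wrap time.
            self._instance = self._cls(*self._ctor_args, **self._ctor_kwargs)
        return self._instance(*args, **kwargs)


construction_log: List[str] = []


class ExpensiveModel:
    def __init__(self, scale: int) -> None:
        construction_log.append("constructed")  # stands in for loading model weights
        self.scale = scale

    def __call__(self, x: int) -> int:
        return x * self.scale


lazy = LazyCallable(ExpensiveModel, scale=3)
constructed_before_call = len(construction_log)  # wrapping alone builds nothing
outputs = [lazy(1), lazy(2)]
constructed_after_calls = len(construction_log)  # built once, then reused
```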

Contributor

can you document this? I had the same question.

Contributor Author

Actually I can get rid of this. Apparently in get_compute_strategy, task pool strategy in combination with actors is forbidden

return inspect.iscoroutinefunction(fn) or inspect.isasyncgenfunction(fn)


def _call_udf_instance_with_async_bridge(
Contributor

what happened to _generate_transform_fn_for_async_map, and why is this needed?

Contributor Author

It's used by the new expression evaluation system via call_udf_from_actor_context. When expressions contain callable class UDFs (like col("x").apply(MyAsyncUDF)), the expression evaluator needs to:

  1. Look up the UDF instance from actor context
  2. Call it with the evaluated arguments
  3. Handle async/sync bridging
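
The three steps above can be sketched in a few lines of plain Python. This is a simplified stand-in, not the PR's `_call_udf_instance_with_async_bridge`: the registry `_UDF_INSTANCES` and the function `lookup_and_call` are hypothetical, and `asyncio.run` is used here only because it is the simplest way to drive a coroutine from synchronous code. How Ray actually schedules async UDFs is not shown in this thread.

```python
import asyncio
import inspect
from typing import Any, Callable, Dict

# Hypothetical registry standing in for the UDF instances held by an actor.
_UDF_INSTANCES: Dict[str, Callable[..., Any]] = {}


def lookup_and_call(key: str, *args: Any) -> Any:
    instance = _UDF_INSTANCES[key]    # 1. look up the UDF instance
    result = instance(*args)          # 2. call it with the evaluated arguments
    if inspect.iscoroutine(result):   # 3. bridge async results back to sync code
        return asyncio.run(result)
    return result


class SyncDouble:
    def __call__(self, x: int) -> int:
        return 2 * x


class AsyncDouble:
    async def __call__(self, x: int) -> int:
        await asyncio.sleep(0)  # placeholder for real asynchronous work
        return 2 * x


_UDF_INSTANCES["sync"] = SyncDouble()
_UDF_INSTANCES["async"] = AsyncDouble()

sync_result = lookup_and_call("sync", 21)
async_result = lookup_and_call("async", 21)
```

Note that `asyncio.run` raises if an event loop is already running in the calling thread, so this sketch only applies to a purely synchronous caller.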

Contributor Author

visit_udf() → call_udf_from_actor_context() → _call_udf_instance_with_async_bridge()

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
@goutamvenkat-anyscale
Contributor Author

/gemini summary

@gemini-code-assist
Contributor

Summary of Changes

This pull request significantly enhances Ray Data's expression capabilities by integrating callable classes as UDFs. This change allows users to define stateful UDFs that can maintain context across data batches, which is crucial for many machine learning and data processing workflows. The system intelligently manages the execution strategy, automatically opting for actor-based computation when callable class UDFs are detected, while also providing options for manual override. The underlying UDF execution framework has been streamlined to support these new capabilities, ensuring robust and consistent behavior for both synchronous and asynchronous UDFs.

Highlights

  • Callable Class UDF Support: The pull request introduces comprehensive support for using callable classes as User-Defined Functions (UDFs) within Ray Data expressions, enabling stateful operations across data batches.
  • Automatic Compute Strategy Detection: Ray Data now automatically detects if an expression contains a callable class UDF and, if so, defaults to an ActorPoolStrategy for execution, ensuring proper state management. Otherwise, it uses TaskPoolStrategy.
  • Refactored UDF Instantiation and Calling: The internal logic for UDF instantiation and calling has been refactored to unify the handling of both functions and callable classes (including async UDFs) within an actor context, improving consistency and reusability.
  • Enhanced with_column API: The with_column API has been updated to allow explicit specification of the compute strategy, providing users with more control over UDF execution.
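
The automatic strategy detection described in the highlights amounts to walking the expression tree and checking whether any UDF node wraps a class. A minimal sketch follows, with made-up node types (`Col`, `UDFNode`) and plain strings in place of Ray's strategy objects. It is not the PR's `_CallableClassUDFCollector` or `_detect_and_get_compute_strategy`.

```python
import inspect
from dataclasses import dataclass, field
from typing import Any, Callable, List, Optional, Union


@dataclass
class Col:
    """Hypothetical column reference node."""

    name: str


@dataclass
class UDFNode:
    """Hypothetical UDF node; `fn` is a plain function or a callable class."""

    fn: Callable[..., Any]
    args: List["Node"] = field(default_factory=list)


Node = Union[Col, UDFNode]


def contains_callable_class(node: Node) -> bool:
    """Return True if any UDF in the tree wraps a class rather than a function."""
    if isinstance(node, UDFNode):
        if inspect.isclass(node.fn):
            return True
        return any(contains_callable_class(arg) for arg in node.args)
    return False


def pick_strategy(node: Node, explicit: Optional[str] = None) -> str:
    """An explicit choice wins; otherwise stateful UDFs imply actors."""
    if explicit is not None:
        return explicit
    return "actors" if contains_callable_class(node) else "tasks"


def upper(s: str) -> str:
    return s.upper()


class Tokenizer:
    def __call__(self, s: str) -> List[str]:
        return s.split()


stateless = UDFNode(upper, [Col("text")])
stateful = UDFNode(upper, [UDFNode(Tokenizer, [Col("text")])])

strategies = (
    pick_strategy(stateless),
    pick_strategy(stateful),
    pick_strategy(stateless, explicit="actors"),
)
```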

Changelog
  • python/ray/data/_internal/logical/operators/map_operator.py
    • Added _detect_and_get_compute_strategy to automatically select ActorPoolStrategy for callable class UDFs.
    • Modified Project operator to use the detected compute strategy.
  • python/ray/data/_internal/logical/rules/projection_pushdown.py
    • Introduced checks for resource and compute strategy compatibility before fusing Project operators.
    • Ensured fused_compute is passed to the new Project operator during fusion.
  • python/ray/data/_internal/planner/plan_expression/expression_evaluator.py
    • Updated visit_udf to use call_udf_from_actor_context for evaluating UDF expressions.
    • Introduced NOT_IN_ACTOR_CONTEXT sentinel to differentiate between a UDF returning None and not being in an actor context.
  • python/ray/data/_internal/planner/plan_expression/expression_visitors.py
    • Added _CallableClassUDFCollector to identify callable class UDFs in expression trees.
    • Modified UDFExpr creation to include callable_class_spec.
  • python/ray/data/_internal/planner/plan_udf_map_op.py
    • Introduced UDFSpec dataclass for callable class UDF specifications.
    • Refactored _MapActorContext to manage multiple UDF instances via a dictionary.
    • Added _collect_udf_specs_from_expressions for gathering unique callable class UDF specifications.
    • Implemented create_actor_context_init_fn for shared actor context initialization.
    • Updated _get_udf to use the new UDFSpec and create_actor_context_init_fn.
    • Added _call_udf_instance_with_async_bridge for unified async/sync UDF calling.
    • Clarified behavior for async generator UDFs in expressions (only the last yielded value is returned).
  • python/ray/data/dataset.py
    • Added compute parameter to the with_column method.
    • Updated docstrings and examples for with_column to reflect callable class UDF support and automatic actor semantics.
  • python/ray/data/expressions.py
    • Introduced _CallableClassSpec dataclass for callable class UDF metadata.
    • Added callable_class_spec attribute to UDFExpr.
    • Modified udf decorator to handle both functions and callable classes, using an ExpressionAwareCallableClass wrapper for classes.
    • Updated _create_udf_callable to accept callable_class_spec.
    • Enhanced structurally_equals for UDFExpr to compare callable_class_spec.
  • python/ray/data/tests/test_map.py
    • Updated mock_actor_async_ctx fixture to align with new _MapActorContext signature and ensure proper cleanup.
  • python/ray/data/tests/test_with_column.py
    • Added extensive unit tests for with_column with callable class UDFs, covering various scenarios including stateful, constructor args, multiple UDFs, and async UDFs.
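
One changelog item notes that an async generator UDF used in an expression contributes only its last yielded value. A small sketch of that behavior, assuming a hypothetical helper `last_yielded` (this is not code from the PR):

```python
import asyncio
from typing import Any, AsyncIterator, List


async def last_yielded(agen: AsyncIterator[Any]) -> Any:
    """Drain an async generator and return only its final value."""
    missing = object()
    last = missing
    async for value in agen:
        last = value
    if last is missing:
        raise ValueError("async generator yielded no values")
    return last


async def running_total(values: List[int]) -> AsyncIterator[int]:
    total = 0
    for v in values:
        total += v
        yield total  # yields 1, 3, 6 for [1, 2, 3]


final = asyncio.run(last_yielded(running_total([1, 2, 3])))
```

Intermediate values (1 and 3 here) are discarded, which is why a UDF that is meant to produce one output per input is usually better written as a regular coroutine.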
Activity
  • The author goutamvenkat-anyscale initially discussed testing the changes in a multinode cluster and proposed a separate PR for any arising bugs.
  • The pull request was marked stale and subsequently closed by github-actions[bot] due to inactivity.
  • Several cursor[bot] comments highlighted bugs related to the udf decorator's class detection, async generator UDFs returning only the first value, and issues with UDF deduplication logic leading to incorrect sharing of instances or constructor arguments.
  • iamjustinhsu inquired about fusion behavior with mixed UDF types and internal planning, which was addressed by goutamvenkat-anyscale with detailed explanations and test cases.
  • Discussions also covered lazy instantiation, the structure of _MapActorContext, and the use of compute versus concurrency parameters.
  • A recent cursor[bot] comment identified a sentinel value collision where None was used for both 'not in actor context' and legitimate None returns from UDFs, causing confusing errors.
  • The author goutamvenkat-anyscale has been actively responding to comments and addressing feedback, including fixing the _CallableClassUDFCollector duplication and clarifying async generator behavior.
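
The sentinel collision mentioned in the activity list is a general Python pitfall: if `None` means "no actor context", a UDF that legitimately returns `None` becomes indistinguishable from a missing context. A dedicated sentinel object avoids this. The sketch below reuses the name `NOT_IN_ACTOR_CONTEXT` from the changelog, but `call_from_context` and `evaluate` are hypothetical helpers, not Ray functions.

```python
from typing import Any, Callable, Dict, Optional

# A unique object that no UDF can return by accident.
NOT_IN_ACTOR_CONTEXT = object()


def call_from_context(
    context: Optional[Dict[str, Callable[..., Any]]], key: str, *args: Any
) -> Any:
    """Call the registered UDF, or signal that no context is available."""
    if context is None or key not in context:
        return NOT_IN_ACTOR_CONTEXT
    return context[key](*args)


def evaluate(
    context: Optional[Dict[str, Callable[..., Any]]],
    key: str,
    fallback: Callable[..., Any],
    *args: Any,
) -> Any:
    result = call_from_context(context, key, *args)
    if result is NOT_IN_ACTOR_CONTEXT:  # identity check, not equality
        return fallback(*args)
    return result  # may legitimately be None


def returns_none(x: Any) -> None:
    return None


in_context = evaluate({"f": returns_none}, "f", lambda x: "fallback", 1)
out_of_context = evaluate(None, "f", lambda x: "fallback", 1)
```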

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
@bveeramani bveeramani merged commit 2fab8e8 into ray-project:master Dec 30, 2025
6 checks passed
AYou0207 pushed a commit to AYou0207/ray that referenced this pull request Jan 13, 2026
)

## Why are these changes needed?

With this change the `@udf` decorator can be added to callable classes.
Allows for expressions to be used in conjunction with callable classes.

## Related issue number

Closes ray-project#56529

## Checks

- [x] I've signed off every commit(by using the -s flag, i.e., `git
commit -s`) in this PR.
- [x] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for
https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I
added a
method in Tune, I've added it in `doc/source/tune/api/` under the
           corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a
few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
   - [x] Unit tests
   - [ ] Release tests
   - [ ] This PR is not tested :(

---------

Signed-off-by: Goutam V. <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: jasonwrwang <jasonwrwang@tencent.com>
zzchun pushed a commit to antgroup/ant-ray that referenced this pull request Jan 27, 2026
Signed-off-by: Goutam V. <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
zzchun pushed a commit to antgroup/ant-ray that referenced this pull request Jan 28, 2026
Signed-off-by: Goutam V. <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
lee1258561 pushed a commit to pinterest/ray that referenced this pull request Feb 3, 2026
Signed-off-by: Goutam V. <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
zzchun pushed a commit to antgroup/ant-ray that referenced this pull request Feb 5, 2026
Signed-off-by: Goutam V. <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Signed-off-by: Goutam V. <goutam@anyscale.com>
Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Ray Data] with_column API support Actor

7 participants