[Data] Concurrency cap backpressure with tuning (Disabled) #59519

Merged
aslonnie merged 1 commit into releases/2.53.0 from srinathk10/concurrency_cap_off_2.53.0
Dec 17, 2025

@srinathk10 srinathk10 commented Dec 17, 2025

Description

[Data] Concurrency cap backpressure with tuning (Disabled)

EWMA_ALPHA
Lower EWMA_ALPHA from 0.2 to 0.1. This biases the level adjustment toward limiting concurrency by making it more sensitive to downstream queuing.

K_DEV
Lower K_DEV from 2.0 to 1.0. This biases the standard-deviation term toward limiting concurrency by making it more sensitive to downstream queuing.
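
To illustrate how the two parameters could interact, here is a minimal sketch. It is not the policy's actual code: it assumes the policy keeps an exponentially weighted moving average (EWMA) of downstream queue size plus an EWMA of the absolute deviation, and throttles once the observed queue exceeds `level + K_DEV * dev`. The `QueueStats` class and the `run` helper are hypothetical names introduced only for this example.

```python
from dataclasses import dataclass


@dataclass
class QueueStats:
    """Hypothetical EWMA tracker; an assumption, not Ray Data's implementation."""

    alpha: float  # smoothing factor (EWMA_ALPHA in this PR)
    k_dev: float  # deviation multiplier (K_DEV in this PR)
    level: float = 0.0  # smoothed queue size
    dev: float = 0.0  # smoothed absolute deviation from the level

    def update(self, queue_size: float) -> None:
        # Update the deviation first so it is measured against the previous level.
        self.dev = (1 - self.alpha) * self.dev + self.alpha * abs(queue_size - self.level)
        self.level = (1 - self.alpha) * self.level + self.alpha * queue_size

    def upper_bound(self) -> float:
        # Assumed rule: a queue larger than this bound triggers throttling.
        return self.level + self.k_dev * self.dev


def run(alpha: float, k_dev: float, samples: list) -> QueueStats:
    """Feed a sequence of observed queue sizes into a fresh tracker."""
    stats = QueueStats(alpha=alpha, k_dev=k_dev)
    for sample in samples:
        stats.update(sample)
    return stats


# A steady queue followed by a burst.
samples = [10, 10, 10, 10, 40]
old = run(alpha=0.2, k_dev=2.0, samples=samples)
new = run(alpha=0.1, k_dev=1.0, samples=samples)
print(f"old bound: {old.upper_bound():.2f}, new bound: {new.upper_bound():.2f}")
```

Under these assumptions the new settings produce a lower bound for the same queue history, so a given queue size crosses it sooner and concurrency is limited earlier.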

cherry-pick of #59392

@srinathk10 srinathk10 changed the base branch from master to releases/2.53.0 December 17, 2025 22:16

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request introduces several improvements and changes. The core change is the tuning of the concurrency cap backpressure policy by adjusting EWMA_ALPHA and K_DEV parameters to make it more sensitive. A constant was also renamed for better clarity. Additionally, there's a fix in the autoscaling coordinator to handle resource requests for resource types not present on a node, backed by a new regression test. The PR also includes widespread version updates across the codebase.

My review focuses on the logic changes. I've identified a potential issue in how downstream materializing operators are detected, which could lead to incorrect backpressure application in complex pipelines. I've also noted that renaming an environment variable is a breaking change and suggested a way to handle it gracefully. Overall, the changes are positive and improve the robustness of the backpressure mechanism.

Comment on lines +405 to +410
def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool:
    """Check if the operator has a downstream materializing operator."""
    return any(
        isinstance(next_op, MATERIALIZING_OPERATORS)
        for next_op in op.output_dependencies
    )

high

The current implementation of has_materializing_downstream_op only checks for immediate downstream operators. This might be insufficient in a pipeline with multiple operators, for example map -> map -> sort. In this case, the first map operator would not detect the downstream sort (which is a materializing operator) and could have backpressure applied, potentially starving the rest of the pipeline. Consider traversing the entire downstream dependency graph to check for any materializing operators.

Suggested change
- def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool:
-     """Check if the operator has a downstream materializing operator."""
-     return any(
-         isinstance(next_op, MATERIALIZING_OPERATORS)
-         for next_op in op.output_dependencies
-     )
+ def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool:
+     """Check if the operator has a downstream materializing operator."""
+     queue = list(op.output_dependencies)
+     visited = set(op.output_dependencies)
+     while queue:
+         curr_op = queue.pop(0)
+         if isinstance(curr_op, MATERIALIZING_OPERATORS):
+             return True
+         for next_op in curr_op.output_dependencies:
+             if next_op not in visited:
+                 visited.add(next_op)
+                 queue.append(next_op)
+     return False
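
The difference between the immediate-neighbor check and a full traversal can be reproduced outside Ray with stand-in classes. The sketch below is an illustration only: `Op`, `SortOp`, `MATERIALIZING`, and the `then` helper are hypothetical and do not correspond to Ray Data's operator classes. It builds the map -> map -> sort pipeline from the comment above and uses `collections.deque` for the breadth-first queue.

```python
from collections import deque


class Op:
    """Stand-in for a physical operator (hypothetical, not Ray's class)."""

    def __init__(self, name):
        self.name = name
        self.output_dependencies = []

    def then(self, other):
        """Connect `other` downstream of this operator and return it."""
        self.output_dependencies.append(other)
        return other


class SortOp(Op):
    """Stand-in for a materializing operator."""


MATERIALIZING = (SortOp,)


def has_materializing_downstream(op):
    """Breadth-first search over every operator reachable downstream of `op`."""
    queue = deque(op.output_dependencies)
    visited = {id(dep) for dep in op.output_dependencies}
    while queue:
        curr = queue.popleft()
        if isinstance(curr, MATERIALIZING):
            return True
        for nxt in curr.output_dependencies:
            if id(nxt) not in visited:
                visited.add(id(nxt))
                queue.append(nxt)
    return False


# Build map1 -> map2 -> sort.
map1, map2, sort = Op("map1"), Op("map2"), SortOp("sort")
map1.then(map2).then(sort)

# The immediate-neighbor check misses the sort two hops away from map1.
immediate_only = any(isinstance(n, MATERIALIZING) for n in map1.output_dependencies)
print(immediate_only, has_materializing_downstream(map1))
```

Using `deque.popleft()` avoids the linear cost of `list.pop(0)`, although for pipelines with a handful of operators either form behaves the same.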

Comment on lines +54 to 56
AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD = env_float(
    "RAY_DATA_CONCURRENCY_CAP_AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD", 0.1
)

medium

Renaming OBJECT_STORE_BUDGET_RATIO to AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD and its corresponding environment variable is a good clarification. However, this is a breaking change for users who might be setting the old environment variable (RAY_DATA_CONCURRENCY_CAP_OBJECT_STORE_BUDGET_RATIO). To ensure a smoother transition, consider supporting the old environment variable for a deprecation cycle. You could check for the old variable, issue a DeprecationWarning if it's used, and prioritize the new one if both are set.
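
One possible shape for such a fallback is sketched below. The helper `env_float_with_alias` and its `environ` parameter are hypothetical and introduced only for this example; they are not part of Ray. Only the two environment variable names come from this PR. The sketch prefers the new variable, falls back to the old one with a `DeprecationWarning`, and otherwise returns the default.

```python
import os
import warnings

NEW_VAR = "RAY_DATA_CONCURRENCY_CAP_AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD"
OLD_VAR = "RAY_DATA_CONCURRENCY_CAP_OBJECT_STORE_BUDGET_RATIO"


def env_float_with_alias(new_name, old_name, default, environ=None):
    """Read a float setting, honoring a deprecated variable name.

    Hypothetical helper: `environ` defaults to os.environ and is
    injectable so the lookup can be exercised without touching the
    real process environment.
    """
    environ = os.environ if environ is None else environ
    if new_name in environ:
        # The new name wins when both are set.
        return float(environ[new_name])
    if old_name in environ:
        warnings.warn(
            f"{old_name} is deprecated; use {new_name} instead.",
            DeprecationWarning,
            stacklevel=2,
        )
        return float(environ[old_name])
    return default


# Usage: both names set, so the new one takes priority.
threshold = env_float_with_alias(
    NEW_VAR, OLD_VAR, 0.1, environ={NEW_VAR: "0.3", OLD_VAR: "0.5"}
)
print(threshold)
```

Note that Python hides `DeprecationWarning` by default outside of `__main__` and test runners, so a project adopting this pattern might choose a more visible warning category or a log message.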

@srinathk10 srinathk10 force-pushed the srinathk10/concurrency_cap_off_2.53.0 branch from 8320c9f to 092f051 Compare December 17, 2025 22:31
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 force-pushed the srinathk10/concurrency_cap_off_2.53.0 branch from 092f051 to 8b4f97c Compare December 17, 2025 22:37
@aslonnie aslonnie merged commit 0de2118 into releases/2.53.0 Dec 17, 2025
5 of 6 checks passed
@aslonnie aslonnie deleted the srinathk10/concurrency_cap_off_2.53.0 branch December 17, 2025 23:48
weiquanlee pushed a commit to antgroup/ant-ray that referenced this pull request Jan 5, 2026
…ct#59519)