[Data] Concurrency cap backpressure with tuning (Disabled)#59519
[Data] Concurrency cap backpressure with tuning (Disabled)#59519aslonnie merged 1 commit intoreleases/2.53.0from
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces several improvements and changes. The core change is the tuning of the concurrency cap backpressure policy by adjusting EWMA_ALPHA and K_DEV parameters to make it more sensitive. A constant was also renamed for better clarity. Additionally, there's a fix in the autoscaling coordinator to handle resource requests for resource types not present on a node, backed by a new regression test. The PR also includes widespread version updates across the codebase.
My review focuses on the logic changes. I've identified a potential issue in how downstream materializing operators are detected, which could lead to incorrect backpressure application in complex pipelines. I've also noted that renaming an environment variable is a breaking change and suggested a way to handle it gracefully. Overall, the changes are positive and improve the robustness of the backpressure mechanism.
| def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool: | ||
| """Check if the operator has a downstream materializing operator.""" | ||
| return any( | ||
| isinstance(next_op, MATERIALIZING_OPERATORS) | ||
| for next_op in op.output_dependencies | ||
| ) |
There was a problem hiding this comment.
The current implementation of has_materializing_downstream_op only checks for immediate downstream operators. This might be insufficient in a pipeline with multiple operators, for example map -> map -> sort. In this case, the first map operator would not detect the downstream sort (which is a materializing operator) and could have backpressure applied, potentially starving the rest of the pipeline. Consider traversing the entire downstream dependency graph to check for any materializing operators.
| def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool: | |
| """Check if the operator has a downstream materializing operator.""" | |
| return any( | |
| isinstance(next_op, MATERIALIZING_OPERATORS) | |
| for next_op in op.output_dependencies | |
| ) | |
| def has_materializing_downstream_op(self, op: PhysicalOperator) -> bool: | |
| """Check if the operator has a downstream materializing operator.""" | |
| queue = list(op.output_dependencies) | |
| visited = set(op.output_dependencies) | |
| while queue: | |
| curr_op = queue.pop(0) | |
| if isinstance(curr_op, MATERIALIZING_OPERATORS): | |
| return True | |
| for next_op in curr_op.output_dependencies: | |
| if next_op not in visited: | |
| visited.add(next_op) | |
| queue.append(next_op) | |
| return False |
| AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD = env_float( | ||
| "RAY_DATA_CONCURRENCY_CAP_AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD", 0.1 | ||
| ) |
There was a problem hiding this comment.
Renaming OBJECT_STORE_BUDGET_RATIO to AVAILABLE_OBJECT_STORE_BUDGET_THRESHOLD and its corresponding environment variable is a good clarification. However, this is a breaking change for users who might be setting the old environment variable (RAY_DATA_CONCURRENCY_CAP_OBJECT_STORE_BUDGET_RATIO). To ensure a smoother transition, consider supporting the old environment variable for a deprecation cycle. You could check for the old variable, issue a DeprecationWarning if it's used, and prioritize the new one if both are set.
8320c9f to
092f051
Compare
> Thank you for contributing to Ray! 🚀 > Please review the [Ray Contribution Guide](https://docs.ray.io/en/master/ray-contribute/getting-involved.html) before opening a pull request. >⚠️ Remove these instructions before submitting your PR. > 💡 Tip: Mark as draft if you want early feedback, or ready for review when it's complete. > Briefly describe what this PR accomplishes and why it's needed. **EWMA_ALPHA** Update EWMA_ALPHA from 0.2->0.1. This makes adjusting level to be more in-favor of limiting concurrency by being more sensitive to downstreaming queuing. **K_DEV** Update K_DEV from 2.0->1.0. This makes stddev to be more in-favor of limiting concurrency by being more sensitive to downstreaming queuing. > Link related issues: "Fixes #1234", "Closes #1234", or "Related to > Optional: Add implementation details, API changes, usage examples, screenshots, etc. --------- Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
092f051 to
8b4f97c
Compare
…ct#59519) EWMA_ALPHA Update EWMA_ALPHA from 0.2->0.1. This makes adjusting level to be more in-favor of limiting concurrency by being more sensitive to downstreaming queuing. K_DEV Update K_DEV from 2.0->1.0. This makes stddev to be more in-favor of limiting concurrency by being more sensitive to downstreaming queuing. cherry-pick of ray-project#59392
Description
[Data] Concurrency cap backpressure with tuning (Disabled)
EWMA_ALPHA
Update EWMA_ALPHA from 0.2->0.1. This makes adjusting level to be more in-favor of limiting concurrency by being more sensitive to downstreaming queuing.
K_DEV
Update K_DEV from 2.0->1.0. This makes stddev to be more in-favor of limiting concurrency by being more sensitive to downstreaming queuing.
cherry-pick of #59392
Related issues
Additional information