[Data] ConcurrencyCapBackpressurePolicy - Only increase threshold #58023

Merged
alexeykudinkin merged 12 commits into master from srinathk10/concurrency_cap_backpressure_policy_threshold_up_only on Oct 23, 2025

@srinathk10 srinathk10 commented Oct 22, 2025

Description

[Data] ConcurrencyCapBackpressurePolicy - Only increase threshold

When _update_queue_threshold adjusts the queue threshold used to cap concurrency based on the currently queued bytes:

  • Only allow the threshold to increase or stay the same.
  • Never decrease the threshold, because the steady state of queued bytes is not known.
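
The rule in the two bullets above amounts to a one-way ratchet. A minimal sketch, assuming a hypothetical `ratchet_threshold` helper (this is not the Ray Data implementation; the real `_update_queue_threshold` also maintains the statistics that produce the candidate value):

```python
def ratchet_threshold(current: float, candidate: float) -> float:
    """Return the new threshold: raise it to `candidate` or keep `current`.

    Hypothetical helper for illustration; the name is not from Ray Data.
    """
    # Never move down: the steady-state level of queued bytes is unknown,
    # so a lower candidate is ignored rather than trusted.
    return max(current, candidate)


threshold = 100.0
for candidate in [200.0, 150.0, 250.0]:
    threshold = ratchet_threshold(threshold, candidate)
    print(threshold)  # prints 200.0, then 200.0, then 250.0
```

The second candidate (150.0) is lower than the current threshold, so the threshold stays at 200.0.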

…e buildup

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
@srinathk10 srinathk10 requested a review from a team as a code owner October 22, 2025 22:25

@gemini-code-assist gemini-code-assist bot left a comment

Code Review

This pull request modifies the ConcurrencyCapBackpressurePolicy to only allow the backpressure threshold to increase, never decrease. This simplifies the policy by removing the logic for downward smoothing and idle resets. The changes in the core policy file are clear and correctly implement this new behavior. The associated tests have been updated to reflect this change, with the old downward smoothing test refactored to verify that the threshold does not decrease, and a new test added to explicitly check for threshold increases. My review identifies some misleading comments in the test files that should be corrected for clarity and maintainability.

- # Test downward adjustment (should be smoothed)
- # threshold = max(10 + 4*1, 150) = 150, which is < 200, so should be smoothed
+ # Test that threshold is maintained when calculated threshold is lower
+ # threshold = max(10 + 4*1, 150) = 150, which is < 200, so should be maintained

medium

The calculation in this comment is a bit misleading. It seems to use the initial _q_level_nbytes (10.0) and _q_level_dev (1.0) to calculate the threshold, but _update_queue_threshold updates these values using EWMA based on the new sample (q=150) before calculating the base threshold. This means the actual calculated threshold is different from 150. While the test's assertion is correct, the comment could be confusing for future maintenance. For better clarity, consider removing the specific calculation and just stating the expected behavior.
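
To see why the hand calculation is off, here is a rough sketch of the ordering this comment describes: the level and deviation are first updated from the new sample, and only then is the base threshold computed. The smoothing factor `ALPHA`, the deviation update rule, and the `max(level + K * dev, q)` form (inferred from the test comments) are assumptions for illustration, not the actual Ray Data code.

```python
ALPHA = 0.25  # assumed EWMA smoothing factor (hypothetical value)
K = 4.0       # deviation multiplier, as it appears in the test comments


def ewma_threshold(level: float, dev: float, q: float) -> tuple[float, float, float]:
    """Fold sample q into level/dev, then compute the base threshold."""
    # The EWMA update happens BEFORE the threshold is computed.
    new_level = (1 - ALPHA) * level + ALPHA * q
    new_dev = (1 - ALPHA) * dev + ALPHA * abs(q - new_level)
    threshold = max(new_level + K * new_dev, q)
    return new_level, new_dev, threshold


# Calculation from the stale initial values, as written in the test comment.
naive = max(10.0 + K * 1.0, 150.0)

# Calculation after folding the sample q=150 into level and dev.
level, dev, updated = ewma_threshold(level=10.0, dev=1.0, q=150.0)

print(naive, updated)  # prints 150.0 153.0
```

Under these assumed parameters the updated value is still below 200, so the assertion holds even though the formula in the comment does not match.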

policy._queue_history[mock_op] = deque([50, 60, 70, 80, 90, 100])

# Test that threshold is increased when calculated threshold is higher
# threshold = max(50 + 4*20, 200) = 200, which is > 100, so should be updated

medium

Similar to a previous comment, the calculation threshold = max(50 + 4*20, 200) = 200 in this comment doesn't account for the EWMA updates of level and dev that happen inside _update_queue_threshold. The actual calculated threshold will be different due to these updates. To avoid confusion, it would be clearer to either provide the full, accurate calculation (which might be too complex for a comment) or simplify the comment to describe the behavior without showing a potentially incorrect formula.

- # threshold = max(10 + 4*1, 150) = 150, which is < 200, so should be smoothed
- self.assertLess(threshold3, 200)
- self.assertGreaterEqual(threshold3, 150)
+ # threshold = max(10 + 4*1, 150) = 150, which is < 200, so should maintain 200

medium

The calculation threshold = max(10 + 4*1, 150) = 150 in this comment is misleading as it doesn't account for the EWMA updates for level and dev that occur within _update_queue_threshold. The actual calculated threshold is different. While the test's assertion is correct, this discrepancy can make the test logic harder to follow. Please consider updating the comment to be more accurate or simplifying it to just describe the expected outcome.
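
One way to follow this suggestion is to have the test comments state only the expected outcome. The sketch below uses a hypothetical `FakePolicy` stand-in with an assumed up-only `update` method; it is not the Ray Data class or its test suite.

```python
import unittest


class FakePolicy:
    """Hypothetical stand-in for the policy; not the Ray Data class."""

    def __init__(self, threshold: float) -> None:
        self.threshold = threshold

    def update(self, candidate: float) -> float:
        # Up-only rule: keep the larger of the current and candidate values.
        self.threshold = max(self.threshold, candidate)
        return self.threshold


class TestThresholdUpOnly(unittest.TestCase):
    def test_lower_candidate_keeps_threshold(self):
        policy = FakePolicy(threshold=200.0)
        # A lower calculated threshold must not reduce the current one.
        self.assertEqual(policy.update(150.0), 200.0)

    def test_higher_candidate_raises_threshold(self):
        policy = FakePolicy(threshold=100.0)
        # A higher calculated threshold must raise the current one.
        self.assertGreater(policy.update(200.0), 100.0)
```

Because the comments describe behavior rather than intermediate arithmetic, they stay accurate if the smoothing details change.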

Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
srinathk10 added the "go" label (add ONLY when ready to merge, run all tests) on Oct 22, 2025
@srinathk10

Training Benchmark results

Baseline vs After

+-------------------------------------------------------+----------+--------+--------+
| Benchmark                                             | Baseline | After  | Δ (%)  |
+-------------------------------------------------------+----------+--------+--------+
| skip_training.jpeg.local_fs_multi_gpus                | 2649     | 2636   | -0.49% |
| skip_training.jpeg.local_fs_multi_gpus.preserve_order | 2600     | 2621   | +0.81% |
| skip_training.jpeg.local_fs                           | 2040     | 2041   | +0.05% |
| skip_training.jpeg.local_fs.preserve_order            | 2025     | 2016   | -0.44% |
| skip_training.jpeg                                    | 3297     | 3106   | -5.80% |
| skip_training.jpeg.preserve_order                     | 1641     | 1559   | -5.00% |
| skip_training.parquet                                 | 11645    | 11151  | -4.24% |
| skip_training.parquet.preserve_order                  | 9831     | 10736  | +9.19% |
+-------------------------------------------------------+----------+--------+--------+
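
The Δ (%) column appears to be the relative change from Baseline to After. That is an assumption, since the comment states neither the formula nor the unit of the measurements. A minimal sketch with a hypothetical `percent_change` helper:

```python
def percent_change(baseline: float, after: float) -> float:
    """Relative change from baseline to after, in percent."""
    return (after - baseline) / baseline * 100.0


# First row of the table: 2649 -> 2636.
print(f"{percent_change(2649, 2636):+.2f}%")  # prints -0.49%
```

Values recomputed this way from the rounded figures shown may differ from the table in the last digit.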

Base automatically changed from srinathk10/concurrency_cap_backpressure_policy to master October 23, 2025 00:12
…licy_threshold_up_only

Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
ray-gardener bot added the "data" label (Ray Data-related issues) on Oct 23, 2025
@alexeykudinkin alexeykudinkin merged commit 4ba962c into master Oct 23, 2025
6 checks passed
@alexeykudinkin alexeykudinkin deleted the srinathk10/concurrency_cap_backpressure_policy_threshold_up_only branch October 23, 2025 02:27
xinyuangui2 pushed a commit to xinyuangui2/ray that referenced this pull request Oct 27, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: xgui <xgui@anyscale.com>
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Aydin Abiar <aydin@anyscale.com>
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: Future-Outlier <eric901201@gmail.com>
Blaze-DSP pushed a commit to Blaze-DSP/ray that referenced this pull request Dec 18, 2025
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026
Signed-off-by: Srinath Krishnamachari <srinath.krishnamachari@anyscale.com>
Signed-off-by: Srinath Krishnamachari <68668616+srinathk10@users.noreply.github.com>
Signed-off-by: peterxcli <peterxcli@gmail.com>