@poorbarcode
Contributor

Motivation

See PIP-434.

Modifications

  • Use ServerCnxThrottleTracker instead of modifying channel.readable directly.
  • Changes to ServerCnxThrottleTracker:
    • Merge multiple tracker states into one.
    • Keep a separate counter for each throttling type, so that they do not affect each other and troubleshooting is easier.
    • Instead of updating each tracker state with CAS, perform all state changes on the same thread (pulsar-io-thread).
    • Add a new test to confirm that the throttled state is lifted after the rate limiter period elapses, even if the producer has been closed.
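
The tracker changes listed above can be illustrated with a minimal sketch. It is not the actual ServerCnxThrottleTracker code: the class name, the enum constants, and the `Consumer<Boolean>` standing in for the channel's auto-read switch are all assumptions made for illustration. It keeps one counter per throttling type plus a single merged "throttled" state, and relies on being called from a single thread rather than on CAS.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical throttle types; the real ThrottleType enum in Pulsar may differ.
enum SketchThrottleType { TOPIC_PUBLISH_RATE, BROKER_PUBLISH_RATE, PENDING_PUBLISH_REQUESTS }

// Deliberately not thread-safe: every call is assumed to come from one I/O thread.
final class ThrottleTrackerSketch {
    // One counter per throttling type, so the types cannot interfere with each other.
    private final Map<SketchThrottleType, Integer> counters = new EnumMap<>(SketchThrottleType.class);
    // Assumed stand-in for toggling reads on the connection (e.g. an auto-read switch).
    private final Consumer<Boolean> autoReadSetter;
    // Single merged state: the number of throttle marks currently active across all types.
    private int activeTotal;

    ThrottleTrackerSketch(Consumer<Boolean> autoReadSetter) {
        this.autoReadSetter = autoReadSetter;
    }

    void markThrottled(SketchThrottleType type) {
        counters.merge(type, 1, Integer::sum);
        if (++activeTotal == 1) {
            autoReadSetter.accept(false); // first active mark: pause reading
        }
    }

    void unmarkThrottled(SketchThrottleType type) {
        int current = counters.getOrDefault(type, 0);
        if (current == 0) {
            return; // ignore an unbalanced unmark instead of going negative
        }
        counters.put(type, current - 1);
        if (--activeTotal == 0) {
            autoReadSetter.accept(true); // last mark released: resume reading
        }
    }

    int count(SketchThrottleType type) {
        return counters.getOrDefault(type, 0);
    }

    boolean isThrottled() {
        return activeTotal > 0;
    }
}
```

In this sketch, reading resumes only once every type's counter is back to zero, and `count(type)` shows which throttling type is still holding the connection paused.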

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode added this to the 4.2.0 milestone Sep 29, 2025
@poorbarcode poorbarcode self-assigned this Sep 29, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Sep 29, 2025
Member

@lhotari lhotari left a comment

Good work @poorbarcode, this is a great improvement to ServerCnxThrottleTracker. A few review comments follow.

* @see ThrottleType
*/
public void markThrottled(ThrottleType type) {
ThrottleRes res = doMarkThrottled(type);
Member

At the beginning of the method, could we add a check here to ensure that this is executed within the event loop of the connection?

Something like:

assert serverCnx.ctx().executor().inEventLoop() : "This method should be called in serverCnx.ctx().executor()";

Another possibility would be to always accept the call and, if it is not made on the event loop, reschedule it onto the event loop:

if (!serverCnx.ctx().executor().inEventLoop()) {
   // Re-dispatch onto the connection's event loop, then stop processing on this thread.
   serverCnx.ctx().executor().execute(() -> markThrottled(type));
   return;
}
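
The re-dispatch idea can also be tried out without Netty. The following is a minimal sketch under stated assumptions: a plain single-thread `ExecutorService` stands in for the connection's event loop, and the class name, thread name, and `inEventLoop()` helper are hypothetical rather than Pulsar or Netty APIs. A call made off the owning thread is handed to that thread and the caller returns immediately, so the counter is only ever touched by one thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a connection whose state is confined to one I/O thread.
final class SingleThreadLoopSketch {
    private final AtomicReference<Thread> loopThread = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "sketch-io-thread");
        t.setDaemon(true);
        loopThread.set(t); // remember the owning thread for inEventLoop()
        return t;
    });
    private int throttleCount; // only read or written on the loop thread

    boolean inEventLoop() {
        return Thread.currentThread() == loopThread.get();
    }

    void markThrottled() {
        if (!inEventLoop()) {
            // Hand the call over to the owning thread and stop here.
            executor.execute(this::markThrottled);
            return;
        }
        throttleCount++;
    }

    // Reads the counter on the loop thread, after all previously queued calls have run.
    int awaitThrottleCount() {
        try {
            return executor.submit(() -> throttleCount).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("loop thread did not answer", e);
        }
    }

    void shutdown() {
        executor.shutdown();
    }
}
```

Compared with a hard assertion, this variant tolerates callers on other threads, at the cost of making the state change asynchronous for them.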

Contributor Author

Sure, I added the check to both markThrottled and unmarkThrottled.

Contributor

This assertion breaks the protocol handler feature in streamnative/kop#1589

The root cause is that the public incrementPublishCount method is called directly by the protocol handler but it's not guaranteed to be called in the ServerCnx's event loop thread. In KoP, the ServerCnx object is just a mock object that does not serve any connection.

For now, I have added a workaround like this:

            // TODO: Here we cannot leverage the `PersistentTopic#incrementPublishCount` method because it could call
            //   `markThrottled` and `unmarkThrottled` from `ServerCnxThrottleTracker`, which has a strong assertion on
            //   the thread that calls this method. See https://github.com/apache/pulsar/pull/24799 for details.
            try {
                final var topic = (AbstractTopic) producer.getTopic();
                final var bytesInCounter = AbstractTopic.class.getDeclaredField("bytesInCounter");
                bytesInCounter.setAccessible(true);
                ((LongAdder) bytesInCounter.get(topic)).add(readableBytes);
                final var msgInCounter = AbstractTopic.class.getDeclaredField("msgInCounter");
                msgInCounter.setAccessible(true);
                ((LongAdder) msgInCounter.get(topic)).add(numMessages);
            } catch (NoSuchFieldException | ClassCastException | IllegalAccessException e) {
                if (errorLoggedForReflection.compareAndSet(false, true)) {
                    log.error("Unable to update bytes and msg for topic", e);
                }
            }

This does mean, though, that the publish rate limiter will not be supported by the downstream protocol handler. /cc @tjiuming @Demogorgon314

Contributor

@BewareMyPower BewareMyPower Oct 20, 2025

For now, it's okay to keep this change: if calling setAutoRead outside the Netty thread does not work, the previous publish rate limiting feature in KoP might never have worked as expected.

@lhotari lhotari self-requested a review October 6, 2025 20:50
Member

@lhotari lhotari left a comment

Please check the review comments

@poorbarcode poorbarcode requested a review from lhotari October 9, 2025 03:27
Member

@lhotari lhotari left a comment

LGTM, great job Yubiao!

@poorbarcode
Contributor Author

/pulsarbot rerun-failure-checks

@poorbarcode poorbarcode force-pushed the improve/ServerCnxThrottleTracker branch from 2ffe240 to 3d2b4a3 on October 15, 2025 12:34
@poorbarcode
Contributor Author

Rebased onto master.

codecov-commenter commented Oct 15, 2025

Codecov Report

❌ Patch coverage is 85.10638% with 14 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.30%. Comparing base (a091ea7) to head (2221332).
⚠️ Report is 123 commits behind head on master.

| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ulsar/broker/service/ServerCnxThrottleTracker.java | 81.53% | 3 Missing and 9 partials ⚠️ |
| ...va/org/apache/pulsar/broker/service/ServerCnx.java | 75.00% | 2 Missing ⚠️ |

Additional details and impacted files

@@             Coverage Diff              @@
##             master   #24799      +/-   ##
============================================
+ Coverage     74.29%   74.30%   +0.01%     
+ Complexity    33825    33474     -351     
============================================
  Files          1913     1913              
  Lines        149281   149312      +31     
  Branches      17325    17330       +5     
============================================
+ Hits         110902   110949      +47     
+ Misses        29540    29517      -23     
- Partials       8839     8846       +7     

| Flag | Coverage Δ |
|---|---|
| inttests | 26.28% <21.27%> (-0.13%) ⬇️ |
| systests | 22.79% <30.85%> (-0.02%) ⬇️ |
| unittests | 73.84% <85.10%> (+0.03%) ⬆️ |

| Files with missing lines | Coverage Δ |
|---|---|
| ...ker/resourcegroup/ResourceGroupPublishLimiter.java | 88.88% <100.00%> (+1.93%) ⬆️ |
| ...rg/apache/pulsar/broker/service/AbstractTopic.java | 88.42% <100.00%> (+0.06%) ⬆️ |
| ...rg/apache/pulsar/broker/service/BrokerService.java | 83.62% <100.00%> (+0.31%) ⬆️ |
| ...ava/org/apache/pulsar/broker/service/Producer.java | 81.00% <ø> (-0.19%) ⬇️ |
| .../pulsar/broker/service/PublishRateLimiterImpl.java | 88.88% <100.00%> (+0.48%) ⬆️ |
| ...org/apache/pulsar/broker/service/TransportCnx.java | 50.00% <ø> (ø) |
| ...va/org/apache/pulsar/broker/service/ServerCnx.java | 72.48% <75.00%> (-0.10%) ⬇️ |
| ...ulsar/broker/service/ServerCnxThrottleTracker.java | 81.69% <81.53%> (+0.83%) ⬆️ |

... and 71 files with indirect coverage changes

@lhotari lhotari merged commit acad78c into apache:master Oct 15, 2025
96 of 98 checks passed
lhotari pushed a commit that referenced this pull request Oct 28, 2025
…stead of modifying channel.readable directly (#24799)

(cherry picked from commit acad78c)
lhotari pushed a commit that referenced this pull request Oct 28, 2025
…stead of modifying channel.readable directly (#24799)

(cherry picked from commit acad78c)
4 participants