-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][broker] Part-2 of PIP-434: Use ServerCnxThrottleTracker, instead of modifying channel.readable directly #24799
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[improve][broker] Part-2 of PIP-434: Use ServerCnxThrottleTracker, instead of modifying channel.readable directly #24799
Conversation
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good work @poorbarcode, this is a great improvment to ServerCnxThrottleTracker. A few review comments.
| * @see ThrottleType | ||
| */ | ||
| public void markThrottled(ThrottleType type) { | ||
| ThrottleRes res = doMarkThrottled(type); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
At the beginning of the method, could we add a check here to ensure that this is executed within the event loop of the connection?
something like
assert serverCnx.ctx().executor().inEventLoop() : "This method should be called in serverCnx.ctx().executor()";
Another possibility would be to always accept the call and if it's not in the eventloop, then run the call to the event loop.
if (!serverCnx.ctx().executor().inEventLoop()) {
serverCnx.ctx().executor().execute(() -> markThrottled(type));
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure, added the check for both markThrottled and unmarkThrottled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This assertion breaks the protocol handler feature in streamnative/kop#1589
The root cause is that the public incrementPublishCount method is called directly by the protocol handler but it's not guaranteed to be called in the ServerCnx's event loop thread. In KoP, the ServerCnx object is just a mock object that does not serve any connection.
Currently, I made a workaround like:
// TODO: Here we cannot leverage the `PersistentTopic#incrementPublishCount` method because it could call
// `markThrottled` and `unmarkThrottled` from `ServerCnxThrottleTracker`, which has a strong assertion on
// the thread that calls this method. See https://github.com/apache/pulsar/pull/24799 for details.
try {
final var topic = (AbstractTopic) producer.getTopic();
final var bytesInCounter = AbstractTopic.class.getDeclaredField("bytesInCounter");
bytesInCounter.setAccessible(true);
((LongAdder) bytesInCounter.get(topic)).add(readableBytes);
final var msgInCounter = AbstractTopic.class.getDeclaredField("msgInCounter");
msgInCounter.setAccessible(true);
((LongAdder) msgInCounter.get(topic)).add(numMessages);
} catch (NoSuchFieldException | ClassCastException | IllegalAccessException e) {
if (errorLoggedForReflection.compareAndSet(false, true)) {
log.error("Unable to update bytes and msg for topic", e);
}
}Though the publish rate limiter will not be supported by the downstream protocol handler. /cc @tjiuming @Demogorgon314
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For now, it's okay to keep this change. Because if calling setAutoRead out of the Netty thread does not work, the previous publish rate limiting feature in KoP might actually never work as expected.
...broker/src/main/java/org/apache/pulsar/broker/resourcegroup/ResourceGroupPublishLimiter.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please check the review comments
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ServerCnxThrottleTracker.java
Outdated
Show resolved
Hide resolved
lhotari
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, great job Yubiao!
|
/pulsarbot rerun-failure-checks |
2ffe240 to
3d2b4a3
Compare
|
Rebase master |
Codecov Report❌ Patch coverage is Additional details and impacted files@@ Coverage Diff @@
## master #24799 +/- ##
============================================
+ Coverage 74.29% 74.30% +0.01%
+ Complexity 33825 33474 -351
============================================
Files 1913 1913
Lines 149281 149312 +31
Branches 17325 17330 +5
============================================
+ Hits 110902 110949 +47
+ Misses 29540 29517 -23
- Partials 8839 8846 +7
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
Motivation
See the PIP-434
Modifications
ServerCnxThrottleTracker, instead of modifyingchannel.readabledirectlyServerCnxThrottleTrackerpulsar-io-thread)Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: x