-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[improve][broker] PIP-379: Key_Shared Draining Hashes for Improved Message Ordering #23352
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
923f379 to
48b2022
Compare
1855f33 to
a6d9bf3
Compare
codelipenghui
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The change looks good.
Just left a few minor comments.
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedCursor.java
Outdated
Show resolved
Hide resolved
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
Outdated
Show resolved
Hide resolved
...c/main/java/org/apache/pulsar/broker/service/ConsistentHashingStickyKeyConsumerSelector.java
Show resolved
Hide resolved
- it's used for quick profiling
nicoloboschi
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, nice work
.../apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
Show resolved
Hide resolved
.../apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
Show resolved
Hide resolved
.../apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
Show resolved
Hide resolved
| // deduplication for readMoreEntriesAsync calls | ||
| if (readMoreEntriesAsyncRequested.compareAndSet(false, true)) { | ||
| topic.getBrokerService().executor().execute(() -> { | ||
| readMoreEntriesAsyncRequested.set(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi, Lari, sorry to bother you, But when I read the source code, I'm confused about this:
- It uses an atomic flag
readMoreEntriesAsyncRequestedto "block" duplicate requests - But resets the flag right when the task starts, not after completion
- Since
topic.getBrokerService().executor()is a multi-threaded pool (has multiple core threads):
This lets new requests jump in immediately - Multiple readMoreEntries() can run concurrently
could you explain how this work? I would be so much appreciate!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@3pacccccc The intention isn't to prevent concurrent execution. The short comment "deduplication for readMoreEntriesAsync calls" explains the intention. In Pulsar code, readMoreEntries and readMoreEntriesAsync is called in multiple locations to ensure that all entries that were made available in a way or another would get dispatched. That's why the deduplication makes sense since it will continue to provide that guarantee while reducing the extra overhead of unnecessary concurrent or subsequent calls to readMoreEntries.
Since in most (or all) dispatcher implementations the method "readMoreEntries" itself is a synchronous method, it will serialize the execution eventually. I agree that this isn't an optimal solution how this is handled in dispatchers, but changing it wasn't the intention when the deduplication was added in this PR.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lhotari OK, I get it, thank you so much!
Fixes #23307
Fixes #21199
Fixes #15705
Fixes #21656
Fixes #20899
Fixes #20885
Implementation for PIP-379: Key_Shared Draining Hashes for Improved Message Ordering
Mailing list discussion about this PR
4.0.x docs for the implementation: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#preserving-order-of-message-delivery-by-key
Motivation
See PIP-379: Key_Shared Draining Hashes for Improved Message Ordering
PIP-379 aims to address several issues with the current implementation and introduce a more efficient mechanism for managing message ordering.
Problem:
The current Key_Shared implementation faces challenges including:
PIP-379 introduces a "draining hashes" concept to efficiently manage
message ordering by tracking affected hashes when consumer assignments
change.
Benefits:
This proposal would replace the existing "recently joined consumers"
mechanism, addressing its limitations while providing a more robust
solution.
Modifications
See PIP-379: Key_Shared Draining Hashes for Improved Message Ordering for high level design with simplified code examples.
Documentation
docdoc-requireddoc-not-neededdoc-complete