-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[pulsar-broker] Cache unack-messageId into openRangeSet #3819
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
rerun java8 tests |
4 similar comments
|
rerun java8 tests |
|
rerun java8 tests |
|
rerun java8 tests |
|
rerun java8 tests |
|
run java8 tests |
| import org.apache.bookkeeper.mledger.ManagedLedgerInfo.CursorInfo; | ||
| import org.apache.bookkeeper.mledger.ManagedLedgerInfo.MessageRangeInfo; | ||
| import org.apache.bookkeeper.test.MockedBookKeeperTestCase; | ||
| import org.assertj.core.util.Lists; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be java.utils
| import org.apache.pulsar.common.configuration.FieldContext; | ||
| import org.apache.pulsar.common.configuration.PulsarConfiguration; | ||
| import org.apache.pulsar.common.policies.data.BacklogQuota; | ||
| import org.apache.pulsar.common.util.collections.ConcurrentOpenLongPairRangeSet; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Import shouldn't be needed here
| protected long ledgerId; | ||
| protected long entryId; | ||
|
|
||
| public static PositionImpl earliest = new PositionImpl(-1, -1); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you also make earliest/latest as final here? (Just happened to realize they were not already...)
managed-ledger/src/main/java/org/apache/bookkeeper/mledger/ManagedLedgerConfig.java
Outdated
Show resolved
Hide resolved
| long getNumIndividualDeletedEntriesToSkip(long numEntries) { | ||
| long totalEntriesToSkip = 0; | ||
| long deletedMessages = 0; | ||
| AtomicLong totalEntriesToSkip = new AtomicLong(0); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of the 2 AtomicLong and 2 AtomicReference instances, we could use thread locals here, since it's just to be able to update them from within the lambda.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure.
| this.name = cursorName; | ||
| this.individualDeletedMessages = config.isUnackedRangesOpenCacheSetEnabled() | ||
| ? new ConcurrentOpenLongPairRangeSet<PositionImpl>(4096, positionRangeConverter) | ||
| : new LongPairRangeSet.DefaultRangeSet<>(positionRangeConverter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are both cases being tested?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes..I have tested both.. however, let me data-providers to test both scenarios in existing test cases.
| assertEquals(cursorInfo.markDelete.entryId, -1); | ||
|
|
||
| assertEquals(cursorInfo.individualDeletedMessages.size(), 1); | ||
| assertEquals(cursorInfo.individualDeletedMessages.size(), 2); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this due to a change in behavior?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, ConcurrentOpenLongPairRangeSet has separate BitSet for every ledger and while creating ranges it doesn't merge range from different ledgers (doesn't merge last entry of previous ledger with first entry of next ledger) so, instead one combined ledger-range it returns multiple ranges when ledgers are different.
|
rerun integration tests |
|
@merlimat addressed your comments, added data-provider in existing ManagedCursor tests to test both usecases and also added separate test which test ack-holes and markDelete with both usecases. |
|
rerun cpp tests |
1 similar comment
|
rerun cpp tests |
### Motivation Some parameters are added in the `broker.conf` and `standalone.conf` files. However, those parameters are not updated in the docs. See the following PRs for details: #4150, #4066, #4197, #3819, #4261, #4273, #4320. ### Modifications Add those parameter info, and sync docs with the code. Does not update the description quite much, there are two reasons for this: 1. Keep doc content consistent with code. We need to update the description for those parameters in the code first, and then sync them in docs. 2. Will adopt a generator to generate those content automatically in the near future.
### Motivation Some parameters are added in the `broker.conf` and `standalone.conf` files. However, those parameters are not updated in the docs. See the following PRs for details: apache#4150, apache#4066, apache#4197, apache#3819, apache#4261, apache#4273, apache#4320. ### Modifications Add those parameter info, and sync docs with the code. Does not update the description quite much, there are two reasons for this: 1. Keep doc content consistent with code. We need to update the description for those parameters in the code first, and then sync them in docs. 2. Will adopt a generator to generate those content automatically in the near future.
Motivation
This PR is on top of #3818. With this PR changes, ML can use OpenRangeSet to cache unack-messageIds. It also provides option
managedLedgerUnackedRangesOpenCacheSetEnabledto switch back to existing RangeSet so, user can flip back to original behavior to avoid any rollback. We will removemanagedLedgerUnackedRangesOpenCacheSetEnabledoption in future release once it will not be experimental.Result
Broker will not face gc-pauses in case client generates large number of unack messages into broker.
I performed perf and functional test on the changes:
PositionImplobjectsPositionImplobjects into memory