Skip to content

Conversation

@rdhabalia
Copy link
Contributor

Motivation

Pulsar-broker caches individually deleted message-ids into the RangeSet which stores large number of messageId objects into main memory for longer time. Sometimes some of the clients don't behave properly and generate large number of unack messages and triggers frequent redelivery which continuously allocates large number of messageId objects and it creates very high GC pressure and broker ends up into high gc-pauses which is not acceptable.

To solve this problem, broker should cache messageId into OpenRangeSet to avoid object allocation of messageIds.

Modification

This PR introduces ConcurrentOpenLongPairRangeSet that stores ranges without allocating objects.

Note:

This PR contains data-structure change for RangeSet and I will create separate PR to use ConcurrentOpenLongPairRangeSet into managed-ledger.

@rdhabalia rdhabalia added this to the 2.4.0 milestone Mar 14, 2019
@rdhabalia rdhabalia self-assigned this Mar 14, 2019
@rdhabalia rdhabalia changed the title [pulsar-common] add open Concurrent Open-LongPair RangeSet [pulsar-common] add open Concurrent LongPair RangeSet Mar 14, 2019
@rdhabalia rdhabalia requested a review from massakam March 14, 2019 07:23
@rdhabalia rdhabalia modified the milestones: 2.4.0, 2.3.1 Mar 14, 2019
@rdhabalia
Copy link
Contributor Author

rerun java8 tests

@merlimat merlimat modified the milestones: 2.3.1, 2.4.0 Mar 29, 2019
@rdhabalia
Copy link
Contributor Author

rerun java8 tests

1 similar comment
@merlimat
Copy link
Contributor

rerun java8 tests

@merlimat
Copy link
Contributor

@rdhabalia Sure, I forgot about this one. Looking now

private volatile String cachedToString = "[]";
private volatile boolean updatedAfterCached = true;

public ConcurrentOpenLongPairRangeSet(LongPairConsumer<T> consumer) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify what is the purpose of the "consumer"? Since it's not typical to have a consumer on a set.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConcurrentOpenLongPairRangeSet<T> stores T (PositionImpl) into bits. However, many of the apis require converter to create T (PositionImpl) from the stored bit. eg: Range<T> firstRange(), void forEach(RangeProcessor<T> action), Range<T> span(), Collection<Range<T>> asRanges() so, instead adding Converter-Consumer in signature of all apis, I have added as part of constructor.
Also, com.google.common.collect.RangeSet doesn't use this converter-consumer because it directly stores T(PositionImpl) and retrieves it from the stored data-structure. So, we have LongPairRangeSet interface which mostly follows com.google.common.collect.RangeSet api-definition and i don't want to change them by adding converter-consumer in each of those methods.

* Usage:
* a. This can be used if one doesn't want to create object for every new inserted {@code range}
* b. It creates {@link BitSet} for every unique first-key of the range.
* So, this rangeSet is not suitable for large number of unique keys.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could this be improved by pointing to a static bitset for unique keys?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think static bitset will work because every RangeSet will have its own state.
eg:
topic T1 has two managed-cursors mc1, mc2.
mc1 and mc2 need separate BitSet to store their separate ranges. so, static bitset will not work here.


@Override
public String toString() {
if (updatedAfterCached) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The toString() should typically never be called in "real-world" (other than tests or debug logs). Updating the volatile flag will have a small overhead that we could avoid by removing this optimization.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

actually toString() is also called everytime when we call stats or stats-internal. so, this flag can help us to use cached output if it's not changed and can save cpu.

* @throws NegativeArraySizeException
* if the specified initial size is negative
*/
public ConcurrentBitSet(int nbits) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you clarify why use a bit-set instead of (first, last) pair for the ranges?

Copy link
Contributor

@merlimat merlimat left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

@rdhabalia
Copy link
Contributor Author

rerun cpp tests
rerun integration tests
rerun java8 tests

@rdhabalia rdhabalia merged commit ce685dc into apache:master May 18, 2019
@rdhabalia rdhabalia deleted the redel_gc branch May 18, 2019 06:56
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants