-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[PIP-39] Introduce system topic and topic policies service #4955
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
run Integration Tests |
conf/broker.conf
Outdated
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
topicLevelPoliciesEnabled
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to check if system topic is enabled?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this, isSystemTopic(topic));
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comments as above.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
define this constant in common module.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comments as above
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| reader.hasMoreEventsAsync().whenComplete((has, ex) -> { | |
| reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are using Guava cache, how do we backfill the cache if the entries are evicted out of the cache?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Topic policies service will check the reader is have no more events can read before get topic policies, so when a reader evicted, the topic policies service will create a new reader and cache it.
|
@sijie Thanks for the review, i have addressed your comment. |
|
run java8 tests |
|
retest this please |
fc8721a to
1516044
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the two get calls can potentially return different values. so as a good practice, I would recommend writing in the following way.
CompletableFuture<Optional<LookupResult>> future = targetMap.get(bundle);
if (future != null && !future.isDone()) {
// ...
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't we return the future here, since there is already an ongoing lookup, no?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am actually wondering if it is actually clearer if we create a class SystemTopic extending PersistentTopic. So it will be:
class SystemTopic extends PersistentTopic {
}
PersistentTopic persistentTopic;
if (isSystemTopic(topic)) {
persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this);
} else {
persistentTopic = new SystemTopic(topic, ledger, BrokerService.this);
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this method to PersistentTopic?
so the logic can be clear:
class PersistentTopic {
public boolean isBacklogExceed() {
// check the backlog logic
}
}
class SystemTopic {
public boolean isBacklogExceed() {
return false;
}
}
so the method can be simplified as
public boolean isBacklogExceeded(PersistentTopic topic) {
return topic.isBacklogExceeded();
}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| CompletableFuture<TopicPolicies> getTopicPoliciesWithoutCacheAsync(TopicName topicName); | |
| CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
see my comment above about making a new SystemTopic class and making it extend PersistentTopic.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| if (isSystemTopic || policies.compaction_threshold != 0 | |
| if (isSystemTopic() || policies.compaction_threshold != 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any reason why do you change this field here?
2fb4f9b to
4a50ab0
Compare
fa7914a to
97993d8
Compare
|
run java8 tests |
1 similar comment
|
run java8 tests |
|
@sijie I have addressed your comments, please take a look. |
|
run integration tests |
|
run java8 tests |
1 similar comment
|
run java8 tests |
0ffb5e7 to
ad2c296
Compare
|
run java8 tests |
|
run java8 tests |
a405454 to
1f5a174
Compare
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
3 similar comments
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
/pulsarbot run-failure-checks |
|
Pulsar docs to be updated (Proposed): @sijie , @codelipenghui , Please help check whether any doc updates are missed. |
|
@Huanli-Meng We don't need to add a document for this PR because this is a basic framework that can be used to develop topic level policy on it. After we add a topic level policy, that is a suitable time to add documents. |
|
@codelipenghui got it. Thanks. |
Fix: apache#4899 ### Motivation PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events) Introduce system topic and topic policies service which can be used support topic level policies overwrite feature. ### Modification Added system topic interface and topic policies system topic. Added topic policies interface and system topic based implementation.
Fix: apache#4899 ### Motivation PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events) Introduce system topic and topic policies service which can be used support topic level policies overwrite feature. ### Modification Added system topic interface and topic policies system topic. Added topic policies interface and system topic based implementation.
Not valid from 2.6.0 Thanks @gaoran10 > We introduce topic-level policies from 2.6.0, refer to apache/pulsar#4955, but I think we support retention policies at the topic level from 2.7.0, refer to apache/pulsar#7747. >Make the default value of the configuration topicLevelPoliciesEnabled as true from 2.11.0, refer to apache/pulsar#15619.
Fix: #4899
Motivation
PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events)
Introduce system topic and topic policies service which can be used support topic level policies overwrite feature.
Modification
Added system topic interface and topic policies system topic.
Added topic policies interface and system topic based implementation.