Skip to content

Conversation

@codelipenghui
Copy link
Contributor

@codelipenghui codelipenghui commented Aug 15, 2019

Fix: #4899

Motivation

PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events)

Introduce system topic and topic policies service which can be used support topic level policies overwrite feature.

Modification

Added system topic interface and topic policies system topic.
Added topic policies interface and system topic based implementation.

@codelipenghui codelipenghui changed the title [WIP] [PIP-39] Introduce system topic [WIP] [PIP-39] Introduce system topic and topic policies service Aug 20, 2019
@codelipenghui
Copy link
Contributor Author

run Integration Tests

@sijie sijie added area/broker type/feature The PR added a new feature or issue requested a new feature labels Aug 21, 2019
@sijie sijie added this to the 2.5.0 milestone Aug 21, 2019
conf/broker.conf Outdated
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

topicLevelPoliciesEnabled

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we need to check if system topic is enabled?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PersistentTopic persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this, isSystemTopic(topic));

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

final

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comments as above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

define this constant in common module.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comments as above

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
reader.hasMoreEventsAsync().whenComplete((has, ex) -> {
reader.hasMoreEventsAsync().whenComplete((hasMore, ex) -> {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we are using Guava cache, how do we backfill the cache if the entries are evicted out of the cache?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Topic policies service will check the reader is have no more events can read before get topic policies, so when a reader evicted, the topic policies service will create a new reader and cache it.

@codelipenghui
Copy link
Contributor Author

@sijie Thanks for the review, i have addressed your comment.

@codelipenghui codelipenghui changed the title [WIP] [PIP-39] Introduce system topic and topic policies service [PIP-39] Introduce system topic and topic policies service Aug 27, 2019
@codelipenghui
Copy link
Contributor Author

run java8 tests

@codelipenghui codelipenghui changed the title [PIP-39] Introduce system topic and topic policies service [WIP][PIP-39] Introduce system topic and topic policies service Aug 28, 2019
@codelipenghui
Copy link
Contributor Author

retest this please

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the two get calls can potentially return different values. so as a good practice, I would recommend writing in the following way.

CompletableFuture<Optional<LookupResult>> future = targetMap.get(bundle);
if (future != null && !future.isDone()) {
    // ...
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

shouldn't we return the future here, since there is already an ongoing lookup, no?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am actually wondering if it is actually clearer if we create a class SystemTopic extending PersistentTopic. So it will be:

class SystemTopic extends PersistentTopic {
}

PersistentTopic persistentTopic;
if (isSystemTopic(topic)) {
    persistentTopic = new PersistentTopic(topic, ledger, BrokerService.this);
} else {
    persistentTopic = new SystemTopic(topic, ledger, BrokerService.this);
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we move this method to PersistentTopic?

so the logic can be clear:


class PersistentTopic {

    public boolean isBacklogExceed() {
         // check the backlog logic
    }
}

class SystemTopic {
    public boolean isBacklogExceed() {
         return false;
    } 
}

so the method can be simplified as 

public boolean isBacklogExceeded(PersistentTopic topic) {
    return topic.isBacklogExceeded();
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
CompletableFuture<TopicPolicies> getTopicPoliciesWithoutCacheAsync(TopicName topicName);
CompletableFuture<TopicPolicies> getTopicPoliciesBypassCacheAsync(TopicName topicName);

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

see my comment above about making a new SystemTopic class and making it extend PersistentTopic.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if (isSystemTopic || policies.compaction_threshold != 0
if (isSystemTopic() || policies.compaction_threshold != 0

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any reason why do you change this field here?

@codelipenghui codelipenghui force-pushed the system_topic branch 2 times, most recently from fa7914a to 97993d8 Compare October 8, 2019 03:27
@codelipenghui
Copy link
Contributor Author

run java8 tests
run integration tests

1 similar comment
@codelipenghui
Copy link
Contributor Author

run java8 tests
run integration tests

@codelipenghui
Copy link
Contributor Author

@sijie I have addressed your comments, please take a look.

@codelipenghui codelipenghui changed the title [WIP][PIP-39] Introduce system topic and topic policies service [PIP-39] Introduce system topic and topic policies service Oct 8, 2019
@codelipenghui
Copy link
Contributor Author

run integration tests

@codelipenghui
Copy link
Contributor Author

run java8 tests

1 similar comment
@codelipenghui
Copy link
Contributor Author

run java8 tests

@codelipenghui
Copy link
Contributor Author

run java8 tests
run integration tests

@codelipenghui
Copy link
Contributor Author

run java8 tests

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

3 similar comments
@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@codelipenghui
Copy link
Contributor Author

/pulsarbot run-failure-checks

@Huanli-Meng
Copy link
Contributor

Pulsar docs to be updated (Proposed):
Concepts and architecture > Messaging > Namespace: add description about namespace change event description and how to use namespace change event to implement topic level policy.
Reference> Pulsar reference: add systemTopicEnabled and topicLevelPoliciesEnabled configuration in broker and standalone section.

@sijie , @codelipenghui , Please help check whether any doc updates are missed.

@codelipenghui
Copy link
Contributor Author

@Huanli-Meng We don't need to add a document for this PR because this is a basic framework that can be used to develop topic level policy on it. After we add a topic level policy, that is a suitable time to add documents.

@Huanli-Meng
Copy link
Contributor

@codelipenghui got it. Thanks.

@sijie sijie merged commit 9cdf3fc into apache:master Jun 2, 2020
@codelipenghui codelipenghui deleted the system_topic branch June 2, 2020 00:59
Huanli-Meng pushed a commit to Huanli-Meng/pulsar that referenced this pull request Jun 12, 2020
Fix: apache#4899
### Motivation
PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events)

Introduce system topic and topic policies service which can be used support topic level policies overwrite feature.

### Modification
Added system topic interface and topic policies system topic.
Added topic policies interface and system topic based implementation.
huangdx0726 pushed a commit to huangdx0726/pulsar that referenced this pull request Aug 24, 2020
Fix: apache#4899
### Motivation
PIP-39 : (https://github.com/apache/pulsar/wiki/PIP-39%3A-Namespace-Change-Events)

Introduce system topic and topic policies service which can be used support topic level policies overwrite feature.

### Modification
Added system topic interface and topic policies system topic.
Added topic policies interface and system topic based implementation.
AlvaroStream added a commit to AlvaroStream/pulsar-site that referenced this pull request Jun 28, 2023
Not valid from 2.6.0 

Thanks @gaoran10

> We introduce topic-level policies from 2.6.0, refer to apache/pulsar#4955, but I think we support retention policies at the topic level from 2.7.0, refer to apache/pulsar#7747.

>Make the default value of the configuration topicLevelPoliciesEnabled as true from 2.11.0, refer to apache/pulsar#15619.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

area/broker type/feature The PR added a new feature or issue requested a new feature

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Namespace Change Events Support

4 participants