[improve][broker] Repeat the handleMetadataChanges callback when configurationMetadataStore equals localMetadataStore #22519
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Motivation
Repeat the
handleMetadataChangescallback whenconfigurationMetadataStoreequalslocalMetadataStore.This is caused by repeatedly registering the same callback
handleMetadataChangesfor the same object in classBrokerService.pulsar.getLocalMetadataStore().registerListener(this::handleMetadataChanges); pulsar.getConfigurationMetadataStore().registerListener(this::handleMetadataChanges);For example, when we perform the update namespaces policies, here we call set-message-ttl:
sh bin/pulsar-admin namespaces set-message-ttl --messageTTL 60 test-tenant/test-nsWe will see the server output two lines of the same log:
`
2024-04-16T21:43:05,378+0800 [pulsar-15-5] INFO org.apache.pulsar.broker.service.BrokerService (BrokerService.java:2446) - [test-tenant/test-ns] updating with Policies(auth_policies=AuthPoliciesImpl(namespaceAuthentication={}, topicAuthentication={}, subscriptionAuthentication={}), replication_clusters=[standalone], bundles=BundlesDataImpl(boundaries=[0x00000000, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff], numBundles=4), backlog_quota_map={}, clusterDispatchRate={}, topicDispatchRate={}, subscriptionDispatchRate={}, replicatorDispatchRate={}, clusterSubscribeRate={}, persistence=null, deduplicationEnabled=null, autoTopicCreationOverride=null, autoSubscriptionCreationOverride=null, publishMaxMessageRate={}, latency_stats_sample_rate={}, message_ttl_in_seconds=60, subscription_expiration_time_minutes=null, retention_policies=RetentionPolicies{retentionTimeInMinutes=10, retentionSizeInMB=-1}, deleted=false, encryption_required=false, delayed_delivery_policies=null, inactive_topic_policies=null, subscription_auth_mode=None, max_producers_per_topic=null, max_consumers_per_topic=null, max_consumers_per_subscription=null, max_unacked_messages_per_consumer=null, max_unacked_messages_per_subscription=null, max_subscriptions_per_topic=null, compaction_threshold=null, offload_threshold=-1, offload_threshold_in_seconds=-1, offload_deletion_lag_ms=null, max_topics_per_namespace=null, schema_auto_update_compatibility_strategy=null, schema_compatibility_strategy=UNDEFINED, is_allow_auto_update_schema=null, schema_validation_enforced=false, offload_policies=null, deduplicationSnapshotIntervalSeconds=null, subscription_types_enabled=[], properties={}, resource_group_name=null, migrated=false, dispatcherPauseOnAckStatePersistentEnabled=null, entryFilters=null)
2024-04-16T21:43:05,378+0800 [pulsar-15-12] INFO org.apache.pulsar.broker.service.BrokerService (BrokerService.java:2446) - [test-tenant/test-ns] updating with Policies(auth_policies=AuthPoliciesImpl(namespaceAuthentication={}, topicAuthentication={}, subscriptionAuthentication={}), replication_clusters=[standalone], bundles=BundlesDataImpl(boundaries=[0x00000000, 0x40000000, 0x80000000, 0xc0000000, 0xffffffff], numBundles=4), backlog_quota_map={}, clusterDispatchRate={}, topicDispatchRate={}, subscriptionDispatchRate={}, replicatorDispatchRate={}, clusterSubscribeRate={}, persistence=null, deduplicationEnabled=null, autoTopicCreationOverride=null, autoSubscriptionCreationOverride=null, publishMaxMessageRate={}, latency_stats_sample_rate={}, message_ttl_in_seconds=60, subscription_expiration_time_minutes=null, retention_policies=RetentionPolicies{retentionTimeInMinutes=10, retentionSizeInMB=-1}, deleted=false, encryption_required=false, delayed_delivery_policies=null, inactive_topic_policies=null, subscription_auth_mode=None, max_producers_per_topic=null, max_consumers_per_topic=null, max_consumers_per_subscription=null, max_unacked_messages_per_consumer=null, max_unacked_messages_per_subscription=null, max_subscriptions_per_topic=null, compaction_threshold=null, offload_threshold=-1, offload_threshold_in_seconds=-1, offload_deletion_lag_ms=null, max_topics_per_namespace=null, schema_auto_update_compatibility_strategy=null, schema_compatibility_strategy=UNDEFINED, is_allow_auto_update_schema=null, schema_validation_enforced=false, offload_policies=null, deduplicationSnapshotIntervalSeconds=null, subscription_types_enabled=[], properties={}, resource_group_name=null, migrated=false, dispatcherPauseOnAckStatePersistentEnabled=null, entryFilters=null)
`
Modifications
Do not register listener for
configurationMetadataStorewhenconfigurationMetadataStoreequalslocalMetadataStore.Documentation
docdoc-requireddoc-not-neededdoc-complete