Skip to content

[Broker] Backlog Quota not properly fetched for message_age BacklogQuotaType #11404

@MarvinCai

Description

@MarvinCai

Describe the bug
pr #10093 introduced time based backlog and new BacklogQuotaType message_age.
While we can use admin cli to set backlog limit for message_age type, when fetching backlog

.getOrDefault(BacklogQuotaType.destination_storage, defaultQuota))
.orElse(defaultQuota);
} catch (Exception e) {
log.warn("Failed to read policies data, will apply the default backlog quota: namespace={}", namespace, e);
return this.defaultQuota;
}
}
public BacklogQuotaImpl getBacklogQuota(TopicName topicName) {
String policyPath = AdminResource.path(POLICIES, topicName.getNamespace());
if (!isTopicLevelPoliciesEnable) {
return getBacklogQuota(topicName.getNamespace(), policyPath);
}
try {
if (pulsar.getTopicPoliciesService().cacheIsInitialized(topicName)) {
return Optional.ofNullable(pulsar.getTopicPoliciesService().getTopicPolicies(topicName))
.map(TopicPolicies::getBackLogQuotaMap)
.map(map -> map.get(BacklogQuotaType.destination_storage.name()))
, by default we only fetch for destination_storage type, so we set backlog to message_age type but when checking backlog we're not checking against limit we set for time based quota.

Metadata

Metadata

Assignees

Labels

type/bugThe PR fixed a bug or issue reported a bug

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions