Search before asking
Version
3.0.1
Minimal reproduce step
- broker.conf:
systemTopicEnabled: "true"
topicLevelPoliciesEnabled: "true"
- Set a publishRate on some topics so that the system topic has data:
    String namespace = "xxx/xxx";
    List<String> topicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
    topicList = topicList.subList(0, 20);
    for (String topic : topicList) {
        PublishRate publishRate = new PublishRate();
        publishRate.publishThrottlingRateInMsg = 100;
        pulsarAdmin.topicPolicies().setPublishRate(topic, publishRate);
    }
- Run concurrent lookups against the same topics:
    topicList = pulsarAdmin.topics().getPartitionedTopicList(namespace);
    topicList = topicList.subList(0, 20);
    for (String topic : topicList) {
        customThreadPool.execute(() -> {
            try {
                // Keep looking up the topic in a loop to generate concurrent lookup load.
                while (true) {
                    pulsarAdmin.lookups().lookupPartitionedTopic(topic);
                }
            } catch (PulsarAdminException e) {
                throw new RuntimeException(e);
            }
        });
    }
- Upgrade from 2.11.1 to 3.0.1, which restarts the brokers (or unload the namespace/topic instead).
If version 3.0.1 is already installed, restarting the brokers is enough:
    kubectl rollout restart statefulset/pulsar-broker
- Afterwards, some topic partitions turned out to have been deleted. The following check, run for each topic, shows the mismatch:
    PartitionedTopicStats stats;
    try {
        stats = pulsarAdmin.topics().getPartitionedStats(topic, true);
    } catch (PulsarAdminException e) {
        throw new RuntimeException(e);
    }
    // Partition count recorded in the metadata vs. partitions that actually report stats.
    int partitions = stats.getMetadata().partitions;
    int size = stats.getPartitions().size();
    if (partitions != size) {
        log.info("**********topic={},partitions={},size={}", topic, partitions, size);
        // pulsarAdmin.topics().updatePartitionedTopic(topic, partitions);
    }
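The check above only reports that the counts differ. A minimal sketch of how the missing partition indexes could be listed is shown below. It assumes that the keys of `stats.getPartitions()` are full partition topic names of the form `<topic>-partition-<index>`; the class name, method name, and topic name are hypothetical and not part of the Pulsar API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class MissingPartitions {
    // Pulsar names the partitions of a partitioned topic "<topic>-partition-<index>".
    static final String PARTITION_SUFFIX = "-partition-";

    /**
     * Returns the partition indexes that the metadata expects but that are
     * absent from the set of partition names actually reported.
     * (Hypothetical helper, not a Pulsar API.)
     */
    static List<Integer> missingPartitions(String topic, int expected, Set<String> reported) {
        List<Integer> missing = new ArrayList<>();
        for (int i = 0; i < expected; i++) {
            if (!reported.contains(topic + PARTITION_SUFFIX + i)) {
                missing.add(i);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        String topic = "persistent://tenant/ns/orders"; // hypothetical topic name
        Set<String> reported = Set.of(topic + "-partition-0", topic + "-partition-2");
        System.out.println(missingPartitions(topic, 4, reported)); // prints [1, 3]
    }
}
```

In the reproduction code, `expected` would correspond to `stats.getMetadata().partitions` and `reported` to `stats.getPartitions().keySet()`, under the naming assumption stated above.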
What did you expect to see?
Data integrity: no topics or partitions should be deleted by an upgrade or a broker restart.
What did you see instead?
More than 200 topics were deleted.
Anything else?
The logs of deleted topics:

The relevant stack logs:

Related code:
    public CompletableFuture<Void> checkReplication() {
        TopicName name = TopicName.get(topic);
        if (!name.isGlobal() || NamespaceService.isHeartbeatNamespace(name)) {
            return CompletableFuture.completedFuture(null);
        }

        if (log.isDebugEnabled()) {
            log.debug("[{}] Checking replication status", name);
        }

        List<String> configuredClusters = topicPolicies.getReplicationClusters().get();
        int newMessageTTLInSeconds = topicPolicies.getMessageTTLInSeconds().get();

        String localCluster = brokerService.pulsar().getConfiguration().getClusterName();

        // if local cluster is removed from global namespace cluster-list : then delete topic forcefully
        // because pulsar doesn't serve global topic without local repl-cluster configured.
        if (TopicName.get(topic).isGlobal() && !configuredClusters.contains(localCluster)) {
            log.info("Deleting topic [{}] because local cluster is not part of "
                    + " global namespace repl list {}", topic, configuredClusters);
            return deleteForcefully();
        }
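To make the deletion condition in the excerpt easier to reason about, here is a minimal standalone sketch of just that predicate. The class, method, and cluster names are hypothetical. It only mirrors the `if` statement quoted above and makes no claim about how the broker populates `configuredClusters`.

```java
import java.util.List;

public class ReplicationCheckSketch {
    /**
     * Mirrors the condition in the excerpt: a global topic is force-deleted when
     * the local cluster is absent from the configured replication clusters.
     * (Hypothetical helper, not part of the Pulsar code base.)
     */
    static boolean wouldDeleteForcefully(boolean isGlobal,
                                         List<String> configuredClusters,
                                         String localCluster) {
        return isGlobal && !configuredClusters.contains(localCluster);
    }

    public static void main(String[] args) {
        String local = "cluster-a"; // hypothetical cluster name
        // Local cluster is listed: the topic is kept.
        System.out.println(wouldDeleteForcefully(true, List.of("cluster-a", "cluster-b"), local)); // false
        // Local cluster is not listed: the topic would be deleted.
        System.out.println(wouldDeleteForcefully(true, List.of("cluster-b"), local)); // true
        // An empty cluster list also satisfies the condition.
        System.out.println(wouldDeleteForcefully(true, List.of(), local)); // true
        // Non-global topics never reach the deletion branch.
        System.out.println(wouldDeleteForcefully(false, List.of(), local)); // false
    }
}
```

Note that the predicate is also true for an empty cluster list, so the result depends entirely on what the cluster list contains at the moment the check runs.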
The related code excerpt is from pulsar/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java, lines 1569 to 1590 in 90a82ae.
Are you willing to submit a PR?