Skip to content

Commit 9aed736

Browse files
authored
[fix] [broker] response not-found error if topic does not exist when calling getPartitionedTopicMetadata (#22838)
1 parent f3d4d5a commit 9aed736

File tree

14 files changed

+899
-449
lines changed

14 files changed

+899
-449
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/PersistentTopicsBase.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -561,13 +561,13 @@ protected CompletableFuture<PartitionedTopicMetadata> internalGetPartitionedMeta
561561
// is a non-partitioned topic so we shouldn't check if the topic exists.
562562
return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName)
563563
.thenCompose(brokerAllowAutoTopicCreation -> {
564-
if (checkAllowAutoCreation) {
564+
if (checkAllowAutoCreation && brokerAllowAutoTopicCreation) {
565565
// Whether it exists or not, auto create a non-partitioned topic by client.
566566
return CompletableFuture.completedFuture(metadata);
567567
} else {
568568
// If it does not exist, response a Not Found error.
569569
// Otherwise, response a non-partitioned metadata.
570-
return internalCheckTopicExists(topicName).thenApply(__ -> metadata);
570+
return internalCheckNonPartitionedTopicExists(topicName).thenApply(__ -> metadata);
571571
}
572572
});
573573
}
@@ -715,6 +715,17 @@ public void updatePropertiesFailed(ManagedLedgerException exception, Object ctx)
715715

716716
protected CompletableFuture<Void> internalCheckTopicExists(TopicName topicName) {
717717
return pulsar().getNamespaceService().checkTopicExists(topicName)
718+
.thenAccept(info -> {
719+
boolean exists = info.isExists();
720+
info.recycle();
721+
if (!exists) {
722+
throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
723+
}
724+
});
725+
}
726+
727+
protected CompletableFuture<Void> internalCheckNonPartitionedTopicExists(TopicName topicName) {
728+
return pulsar().getNamespaceService().checkNonPartitionedTopicExists(topicName)
718729
.thenAccept(exist -> {
719730
if (!exist) {
720731
throw new RestException(Status.NOT_FOUND, getTopicNotFoundErrorMessage(topicName.toString()));
@@ -5338,8 +5349,10 @@ protected CompletableFuture<Void> validateShadowTopics(List<String> shadowTopics
53385349
"Only persistent topic can be set as shadow topic"));
53395350
}
53405351
futures.add(pulsar().getNamespaceService().checkTopicExists(shadowTopicName)
5341-
.thenAccept(isExists -> {
5342-
if (!isExists) {
5352+
.thenAccept(info -> {
5353+
boolean exists = info.isExists();
5354+
info.recycle();
5355+
if (!exists) {
53435356
throw new RestException(Status.PRECONDITION_FAILED,
53445357
"Shadow topic [" + shadowTopic + "] not exists.");
53455358
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/NonPersistentTopics.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,8 +98,20 @@ public void getPartitionedMetadata(
9898
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative,
9999
@ApiParam(value = "Is check configuration required to automatically create topic")
100100
@QueryParam("checkAllowAutoCreation") @DefaultValue("false") boolean checkAllowAutoCreation) {
101-
super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative,
102-
checkAllowAutoCreation);
101+
validateTopicName(tenant, namespace, encodedTopic);
102+
validateTopicOwnershipAsync(topicName, authoritative).whenComplete((__, ex) -> {
103+
if (ex != null) {
104+
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
105+
if (isNot307And404Exception(actEx)) {
106+
log.error("[{}] Failed to get internal stats for topic {}", clientAppId(), topicName, ex);
107+
}
108+
resumeAsyncResponseExceptionally(asyncResponse, actEx);
109+
} else {
110+
// "super.getPartitionedMetadata" will handle error itself.
111+
super.getPartitionedMetadata(asyncResponse, tenant, namespace, encodedTopic, authoritative,
112+
checkAllowAutoCreation);
113+
}
114+
});
103115
}
104116

105117
@GET

pulsar-broker/src/main/java/org/apache/pulsar/broker/lookup/TopicLookupBase.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -67,16 +67,22 @@ protected CompletableFuture<LookupData> internalLookupTopicAsync(final TopicName
6767
.thenCompose(__ -> validateGlobalNamespaceOwnershipAsync(topicName.getNamespaceObject()))
6868
.thenCompose(__ -> validateTopicOperationAsync(topicName, TopicOperation.LOOKUP, null))
6969
.thenCompose(__ -> {
70+
// Case-1: Non-persistent topic.
7071
// Currently, it's hard to check the non-persistent-non-partitioned topic, because it only exists
7172
// in the broker, it doesn't have metadata. If the topic is non-persistent and non-partitioned,
72-
// we'll return the true flag.
73-
CompletableFuture<Boolean> existFuture = (!topicName.isPersistent() && !topicName.isPartitioned())
74-
? CompletableFuture.completedFuture(true)
75-
: pulsar().getNamespaceService().checkTopicExists(topicName)
76-
.thenCompose(exists -> exists ? CompletableFuture.completedFuture(true)
77-
: pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName));
78-
79-
return existFuture;
73+
// we'll return the true flag. So either it is a partitioned topic or not, the result will be true.
74+
if (!topicName.isPersistent()) {
75+
return CompletableFuture.completedFuture(true);
76+
}
77+
// Case-2: Persistent topic.
78+
return pulsar().getNamespaceService().checkTopicExists(topicName).thenCompose(info -> {
79+
boolean exists = info.isExists();
80+
info.recycle();
81+
if (exists) {
82+
return CompletableFuture.completedFuture(true);
83+
}
84+
return pulsar().getBrokerService().isAllowAutoTopicCreationAsync(topicName);
85+
});
8086
})
8187
.thenCompose(exist -> {
8288
if (!exist) {

pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java

Lines changed: 75 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.regex.Pattern;
5252
import java.util.stream.Collectors;
5353
import javax.annotation.Nullable;
54+
import lombok.extern.slf4j.Slf4j;
5455
import org.apache.commons.collections4.CollectionUtils;
5556
import org.apache.commons.collections4.ListUtils;
5657
import org.apache.commons.lang3.StringUtils;
@@ -72,6 +73,7 @@
7273
import org.apache.pulsar.broker.stats.prometheus.metrics.Summary;
7374
import org.apache.pulsar.broker.web.PulsarWebResource;
7475
import org.apache.pulsar.client.admin.PulsarAdmin;
76+
import org.apache.pulsar.client.admin.PulsarAdminException;
7577
import org.apache.pulsar.client.api.ClientBuilder;
7678
import org.apache.pulsar.client.api.PulsarClient;
7779
import org.apache.pulsar.client.api.PulsarClientException;
@@ -123,6 +125,7 @@
123125
*
124126
* @see org.apache.pulsar.broker.PulsarService
125127
*/
128+
@Slf4j
126129
public class NamespaceService implements AutoCloseable {
127130
private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class);
128131

@@ -1400,40 +1403,86 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names
14001403
});
14011404
}
14021405

1403-
public CompletableFuture<Boolean> checkTopicExists(TopicName topic) {
1404-
CompletableFuture<Boolean> future;
1405-
// If the topic is persistent and the name includes `-partition-`, find the topic from the managed/ledger.
1406-
if (topic.isPersistent() && topic.isPartitioned()) {
1407-
future = pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
1406+
/***
1407+
* Check topic exists( partitioned or non-partitioned ).
1408+
*/
1409+
public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) {
1410+
return pulsar.getBrokerService()
1411+
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString()))
1412+
.thenCompose(metadata -> {
1413+
if (metadata.partitions > 0) {
1414+
return CompletableFuture.completedFuture(
1415+
TopicExistsInfo.newPartitionedTopicExists(metadata.partitions));
1416+
}
1417+
return checkNonPartitionedTopicExists(topic)
1418+
.thenApply(b -> b ? TopicExistsInfo.newNonPartitionedTopicExists()
1419+
: TopicExistsInfo.newTopicNotExists());
1420+
});
1421+
}
1422+
1423+
/***
1424+
* Check non-partitioned topic exists.
1425+
*/
1426+
public CompletableFuture<Boolean> checkNonPartitionedTopicExists(TopicName topic) {
1427+
if (topic.isPersistent()) {
1428+
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
14081429
} else {
1409-
future = CompletableFuture.completedFuture(false);
1430+
return checkNonPersistentNonPartitionedTopicExists(topic.toString());
14101431
}
1432+
}
14111433

1412-
return future.thenCompose(found -> {
1413-
if (found != null && found) {
1414-
return CompletableFuture.completedFuture(true);
1434+
/**
1435+
* Regarding non-persistent topic, we do not know whether it exists or not. Redirect the request to the ownership
1436+
* broker of this topic. HTTP API has implemented the mechanism that redirect to ownership broker, so just call
1437+
* HTTP API here.
1438+
*/
1439+
public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(String topic) {
1440+
TopicName topicName = TopicName.get(topic);
1441+
// "non-partitioned & non-persistent" topics only exist on the owner broker.
1442+
return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned -> {
1443+
// The current broker is the owner.
1444+
if (isOwned) {
1445+
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = pulsar.getBrokerService()
1446+
.getTopic(topic, false);
1447+
if (nonPersistentTopicFuture != null) {
1448+
return nonPersistentTopicFuture.thenApply(Optional::isPresent);
1449+
} else {
1450+
return CompletableFuture.completedFuture(false);
1451+
}
14151452
}
14161453

1417-
return pulsar.getBrokerService()
1418-
.fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName()))
1419-
.thenCompose(metadata -> {
1420-
if (metadata.partitions > 0) {
1421-
return CompletableFuture.completedFuture(true);
1422-
}
1423-
1424-
if (topic.isPersistent()) {
1425-
return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic);
1426-
} else {
1427-
// The non-partitioned non-persistent topic only exist in the broker topics.
1428-
CompletableFuture<Optional<Topic>> nonPersistentTopicFuture =
1429-
pulsar.getBrokerService().getTopics().get(topic.toString());
1430-
if (nonPersistentTopicFuture == null) {
1454+
// Forward to the owner broker.
1455+
PulsarClientImpl pulsarClient;
1456+
try {
1457+
pulsarClient = (PulsarClientImpl) pulsar.getClient();
1458+
} catch (Exception ex) {
1459+
// This error will never occur.
1460+
log.error("{} Failed to get partition metadata due to create internal admin client fails", topic, ex);
1461+
return FutureUtil.failedFuture(ex);
1462+
}
1463+
LookupOptions lookupOptions = LookupOptions.builder().readOnly(false).authoritative(true).build();
1464+
return getBrokerServiceUrlAsync(TopicName.get(topic), lookupOptions)
1465+
.thenCompose(lookupResult -> {
1466+
if (!lookupResult.isPresent()) {
1467+
log.error("{} Failed to get partition metadata due can not find the owner broker", topic);
1468+
return FutureUtil.failedFuture(new ServiceUnitNotReadyException(
1469+
"No broker was available to own " + topicName));
1470+
}
1471+
return pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl())
1472+
.getPartitionedTopicMetadata(topicName, false)
1473+
.thenApply(metadata -> true)
1474+
.exceptionallyCompose(ex -> {
1475+
Throwable actEx = FutureUtil.unwrapCompletionException(ex);
1476+
if (actEx instanceof PulsarClientException.NotFoundException
1477+
|| actEx instanceof PulsarClientException.TopicDoesNotExistException
1478+
|| actEx instanceof PulsarAdminException.NotFoundException) {
14311479
return CompletableFuture.completedFuture(false);
14321480
} else {
1433-
return nonPersistentTopicFuture.thenApply(Optional::isPresent);
1481+
log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex);
1482+
return CompletableFuture.failedFuture(ex);
14341483
}
1435-
}
1436-
});
1484+
});
1485+
});
14371486
});
14381487
}
14391488

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.broker.namespace;
20+
21+
import io.netty.util.Recycler;
22+
import lombok.Getter;
23+
import org.apache.pulsar.common.policies.data.TopicType;
24+
25+
public class TopicExistsInfo {
26+
27+
private static final Recycler<TopicExistsInfo> RECYCLER = new Recycler<>() {
28+
@Override
29+
protected TopicExistsInfo newObject(Handle<TopicExistsInfo> handle) {
30+
return new TopicExistsInfo(handle);
31+
}
32+
};
33+
34+
private static TopicExistsInfo nonPartitionedExists = new TopicExistsInfo(true, 0);
35+
36+
private static TopicExistsInfo notExists = new TopicExistsInfo(false, 0);
37+
38+
public static TopicExistsInfo newPartitionedTopicExists(Integer partitions){
39+
TopicExistsInfo info = RECYCLER.get();
40+
info.exists = true;
41+
info.partitions = partitions.intValue();
42+
return info;
43+
}
44+
45+
public static TopicExistsInfo newNonPartitionedTopicExists(){
46+
return nonPartitionedExists;
47+
}
48+
49+
public static TopicExistsInfo newTopicNotExists(){
50+
return notExists;
51+
}
52+
53+
private final Recycler.Handle<TopicExistsInfo> handle;
54+
55+
@Getter
56+
private int partitions;
57+
@Getter
58+
private boolean exists;
59+
60+
private TopicExistsInfo(Recycler.Handle<TopicExistsInfo> handle) {
61+
this.handle = handle;
62+
}
63+
64+
private TopicExistsInfo(boolean exists, int partitions) {
65+
this.handle = null;
66+
this.partitions = partitions;
67+
this.exists = exists;
68+
}
69+
70+
public void recycle() {
71+
if (this == notExists || this == nonPartitionedExists || this.handle == null) {
72+
return;
73+
}
74+
this.exists = false;
75+
this.partitions = 0;
76+
this.handle.recycle(this);
77+
}
78+
79+
public TopicType getTopicType() {
80+
return this.partitions > 0 ? TopicType.PARTITIONED : TopicType.NON_PARTITIONED;
81+
}
82+
}

0 commit comments

Comments
 (0)