|
51 | 51 | import java.util.regex.Pattern; |
52 | 52 | import java.util.stream.Collectors; |
53 | 53 | import javax.annotation.Nullable; |
| 54 | +import lombok.extern.slf4j.Slf4j; |
54 | 55 | import org.apache.commons.collections4.CollectionUtils; |
55 | 56 | import org.apache.commons.collections4.ListUtils; |
56 | 57 | import org.apache.commons.lang3.StringUtils; |
|
72 | 73 | import org.apache.pulsar.broker.stats.prometheus.metrics.Summary; |
73 | 74 | import org.apache.pulsar.broker.web.PulsarWebResource; |
74 | 75 | import org.apache.pulsar.client.admin.PulsarAdmin; |
| 76 | +import org.apache.pulsar.client.admin.PulsarAdminException; |
75 | 77 | import org.apache.pulsar.client.api.ClientBuilder; |
76 | 78 | import org.apache.pulsar.client.api.PulsarClient; |
77 | 79 | import org.apache.pulsar.client.api.PulsarClientException; |
|
123 | 125 | * |
124 | 126 | * @see org.apache.pulsar.broker.PulsarService |
125 | 127 | */ |
| 128 | +@Slf4j |
126 | 129 | public class NamespaceService implements AutoCloseable { |
127 | 130 | private static final Logger LOG = LoggerFactory.getLogger(NamespaceService.class); |
128 | 131 |
|
@@ -1400,40 +1403,86 @@ public CompletableFuture<List<String>> getOwnedTopicListForNamespaceBundle(Names |
1400 | 1403 | }); |
1401 | 1404 | } |
1402 | 1405 |
|
1403 | | - public CompletableFuture<Boolean> checkTopicExists(TopicName topic) { |
1404 | | - CompletableFuture<Boolean> future; |
1405 | | - // If the topic is persistent and the name includes `-partition-`, find the topic from the managed/ledger. |
1406 | | - if (topic.isPersistent() && topic.isPartitioned()) { |
1407 | | - future = pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); |
| 1406 | + /*** |
| 1407 | + * Check topic exists( partitioned or non-partitioned ). |
| 1408 | + */ |
| 1409 | + public CompletableFuture<TopicExistsInfo> checkTopicExists(TopicName topic) { |
| 1410 | + return pulsar.getBrokerService() |
| 1411 | + .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.toString())) |
| 1412 | + .thenCompose(metadata -> { |
| 1413 | + if (metadata.partitions > 0) { |
| 1414 | + return CompletableFuture.completedFuture( |
| 1415 | + TopicExistsInfo.newPartitionedTopicExists(metadata.partitions)); |
| 1416 | + } |
| 1417 | + return checkNonPartitionedTopicExists(topic) |
| 1418 | + .thenApply(b -> b ? TopicExistsInfo.newNonPartitionedTopicExists() |
| 1419 | + : TopicExistsInfo.newTopicNotExists()); |
| 1420 | + }); |
| 1421 | + } |
| 1422 | + |
| 1423 | + /*** |
| 1424 | + * Check non-partitioned topic exists. |
| 1425 | + */ |
| 1426 | + public CompletableFuture<Boolean> checkNonPartitionedTopicExists(TopicName topic) { |
| 1427 | + if (topic.isPersistent()) { |
| 1428 | + return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); |
1408 | 1429 | } else { |
1409 | | - future = CompletableFuture.completedFuture(false); |
| 1430 | + return checkNonPersistentNonPartitionedTopicExists(topic.toString()); |
1410 | 1431 | } |
| 1432 | + } |
1411 | 1433 |
|
1412 | | - return future.thenCompose(found -> { |
1413 | | - if (found != null && found) { |
1414 | | - return CompletableFuture.completedFuture(true); |
| 1434 | + /** |
| 1435 | + * Regarding non-persistent topic, we do not know whether it exists or not. Redirect the request to the ownership |
| 1436 | + * broker of this topic. HTTP API has implemented the mechanism that redirect to ownership broker, so just call |
| 1437 | + * HTTP API here. |
| 1438 | + */ |
| 1439 | + public CompletableFuture<Boolean> checkNonPersistentNonPartitionedTopicExists(String topic) { |
| 1440 | + TopicName topicName = TopicName.get(topic); |
| 1441 | + // "non-partitioned & non-persistent" topics only exist on the owner broker. |
| 1442 | + return checkTopicOwnership(TopicName.get(topic)).thenCompose(isOwned -> { |
| 1443 | + // The current broker is the owner. |
| 1444 | + if (isOwned) { |
| 1445 | + CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = pulsar.getBrokerService() |
| 1446 | + .getTopic(topic, false); |
| 1447 | + if (nonPersistentTopicFuture != null) { |
| 1448 | + return nonPersistentTopicFuture.thenApply(Optional::isPresent); |
| 1449 | + } else { |
| 1450 | + return CompletableFuture.completedFuture(false); |
| 1451 | + } |
1415 | 1452 | } |
1416 | 1453 |
|
1417 | | - return pulsar.getBrokerService() |
1418 | | - .fetchPartitionedTopicMetadataAsync(TopicName.get(topic.getPartitionedTopicName())) |
1419 | | - .thenCompose(metadata -> { |
1420 | | - if (metadata.partitions > 0) { |
1421 | | - return CompletableFuture.completedFuture(true); |
1422 | | - } |
1423 | | - |
1424 | | - if (topic.isPersistent()) { |
1425 | | - return pulsar.getPulsarResources().getTopicResources().persistentTopicExists(topic); |
1426 | | - } else { |
1427 | | - // The non-partitioned non-persistent topic only exist in the broker topics. |
1428 | | - CompletableFuture<Optional<Topic>> nonPersistentTopicFuture = |
1429 | | - pulsar.getBrokerService().getTopics().get(topic.toString()); |
1430 | | - if (nonPersistentTopicFuture == null) { |
| 1454 | + // Forward to the owner broker. |
| 1455 | + PulsarClientImpl pulsarClient; |
| 1456 | + try { |
| 1457 | + pulsarClient = (PulsarClientImpl) pulsar.getClient(); |
| 1458 | + } catch (Exception ex) { |
| 1459 | + // This error will never occur. |
| 1460 | + log.error("{} Failed to get partition metadata due to create internal admin client fails", topic, ex); |
| 1461 | + return FutureUtil.failedFuture(ex); |
| 1462 | + } |
| 1463 | + LookupOptions lookupOptions = LookupOptions.builder().readOnly(false).authoritative(true).build(); |
| 1464 | + return getBrokerServiceUrlAsync(TopicName.get(topic), lookupOptions) |
| 1465 | + .thenCompose(lookupResult -> { |
| 1466 | + if (!lookupResult.isPresent()) { |
| 1467 | + log.error("{} Failed to get partition metadata due can not find the owner broker", topic); |
| 1468 | + return FutureUtil.failedFuture(new ServiceUnitNotReadyException( |
| 1469 | + "No broker was available to own " + topicName)); |
| 1470 | + } |
| 1471 | + return pulsarClient.getLookup(lookupResult.get().getLookupData().getBrokerUrl()) |
| 1472 | + .getPartitionedTopicMetadata(topicName, false) |
| 1473 | + .thenApply(metadata -> true) |
| 1474 | + .exceptionallyCompose(ex -> { |
| 1475 | + Throwable actEx = FutureUtil.unwrapCompletionException(ex); |
| 1476 | + if (actEx instanceof PulsarClientException.NotFoundException |
| 1477 | + || actEx instanceof PulsarClientException.TopicDoesNotExistException |
| 1478 | + || actEx instanceof PulsarAdminException.NotFoundException) { |
1431 | 1479 | return CompletableFuture.completedFuture(false); |
1432 | 1480 | } else { |
1433 | | - return nonPersistentTopicFuture.thenApply(Optional::isPresent); |
| 1481 | + log.error("{} Failed to get partition metadata due to redirecting fails", topic, ex); |
| 1482 | + return CompletableFuture.failedFuture(ex); |
1434 | 1483 | } |
1435 | | - } |
1436 | | - }); |
| 1484 | + }); |
| 1485 | + }); |
1437 | 1486 | }); |
1438 | 1487 | } |
1439 | 1488 |
|
|
0 commit comments