Description
Search before reporting
- I searched in the issues and found nothing similar.
Motivation
Problem Statement
BinaryProtoLookupService#getTopicsUnderNamespace does not deduplicate concurrent requests with identical parameters (namespace, mode, topicsPattern, topicsHash). Each call creates a new CompletableFuture and immediately invokes the private helper to get a connection and send a request. When multiple callers ask for the same namespace at the same time, this causes request amplification and unnecessary broker load.
In the same class, sibling methods do deduplicate:
- BinaryProtoLookupService#getBroker uses lookupInProgress.computeIfAbsent(...) with removal in whenComplete(...).
- BinaryProtoLookupService#getPartitionedTopicMetadata uses partitionedMetadataInProgress.computeIfAbsent(...) with removal in whenComplete(...).
This inconsistency means BinaryProtoLookupService#getTopicsUnderNamespace can issue duplicate network requests for identical inputs, increasing broker load, while BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata already deduplicate.
This is a performance and consistency improvement; the current behavior is not a functional bug. It brings getTopicsUnderNamespace in line with the coalescing pattern already used by BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata.
Current Behavior
When multiple threads or components request topics under the same namespace with identical parameters (namespace, mode, pattern, hash), each call results in a separate network request to the broker:
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
Lines 395 to 411 in 0896c0a

    @Override
    public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace,
                                                                      Mode mode,
                                                                      String topicsPattern,
                                                                      String topicsHash) {
        CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>();
        AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
        Backoff backoff = new BackoffBuilder()
                .setInitialTime(100, TimeUnit.MILLISECONDS)
                .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
                .setMax(1, TimeUnit.MINUTES)
                .create();
        getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode,
                topicsPattern, topicsHash);
        return topicsFuture;
    }

Expected Behavior
Concurrent requests with identical parameters should be deduplicated, sharing a single network request and result, consistent with the patterns used in BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata.
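A minimal sketch of what the expected behavior could look like, written as a standalone class so it compiles without Pulsar on the classpath. Assumptions: the namespace is a plain String instead of NamespaceName, Mode is a local stand-in enum, the result is a List<String> instead of GetTopicsResult, and TopicsKey, topicsInProgress and sendRequest are hypothetical names, not existing client code. The key carries all four parameters because calls that differ in any of them (for example topicsHash) are not interchangeable.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

public class TopicsLookupDedupSketch {

    // Stand-in for the client's Mode enum (illustrative only).
    enum Mode { PERSISTENT, NON_PERSISTENT, ALL }

    // Hypothetical composite key. Record equality compares all four fields with
    // Objects.equals, so a null topicsPattern or topicsHash is handled safely.
    record TopicsKey(String namespace, Mode mode, String topicsPattern, String topicsHash) { }

    final ConcurrentHashMap<TopicsKey, CompletableFuture<List<String>>> topicsInProgress =
            new ConcurrentHashMap<>();
    // Hypothetical stand-in for "get a connection and send the request".
    private final Function<TopicsKey, CompletableFuture<List<String>>> sendRequest;

    TopicsLookupDedupSketch(Function<TopicsKey, CompletableFuture<List<String>>> sendRequest) {
        this.sendRequest = sendRequest;
    }

    CompletableFuture<List<String>> getTopicsUnderNamespace(String namespace, Mode mode,
                                                            String topicsPattern, String topicsHash) {
        TopicsKey key = new TopicsKey(namespace, mode, topicsPattern, topicsHash);
        AtomicReference<CompletableFuture<List<String>>> created = new AtomicReference<>();
        CompletableFuture<List<String>> result = topicsInProgress.computeIfAbsent(key, k -> {
            CompletableFuture<List<String>> f = sendRequest.apply(k);
            created.set(f);
            return f;
        });
        // Only the caller that created the future registers its removal, and only after
        // computeIfAbsent has returned (same ordering as getBroker/getPartitionedTopicMetadata).
        CompletableFuture<List<String>> mine = created.get();
        if (mine != null) {
            mine.whenComplete((v, ex) -> topicsInProgress.remove(key, mine));
        }
        return result;
    }

    public static void main(String[] args) {
        List<CompletableFuture<List<String>>> sent = new CopyOnWriteArrayList<>();
        TopicsLookupDedupSketch lookup = new TopicsLookupDedupSketch(k -> {
            CompletableFuture<List<String>> f = new CompletableFuture<>();
            sent.add(f); // one entry per simulated broker request
            return f;
        });

        var a = lookup.getTopicsUnderNamespace("public/default", Mode.PERSISTENT, "topic-.*", null);
        var b = lookup.getTopicsUnderNamespace("public/default", Mode.PERSISTENT, "topic-.*", null);
        var c = lookup.getTopicsUnderNamespace("public/default", Mode.PERSISTENT, "other-.*", null);
        // prints "same future: true, requests sent: 2"
        System.out.println("same future: " + (a == b) + ", requests sent: " + sent.size());

        sent.get(0).complete(List.of("persistent://public/default/topic-1"));
        // prints "b result: [persistent://public/default/topic-1], in flight: 1"
        System.out.println("b result: " + b.join() + ", in flight: " + lookup.topicsInProgress.size());
    }
}
```

Keying on all four parameters is deliberately conservative: coalescing on the namespace alone could hand one caller a response computed for another caller's pattern or hash.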
Impact Analysis
Request amplification
Within the same PulsarClient / BinaryProtoLookupService instance, pattern subscriptions call client.getLookup().getTopicsUnderNamespace(...), for example in PatternMultiTopicsConsumerImpl and during the initial subscribe in PulsarClientImpl#patternTopicSubscribeAsync. When multiple pattern consumers run their periodic rechecks or start at the same time, they can issue concurrent calls with the same (namespace, mode, topicsPattern, topicsHash). Without deduplication, each call sends a separate request to the broker.
Broker load
Each duplicate call causes extra work on the broker for the same data.
Client resource waste
Duplicate network I/O and duplicate response handling on the client side; extra CompletableFuture instances are created for identical requests.
Inconsistency within the same class
BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata already coalesce identical in-flight calls; BinaryProtoLookupService#getTopicsUnderNamespace currently does not.
Evidence from Codebase
The deduplication pattern is already implemented in the sibling methods BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata:
- BinaryProtoLookupService#getBroker
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
Lines 69 to 70 in 0896c0a

    private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, CompletableFuture<LookupTopicResult>>
            lookupInProgress = new ConcurrentHashMap<>();

pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
Lines 155 to 181 in 0896c0a

    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
        long startTime = System.nanoTime();
        final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
        final Pair<TopicName, Map<String, String>> key = Pair.of(topicName,
                client.getConfiguration().getLookupProperties());
        try {
            return lookupInProgress.computeIfAbsent(key, tpName -> {
                CompletableFuture<LookupTopicResult> newFuture = findBroker(serviceNameResolver.resolveHost(), false,
                        topicName, 0, key.getRight());
                newFutureCreated.setValue(newFuture);
                newFuture.thenRun(() -> {
                    histoGetBroker.recordSuccess(System.nanoTime() - startTime);
                }).exceptionally(x -> {
                    histoGetBroker.recordFailure(System.nanoTime() - startTime);
                    return null;
                });
                return newFuture;
            });
        } finally {
            if (newFutureCreated.getValue() != null) {
                newFutureCreated.getValue().whenComplete((v, ex) -> {
                    lookupInProgress.remove(key, newFutureCreated.getValue());
                });
            }
        }
    }

- BinaryProtoLookupService#getPartitionedTopicMetadata
pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
Lines 72 to 73 in 0896c0a

    private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
            partitionedMetadataInProgress = new ConcurrentHashMap<>();

pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/BinaryProtoLookupService.java
Lines 188 to 206 in 0896c0a

    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
            TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) {
        final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
        try {
            return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
                CompletableFuture<PartitionedTopicMetadata> newFuture = getPartitionedTopicMetadataAsync(
                        topicName, metadataAutoCreationEnabled, useFallbackForNonPIP344Brokers);
                newFutureCreated.setValue(newFuture);
                return newFuture;
            });
        } finally {
            if (newFutureCreated.getValue() != null) {
                newFutureCreated.getValue().whenComplete((v, ex) -> {
                    partitionedMetadataInProgress.remove(topicName, newFutureCreated.getValue());
                });
            }
        }
    }

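One detail of these methods is worth preserving when porting the pattern to getTopicsUnderNamespace: the whenComplete removal is attached only after computeIfAbsent returns (via MutableObject and finally), not inside the mapping function. The ConcurrentHashMap contract says the mapping function must not modify the map, and if the new future is already complete (say, a request that fails before any I/O), a callback attached inside the mapping function would run before the entry is inserted; at best the removal finds nothing, and the completed future stays cached for every later identical call. The standalone sketch below uses a hypothetical generic helper, coalesce, which is not a Pulsar API, to show that attaching the callback afterwards cleans up even synchronously completed futures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

public class RemovalOrderingSketch {

    // Hypothetical generic form of the getBroker/getPartitionedTopicMetadata pattern.
    static <K, V> CompletableFuture<V> coalesce(ConcurrentHashMap<K, CompletableFuture<V>> inProgress,
                                                K key, Supplier<CompletableFuture<V>> request) {
        AtomicReference<CompletableFuture<V>> created = new AtomicReference<>();
        try {
            return inProgress.computeIfAbsent(key, k -> {
                CompletableFuture<V> f = request.get();
                created.set(f);
                // No removal callback here: the mapping function must not modify the map.
                return f;
            });
        } finally {
            CompletableFuture<V> mine = created.get();
            if (mine != null) {
                // If mine is already done, this runs immediately; the entry exists by now,
                // so remove(key, mine) actually clears it.
                mine.whenComplete((v, ex) -> inProgress.remove(key, mine));
            }
        }
    }

    public static void main(String[] args) {
        ConcurrentHashMap<String, CompletableFuture<String>> inProgress = new ConcurrentHashMap<>();
        // Simulated request that fails synchronously, e.g. no connection available.
        CompletableFuture<String> failed = coalesce(inProgress, "public/default",
                () -> CompletableFuture.failedFuture(new IllegalStateException("connection failed")));
        // prints "failed: true, entries left: 0"
        System.out.println("failed: " + failed.isCompletedExceptionally()
                + ", entries left: " + inProgress.size());
    }
}
```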
Solution
No response
Alternatives
No response
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!