[improve][client] Add deduplication for concurrent getTopicsUnderNamespace requests to prevent request amplification #24964

@vinkal-chudgar

Description

Search before reporting

  • I searched in the issues and found nothing similar.

Motivation

Problem Statement

BinaryProtoLookupService#getTopicsUnderNamespace does not deduplicate concurrent requests with identical parameters (namespace, mode, topicsPattern, topicsHash). Each call creates a new CompletableFuture and immediately invokes the private helper to get a connection and send a request. When multiple callers ask for the same namespace at the same time, this causes request amplification and unnecessary broker load.

In the same class, sibling methods do deduplicate:

  • BinaryProtoLookupService#getBroker uses lookupInProgress.computeIfAbsent(...) with removal in whenComplete(...).
  • BinaryProtoLookupService#getPartitionedTopicMetadata uses partitionedMetadataInProgress.computeIfAbsent(...) with removal in whenComplete(...).

This inconsistency means BinaryProtoLookupService#getTopicsUnderNamespace can issue duplicate network requests for identical inputs, increasing broker load, while BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata already deduplicate.

This is a performance improvement and consistency change, not a functional bug fix. It brings getTopicsUnderNamespace in line with the coalescing pattern already used by BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata.

Current Behavior

When multiple threads or components request topics under the same namespace with identical parameters (namespace, mode, pattern, hash), each call results in a separate network request to the broker:

@Override
public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace,
                                                                   Mode mode,
                                                                   String topicsPattern,
                                                                   String topicsHash) {
    CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>();
    AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
    Backoff backoff = new BackoffBuilder()
            .setInitialTime(100, TimeUnit.MILLISECONDS)
            .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
            .setMax(1, TimeUnit.MINUTES)
            .create();
    getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode,
            topicsPattern, topicsHash);
    return topicsFuture;
}

Expected Behavior

Concurrent requests with identical parameters should be deduplicated, sharing a single network request and result, consistent with the patterns used in BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata.
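
For illustration only, the following is a minimal sketch of what applying the same computeIfAbsent/whenComplete coalescing pattern to getTopicsUnderNamespace could look like. The getTopicsInProgress map and the TopicsKey record are hypothetical names introduced for this sketch (a Pair-based key, as in getBroker, would work equally well); the lambda body simply reuses the existing retry setup shown above.

// Hypothetical in-flight map keyed by the full request tuple; the names below do not
// exist in BinaryProtoLookupService today and are used only to illustrate the pattern.
private final ConcurrentHashMap<TopicsKey, CompletableFuture<GetTopicsResult>>
        getTopicsInProgress = new ConcurrentHashMap<>();

// Assumed key type covering every parameter that makes two requests identical.
private record TopicsKey(NamespaceName namespace, Mode mode, String topicsPattern, String topicsHash) { }

@Override
public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
                                                                  String topicsPattern, String topicsHash) {
    final TopicsKey key = new TopicsKey(namespace, mode, topicsPattern, topicsHash);
    final MutableObject<CompletableFuture<GetTopicsResult>> newFutureCreated = new MutableObject<>();
    try {
        return getTopicsInProgress.computeIfAbsent(key, k -> {
            // Same request setup as today; only the first concurrent caller reaches this point.
            CompletableFuture<GetTopicsResult> topicsFuture = new CompletableFuture<>();
            AtomicLong opTimeoutMs = new AtomicLong(client.getConfiguration().getOperationTimeoutMs());
            Backoff backoff = new BackoffBuilder()
                    .setInitialTime(100, TimeUnit.MILLISECONDS)
                    .setMandatoryStop(opTimeoutMs.get() * 2, TimeUnit.MILLISECONDS)
                    .setMax(1, TimeUnit.MINUTES)
                    .create();
            getTopicsUnderNamespace(namespace, backoff, opTimeoutMs, topicsFuture, mode,
                    topicsPattern, topicsHash);
            newFutureCreated.setValue(topicsFuture);
            return topicsFuture;
        });
    } finally {
        // Drop the entry once the shared future completes so a later call starts a fresh lookup.
        if (newFutureCreated.getValue() != null) {
            newFutureCreated.getValue().whenComplete((v, ex) ->
                    getTopicsInProgress.remove(key, newFutureCreated.getValue()));
        }
    }
}

The remove(key, future) call in the finally block mirrors getBroker and getPartitionedTopicMetadata: the map entry only lives while the request is in flight, so results are coalesced but never cached beyond completion.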

Impact Analysis

Request amplification
Within the same PulsarClient / BinaryProtoLookupService instance, pattern subscriptions call client.getLookup().getTopicsUnderNamespace(...) (e.g., PatternMultiTopicsConsumerImpl and during initial subscribe in PulsarClientImpl#patternTopicSubscribeAsync). When multiple pattern consumers perform periodic rechecks or start at the same time, they can issue concurrent calls with the same (namespace, mode, topicsPattern, topicsHash). Without deduplication, each call sends a separate request to the broker.
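
As a hypothetical illustration (lookup stands for the client's BinaryProtoLookupService instance; the namespace, pattern, and null topicsHash are made-up example values), two concurrent calls with identical parameters currently each put their own request on the wire:

// Both calls carry the same (namespace, mode, topicsPattern, topicsHash) tuple,
// yet each creates its own future and sends its own request to the broker.
NamespaceName namespace = NamespaceName.get("public/default");
CompletableFuture<GetTopicsResult> f1 = lookup.getTopicsUnderNamespace(
        namespace, Mode.PERSISTENT, "persistent://public/default/foo-.*", null);
CompletableFuture<GetTopicsResult> f2 = lookup.getTopicsUnderNamespace(
        namespace, Mode.PERSISTENT, "persistent://public/default/foo-.*", null);
// f1 != f2: nothing is shared, so the broker answers the same question twice.

With deduplication, the second call would instead return the future created by the first.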

Broker load
Each duplicate call causes extra work on the broker for the same data.

Client resource waste
Duplicate network I/O and duplicate response handling on the client side; extra CompletableFuture instances are created for identical requests.

Inconsistency within the same class
BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata already coalesce identical in-flight calls; BinaryProtoLookupService#getTopicsUnderNamespace currently does not.

Evidence from Codebase

The deduplication pattern is already implemented and proven effective in the sibling methods BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata:

  1. BinaryProtoLookupService#getBroker

    private final ConcurrentHashMap<Pair<TopicName, Map<String, String>>, CompletableFuture<LookupTopicResult>>
            lookupInProgress = new ConcurrentHashMap<>();

    public CompletableFuture<LookupTopicResult> getBroker(TopicName topicName) {
        long startTime = System.nanoTime();
        final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
        final Pair<TopicName, Map<String, String>> key = Pair.of(topicName,
                client.getConfiguration().getLookupProperties());
        try {
            return lookupInProgress.computeIfAbsent(key, tpName -> {
                CompletableFuture<LookupTopicResult> newFuture = findBroker(serviceNameResolver.resolveHost(), false,
                        topicName, 0, key.getRight());
                newFutureCreated.setValue(newFuture);
                newFuture.thenRun(() -> {
                    histoGetBroker.recordSuccess(System.nanoTime() - startTime);
                }).exceptionally(x -> {
                    histoGetBroker.recordFailure(System.nanoTime() - startTime);
                    return null;
                });
                return newFuture;
            });
        } finally {
            if (newFutureCreated.getValue() != null) {
                newFutureCreated.getValue().whenComplete((v, ex) -> {
                    lookupInProgress.remove(key, newFutureCreated.getValue());
                });
            }
        }
    }

  2. BinaryProtoLookupService#getPartitionedTopicMetadata

    private final ConcurrentHashMap<TopicName, CompletableFuture<PartitionedTopicMetadata>>
            partitionedMetadataInProgress = new ConcurrentHashMap<>();

    public CompletableFuture<PartitionedTopicMetadata> getPartitionedTopicMetadata(
            TopicName topicName, boolean metadataAutoCreationEnabled, boolean useFallbackForNonPIP344Brokers) {
        final MutableObject<CompletableFuture> newFutureCreated = new MutableObject<>();
        try {
            return partitionedMetadataInProgress.computeIfAbsent(topicName, tpName -> {
                CompletableFuture<PartitionedTopicMetadata> newFuture = getPartitionedTopicMetadataAsync(
                        topicName, metadataAutoCreationEnabled,
                        useFallbackForNonPIP344Brokers);
                newFutureCreated.setValue(newFuture);
                return newFuture;
            });
        } finally {
            if (newFutureCreated.getValue() != null) {
                newFutureCreated.getValue().whenComplete((v, ex) -> {
                    partitionedMetadataInProgress.remove(topicName, newFutureCreated.getValue());
                });
            }
        }
    }

Solution

No response

Alternatives

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
