Conversation

@vinkal-chudgar (Contributor) commented Nov 10, 2025

Fixes #24964

Motivation

BinaryProtoLookupService#getTopicsUnderNamespace currently does not deduplicate concurrent requests with identical parameters (namespace, mode, topicsPattern, topicsHash). For each call, it creates a new CompletableFuture and immediately invokes the helper to obtain a connection and send a request, which can cause request amplification when multiple components query the same namespace simultaneously.

Current behavior in code

  • BinaryProtoLookupService#getTopicsUnderNamespace allocates a new future and invokes the private helper unconditionally (no shared in-flight future).
  • Sibling methods, in the same class, already deduplicate:
    • BinaryProtoLookupService#getBroker uses lookupInProgress.computeIfAbsent(...) with removal in whenComplete(...).
    • BinaryProtoLookupService#getPartitionedTopicMetadata uses partitionedMetadataInProgress.computeIfAbsent(...) with removal in whenComplete(...).

This inconsistency means BinaryProtoLookupService#getTopicsUnderNamespace can issue duplicate network requests for identical inputs, increasing broker load, while BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata already deduplicate.
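
For reference, the shared-future idiom those two methods use looks roughly like this (a simplified, self-contained sketch of the idiom with illustrative names, not the actual Pulsar source):

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicReference;
    import java.util.function.Function;

    // Simplified sketch of the in-flight coalescing idiom (illustrative names).
    final class InFlightCoalescer<K, V> {
        private final ConcurrentHashMap<K, CompletableFuture<V>> inProgress = new ConcurrentHashMap<>();

        CompletableFuture<V> get(K key, Function<K, CompletableFuture<V>> send) {
            AtomicReference<CompletableFuture<V>> created = new AtomicReference<>();
            CompletableFuture<V> shared = inProgress.computeIfAbsent(key, k -> {
                CompletableFuture<V> f = send.apply(k); // one request per distinct in-flight key
                created.set(f);
                return f;
            });
            // Cleanup is registered outside computeIfAbsent (the mapping function must not
            // mutate the map) and only by the caller that actually created the future.
            CompletableFuture<V> mine = created.get();
            if (mine != null) {
                mine.whenComplete((r, t) -> inProgress.remove(key, mine));
            }
            return shared;
        }
    }

The two-argument remove(key, mine) only clears the entry while it still maps to that exact future, so a racing replacement future is never evicted by a stale cleanup.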

Real impact scenarios in the current codebase

  • Pattern consumers recheck together.
    PatternMultiTopicsConsumerImpl calls client.getLookup().getTopicsUnderNamespace(...) during periodic rechecks. When multiple pattern consumers recheck at the same time, they make concurrent calls with identical (namespace, mode, topicsPattern, topicsHash). Without deduplication, each call sends a separate request to the broker.

  • Several pattern consumers start at once.
    During initial subscribe, PulsarClientImpl#patternTopicSubscribeAsync also calls BinaryProtoLookupService#getTopicsUnderNamespace. Creating several pattern consumers around the same time leads to multiple concurrent calls with identical parameters (namespace, mode, topicsPattern, topicsHash); without deduplication, each call becomes a separate broker request.

Effect

  • Request amplification: multiple identical lookups to the broker.
  • Increased broker load: unnecessary processing of duplicate requests.
  • Client overhead: duplicate I/O and response handling.
  • Inconsistency: behavior diverges from BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata, which already coalesce identical in-flight requests.

What this PR changes

This PR adds deduplication to BinaryProtoLookupService#getTopicsUnderNamespace. Concurrent calls with the same (namespace, mode, topicsPattern, topicsHash) now share one CompletableFuture and one network request, reducing duplicate broker traffic and client work. The implementation follows the same pattern already used by BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata (coalesce via computeIfAbsent, remove on whenComplete), keeping retry/backoff logic unchanged.

Modifications

  • Added deduplication map

    private final ConcurrentHashMap<TopicsUnderNamespaceKey, CompletableFuture<GetTopicsResult>>
              topicsUnderNamespaceInProgress = new ConcurrentHashMap<>();
    
  • Implemented composite key class
    Added TopicsUnderNamespaceKey (static inner class) to uniquely identify requests by (namespace, mode, topicsPattern, topicsHash).

  • Refactored BinaryProtoLookupService#getTopicsUnderNamespace method:
    BinaryProtoLookupService#getTopicsUnderNamespace now:

    • Uses ConcurrentHashMap.computeIfAbsent(...) to atomically coalesce identical in-flight requests and return the same CompletableFuture.
    • Registers whenComplete(...) on the shared future to remove the map entry after completion (success or failure), matching the pattern used by BinaryProtoLookupService#getBroker and BinaryProtoLookupService#getPartitionedTopicMetadata; a sketch of the resulting key class and method shape follows this list.
  • Added unit tests in BinaryProtoLookupServiceTest
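
Taken together, the new pieces look roughly as sketched below. This is an approximation of the merged change, not a verbatim copy: sendGetTopicsRequest stands in for the existing private helper that obtains a connection and sends the request, and both members are assumed to live inside BinaryProtoLookupService.

    import java.util.Objects;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.AtomicReference;
    import org.apache.pulsar.common.api.proto.CommandGetTopicsOfNamespace.Mode;
    import org.apache.pulsar.common.lookup.GetTopicsResult;
    import org.apache.pulsar.common.naming.NamespaceName;

    // Value class: two requests coalesce only if all four parameters match.
    private static final class TopicsUnderNamespaceKey {
        private final NamespaceName namespace;
        private final Mode mode;
        private final String topicsPattern; // nullable
        private final String topicsHash;    // nullable

        TopicsUnderNamespaceKey(NamespaceName namespace, Mode mode, String topicsPattern, String topicsHash) {
            this.namespace = namespace;
            this.mode = mode;
            this.topicsPattern = topicsPattern;
            this.topicsHash = topicsHash;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof TopicsUnderNamespaceKey)) {
                return false;
            }
            TopicsUnderNamespaceKey other = (TopicsUnderNamespaceKey) o;
            return namespace.equals(other.namespace) && mode == other.mode
                    && Objects.equals(topicsPattern, other.topicsPattern)
                    && Objects.equals(topicsHash, other.topicsHash);
        }

        @Override
        public int hashCode() {
            return Objects.hash(namespace, mode, topicsPattern, topicsHash);
        }
    }

    public CompletableFuture<GetTopicsResult> getTopicsUnderNamespace(NamespaceName namespace, Mode mode,
            String topicsPattern, String topicsHash) {
        TopicsUnderNamespaceKey key = new TopicsUnderNamespaceKey(namespace, mode, topicsPattern, topicsHash);
        AtomicReference<CompletableFuture<GetTopicsResult>> created = new AtomicReference<>();
        CompletableFuture<GetTopicsResult> shared = topicsUnderNamespaceInProgress.computeIfAbsent(key, k -> {
            // Hypothetical stand-in for the existing private helper; retry/backoff unchanged.
            CompletableFuture<GetTopicsResult> future = sendGetTopicsRequest(k);
            created.set(future);
            return future;
        });
        // Remove the entry after completion (success or failure), creator only.
        CompletableFuture<GetTopicsResult> mine = created.get();
        if (mine != null) {
            mine.whenComplete((r, t) -> topicsUnderNamespaceInProgress.remove(key, mine));
        }
        return shared;
    }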

Verifying this change

  • Make sure that the change passes the CI checks.

Additional validation:

  • BinaryProtoLookupServiceTest#testGetTopicsUnderNamespaceDeduplication
    Verifies that two in-flight calls with the same (namespace, mode, topicsPattern, topicsHash) return the same CompletableFuture and only one call is made to the connection pool; after the shared future completes, a new call with the same parameters returns a new CompletableFuture and makes one more call to the connection pool.

  • BinaryProtoLookupServiceTest#testGetTopicsUnderNamespaceDeduplicationDifferentHash
    Verifies that calls with different topicsHash values are not combined: each returns a different CompletableFuture and makes a separate call to the connection pool. Cleanup is per key; completing one future does not affect the other in-flight call.

(Tests are deterministic: no sleeps or timing assumptions; they do not exercise the network request path.)
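
As an illustration of the same strategy (not the actual BinaryProtoLookupServiceTest code), here is how the dedup-and-cleanup contract can be asserted against the generic coalescer sketched earlier: the shared future is completed by hand, so nothing depends on timing or the network path. TestNG is assumed, as in the rest of the Pulsar test suite.

    import static org.testng.Assert.assertEquals;
    import static org.testng.Assert.assertNotSame;
    import static org.testng.Assert.assertSame;

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.atomic.AtomicInteger;
    import org.testng.annotations.Test;

    public class InFlightCoalescerTest {

        @Test
        public void sameKeySharesOneFutureUntilCompletion() {
            AtomicInteger sends = new AtomicInteger();
            InFlightCoalescer<String, String> coalescer = new InFlightCoalescer<>();
            CompletableFuture<String> pending = new CompletableFuture<>();

            CompletableFuture<String> f1 =
                    coalescer.get("ns|mode|pattern|hash", k -> { sends.incrementAndGet(); return pending; });
            CompletableFuture<String> f2 =
                    coalescer.get("ns|mode|pattern|hash", k -> { sends.incrementAndGet(); return pending; });

            assertSame(f2, f1);           // identical in-flight calls share one future
            assertEquals(sends.get(), 1); // only one "request" was issued

            pending.complete("topics");   // completion triggers map-entry cleanup ...

            CompletableFuture<String> f3 = coalescer.get("ns|mode|pattern|hash",
                    k -> { sends.incrementAndGet(); return new CompletableFuture<>(); });
            assertNotSame(f3, f1);        // ... so a later identical call starts fresh
            assertEquals(sends.get(), 2);
        }
    }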

Personal CI Results

Tested in Personal CI fork: vinkal-chudgar#4

Status: All checks have passed (50 successful checks, 2 skipped)

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: vinkal-chudgar#4

…ookupService

Deduplicate concurrent getTopicsUnderNamespace requests with identical parameters (namespace, mode, topicsPattern, topicsHash) into a single CompletableFuture to reduce duplicate broker lookups.
Follows existing deduplication pattern used by getBroker() and getPartitionedTopicMetadata().

Add unit tests verifying deduplication behavior and cleanup after completion.

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
@github-actions bot added the doc-not-needed label (Your PR changes do not impact docs) Nov 10, 2025
@lhotari (Member) commented Nov 10, 2025

> BinaryProtoLookupService#getTopicsUnderNamespace currently does not deduplicate concurrent requests with identical parameters (namespace, mode, topicsPattern, topicsHash). For each call, it creates a new CompletableFuture and immediately invokes the helper to obtain a connection and send a request, which can cause request amplification when multiple components query the same namespace simultaneously.

@vinkal-chudgar This change itself is reasonable, but I'm just wondering if this PR is addressing a real problem. How common would it be to have multiple consumers with exactly the same topics pattern using the same Pulsar client instance? Do you have this use case yourself?

@lhotari (Member) commented Nov 10, 2025

@vinkal-chudgar Would you be interested in handling #24963? I noticed that bug while looking into the details.

@vinkal-chudgar (Contributor, Author) commented Nov 10, 2025

> @vinkal-chudgar Would you be interested in handling #24963? I noticed that bug while looking into the details.

Sure, @lhotari. Could you please assign #24963 to me?

@vinkal-chudgar marked this pull request as ready for review November 10, 2025 14:35
@vinkal-chudgar (Contributor, Author) commented Nov 10, 2025

> BinaryProtoLookupService#getTopicsUnderNamespace currently does not deduplicate concurrent requests with identical parameters (namespace, mode, topicsPattern, topicsHash). For each call, it creates a new CompletableFuture and immediately invokes the helper to obtain a connection and send a request, which can cause request amplification when multiple components query the same namespace simultaneously.

> @vinkal-chudgar This change itself is reasonable, but I'm just wondering if this PR is addressing a real problem. How common would it be to have multiple consumers with exactly the same topics pattern using the same Pulsar client instance? Do you have this use case yourself?

@lhotari - To the best of my knowledge, yes, this PR addresses a real case that occurs within a single PulsarClient instance. Please correct me if you think otherwise.

1. Is this a real problem?

The client currently creates a new future and starts a new request for every call to getTopicsUnderNamespace(...) even when the inputs are identical. In contrast, getBroker(...) and getPartitionedTopicMetadata(...) already coalesce identical in-flight calls. Pattern topic discovery uses getTopicsUnderNamespace(...) both during initial subscribe and during periodic rechecks, so identical calls can happen at the same time within the same PulsarClient / BinaryProtoLookupService instance. In that window the client issues duplicate requests that this PR removes.

2. How common is “same pattern, same client”?

It depends on the application, but it is not unusual. Pulsar encourages reusing one PulsarClient per process. It is common to have more than one pattern consumer in the same process for the same namespace and regex but with different subscriptions or modules. There are two concrete moments where identical calls line up within the same PulsarClient / BinaryProtoLookupService instance:

  • Initial subscribe with a topics pattern: PulsarClientImpl#subscribeAsync(...) routes to patternTopicSubscribeAsync(...) when topicsPattern is set. That method calls lookup.getTopicsUnderNamespace(namespaceName, subscriptionMode, regex, null) to seed the topic set.

  • Periodic rechecks for pattern subscriptions: PatternMultiTopicsConsumerImpl also calls lookup.getTopicsUnderNamespace(...) during its scheduled rechecks. When multiple pattern consumers were created around the same time and share the same (namespace, mode, topicsPattern, topicsHash), these rechecks can line up and issue the same call concurrently.

Even if this is not constant traffic, when it happens it amplifies requests. The fix is small and keeps behavior consistent with the other two lookup paths.
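
As a concrete (hypothetical) example of such an application, two modules sharing one client like this will line up identical discovery calls; topic and subscription names below are made up:

    import java.util.regex.Pattern;
    import org.apache.pulsar.client.api.Consumer;
    import org.apache.pulsar.client.api.PulsarClient;

    public class PatternConsumersExample {
        public static void main(String[] args) throws Exception {
            // One shared client per process, as Pulsar recommends.
            PulsarClient client = PulsarClient.builder()
                    .serviceUrl("pulsar://localhost:6650")
                    .build();

            Pattern pattern = Pattern.compile("persistent://public/default/orders-.*");

            // Two pattern consumers: same namespace and regex, different subscriptions.
            // Each initial subscribe (and each periodic recheck thereafter) triggers a
            // getTopicsUnderNamespace call with identical parameters.
            Consumer<byte[]> audit = client.newConsumer()
                    .topicsPattern(pattern)
                    .subscriptionName("audit")
                    .subscribe();

            Consumer<byte[]> billing = client.newConsumer()
                    .topicsPattern(pattern)
                    .subscriptionName("billing")
                    .subscribe();
        }
    }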

3. Do I have this use case myself?

No, I do not have a personal production deployment that hits this case. I did see it surface in CI: while working on unrelated changes, a CI run failed on PatternConsumerBackPressureTest.testInfiniteGetThousandsTopics about 7 to 8 days ago. That test drives many concurrent getTopicsUnderNamespace(...) calls on the same client with the same inputs. It highlighted that this path does not coalesce identical in-flight calls today.

P.S. I have created issue #24964 with all the required details.

@lhotari (Member) commented Nov 11, 2025

> @lhotari - To the best of my knowledge, yes, this PR addresses a real case that occurs within a single PulsarClient instance. Please correct me if you think otherwise.

@vinkal-chudgar I was interested to hear what the possible real world impact could have been if you faced this in production. I do agree that it makes sense to make this change to be consistent.

@codecov-commenter

Codecov Report

❌ Patch coverage is 76.47059% with 8 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.32%. Comparing base (f29ca21) to head (b49c913).
⚠️ Report is 5 commits behind head on master.

Files with missing lines                                  Patch %   Lines
...e/pulsar/client/impl/BinaryProtoLookupService.java     76.47%    3 Missing and 5 partials ⚠️
Additional details and impacted files


@@              Coverage Diff              @@
##             master   #24962       +/-   ##
=============================================
+ Coverage     38.69%   74.32%   +35.62%     
- Complexity    13362    34046    +20684     
=============================================
  Files          1863     1920       +57     
  Lines        145975   150178     +4203     
  Branches      16928    17424      +496     
=============================================
+ Hits          56487   111620    +55133     
+ Misses        81858    29695    -52163     
- Partials       7630     8863     +1233     
Flag        Coverage Δ
inttests    26.31% <2.94%> (+0.02%) ⬆️
systests    22.89% <70.58%> (+0.03%) ⬆️
unittests   73.84% <76.47%> (+38.85%) ⬆️

Flags with carried forward coverage won't be shown.

Files with missing lines                                  Coverage Δ
...e/pulsar/client/impl/BinaryProtoLookupService.java     78.40% <76.47%> (+7.42%) ⬆️

... and 1416 files with indirect coverage changes


@lhotari added this to the 4.2.0 milestone Nov 11, 2025
@lhotari merged commit 1902735 into apache:master Nov 11, 2025
53 of 54 checks passed
lhotari pushed a commit that referenced this pull request Nov 11, 2025
…ookupService (#24962)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 1902735)
lhotari pushed a commit that referenced this pull request Nov 11, 2025
…ookupService (#24962)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 1902735)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
…ookupService (apache#24962)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 1902735)
(cherry picked from commit d90da78)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 14, 2025
…ookupService (apache#24962)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 1902735)
(cherry picked from commit d90da78)