Skip to content

Conversation

@lhotari
Copy link
Member

@lhotari lhotari commented Oct 9, 2025

PIP-442

Motivation

See PIP-442.
Apache Pulsar brokers currently lack memory limits for topic listing operations (CommandGetTopicsOfNamespace and CommandWatchTopicList), creating a significant gap in the broker's otherwise comprehensive memory management framework. This can lead to:

  • Broker/Proxy Memory Exhaustion: Large namespaces with thousands of topics can trigger OutOfMemoryErrors when multiple clients simultaneously request topic lists
  • Unpredictable Resource Usage: Operators cannot reliably predict or limit total memory consumption due to this unbounded allocation path
  • Performance Degradation: Large topic list operations cause GC pressure and latency spikes affecting all broker operations

While Pulsar has robust memory limits for message publishing (maxMessagePublishBufferSizeInMB), managed ledger operations (managedLedgerMaxReadsInFlightSizeInMB, managedLedgerCacheSizeMB), and concurrent requests (maxConcurrentLookupRequest), topic listing operations remain unbounded.

Modifications

This PR implements PIP-442 by introducing memory-aware flow control for topic listing operations:

Core Components

  1. AsyncSemaphore Interface & Implementation

    • Generic asynchronous semaphore abstraction with configurable timeouts and queue limits
    • Supports permit acquisition, updates, and cancellation
    • Tracks queue size and wait times for monitoring
  2. AsyncDualMemoryLimiter Interface & Implementation

    • Dual memory pool tracking for heap memory (topic list assembly) and direct memory (network buffers)
    • Separate limits and queues for each memory type
    • Helper methods for common patterns (acquire-execute-release)
  3. AsyncDualMemoryLimiterUtil

    • Utility methods for acquiring permits and writing responses
    • Handles serialization and network buffer management
    • Ensures permits are released after write completion or failure
  4. TopicListMemoryLimiter

    • Extends AsyncDualMemoryLimiterImpl with metrics integration
    • Exposes both Prometheus and OpenTelemetry metrics
    • Tracks memory usage, queue sizes, wait times, and timeouts

Integration Points

Broker Integration:

  • Modified ServerCnx.handleGetTopicsOfNamespace() to acquire initial heap permits (1KB estimate), update to actual size after topic retrieval, then acquire direct memory permits for response serialization
  • Modified PulsarCommandSenderImpl to acquire direct memory permits before serialization and release after write completion
  • Created TopicListMemoryLimiter in BrokerService with configured limits

Proxy Integration:

  • Modified LookupProxyHandler.handleGetTopicsOfNamespace() with similar permit acquisition flow
  • Handles both heap memory (for topic list from broker) and direct memory (for response to client)
  • Created TopicListMemoryLimiter in ProxyService with configured limits

Configuration

Added six new configuration parameters to broker.conf, proxy.conf, and standalone.conf:

maxTopicListInFlightHeapMemSizeMB=100
maxTopicListInFlightDirectMemSizeMB=100
maxTopicListInFlightHeapMemSizePermitsAcquireTimeoutMillis=25000
maxTopicListInFlightHeapMemSizePermitsAcquireQueueSize=10000
maxTopicListInFlightDirectMemSizePermitsAcquireTimeoutMillis=25000
maxTopicListInFlightDirectMemSizePermitsAcquireQueueSize=10000

Metrics

Added 12 new metrics per instance (broker/proxy) with both Prometheus and OpenTelemetry support:

  • Memory usage and limits (heap and direct)
  • Queue sizes and limits
  • Wait time summaries (P50, P95, P99, Max)
  • Timeout counters

Verifying this change

  • Make sure that the change passes the CI checks

This change added tests and can be verified as follows:

  • Added PatternConsumerBackPressureMultipleConsumersTest: Integration test that creates 8,192 partitioned topics and sends 500 concurrent getTopicsOfNamespace requests from 200 different client connections. The test intentionally reduces available direct memory to reproduce memory pressure scenarios and verifies all requests complete successfully with memory limiting in place.

  • Added ProxyPatternConsumerBackPressureMultipleConsumersTest: Extended version of the broker test that routes requests through the Pulsar Proxy to verify memory limiting works correctly in proxy scenarios.

  • Added AsyncSemaphoreImplTest: Comprehensive unit tests for the async semaphore implementation covering:

    • Basic acquire/release operations
    • Queueing behavior when permits unavailable
    • Timeout and queue full exceptions
    • Cancellation support
    • Permit updates (increase/decrease)
    • Concurrent operations
  • Added AsyncDualMemoryLimiterImplTest: Unit tests for the dual memory limiter covering:

    • Independent heap and direct memory limiting
    • Permit acquisition and updates for both types
    • Queue behavior and timeout handling
    • Concurrent operations with mixed memory types
    • Proper resource cleanup
  • Added AsyncDualMemoryLimiterUtilTest: Tests for utility methods covering:

    • Successful permit acquisition and response writing
    • Error handling for permit acquisition failures
    • Write failures and proper permit release
    • Cancellation scenarios
    • Concurrent operations
  • Verified metrics: Tests validate that all 26 metrics are properly registered and report correct values.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added this to the 4.2.0 milestone Oct 9, 2025
@lhotari lhotari self-assigned this Oct 9, 2025
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Oct 9, 2025
@lhotari lhotari changed the title [feat][broker/proxy] PIP-442: Add memory limits for CommandGetTopicsOfNamespace/CommandWatchTopicList [feat] PIP-442: Add memory limits for CommandGetTopicsOfNamespace/CommandWatchTopicList Oct 9, 2025
@lhotari lhotari closed this Oct 9, 2025
@lhotari lhotari reopened this Oct 9, 2025
@codecov-commenter
Copy link

codecov-commenter commented Oct 9, 2025

Codecov Report

❌ Patch coverage is 87.76860% with 74 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.32%. Comparing base (d9a36b2) to head (09cadc2).
⚠️ Report is 21 commits behind head on master.

Files with missing lines Patch % Lines
...apache/pulsar/proxy/server/LookupProxyHandler.java 45.45% 22 Missing and 2 partials ⚠️
...va/org/apache/pulsar/broker/service/ServerCnx.java 73.33% 14 Missing and 2 partials ⚠️
...he/pulsar/common/semaphore/AsyncSemaphoreImpl.java 92.10% 5 Missing and 7 partials ⚠️
...pache/pulsar/broker/admin/impl/NamespacesBase.java 76.47% 5 Missing and 3 partials ⚠️
.../broker/topiclistlimit/TopicListMemoryLimiter.java 95.67% 4 Missing and 3 partials ⚠️
...roker/topiclistlimit/TopicListSizeResultCache.java 84.00% 0 Missing and 4 partials ⚠️
...r/common/semaphore/AsyncDualMemoryLimiterImpl.java 95.34% 1 Missing and 1 partial ⚠️
...pulsar/broker/service/PulsarCommandSenderImpl.java 66.66% 0 Missing and 1 partial ⚠️
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24833      +/-   ##
============================================
+ Coverage     74.28%   74.32%   +0.04%     
- Complexity    33900    34019     +119     
============================================
  Files          1913     1920       +7     
  Lines        149510   150062     +552     
  Branches      17373    17404      +31     
============================================
+ Hits         111060   111538     +478     
- Misses        29586    29635      +49     
- Partials       8864     8889      +25     
Flag Coverage Δ
inttests 26.27% <31.57%> (-0.01%) ⬇️
systests 22.80% <52.39%> (+0.08%) ⬆️
unittests 73.85% <87.76%> (+0.04%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...org/apache/pulsar/broker/ServiceConfiguration.java 98.22% <100.00%> (+0.01%) ⬆️
.../org/apache/pulsar/broker/admin/v1/Namespaces.java 52.77% <100.00%> (ø)
.../org/apache/pulsar/broker/admin/v2/Namespaces.java 90.88% <100.00%> (ø)
...rg/apache/pulsar/broker/service/BrokerService.java 83.34% <100.00%> (-0.02%) ⬇️
...va/org/apache/pulsar/common/protocol/Commands.java 90.99% <100.00%> (-0.01%) ⬇️
...ulsar/common/semaphore/AsyncDualMemoryLimiter.java 100.00% <100.00%> (ø)
...r/common/semaphore/AsyncDualMemoryLimiterUtil.java 100.00% <100.00%> (ø)
...apache/pulsar/common/semaphore/AsyncSemaphore.java 100.00% <100.00%> (ø)
...apache/pulsar/proxy/server/ProxyConfiguration.java 95.67% <100.00%> (+0.16%) ⬆️
...a/org/apache/pulsar/proxy/server/ProxyService.java 80.52% <100.00%> (+0.36%) ⬆️
... and 8 more

... and 85 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@github-actions github-actions bot added the PIP label Oct 10, 2025
@lhotari lhotari requested a review from Copilot October 10, 2025 18:24
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements PIP-442 to add memory limits for topic listing operations (CommandGetTopicsOfNamespace and CommandWatchTopicList). It introduces an asynchronous dual memory limiter that tracks separate limits for heap memory (topic list assembly) and direct memory (network buffers) to prevent broker/proxy memory exhaustion when handling large namespaces with thousands of topics.

  • Introduces AsyncSemaphore and AsyncDualMemoryLimiter interfaces with comprehensive implementations
  • Integrates memory limiting into broker and proxy topic listing operations with cancellation support
  • Adds 6 new configuration parameters for heap/direct memory limits, timeouts, and queue sizes
  • Exposes 26 new metrics (13 Prometheus + 13 OpenTelemetry) for monitoring memory usage and queue behavior

Reviewed Changes

Copilot reviewed 40 out of 40 changed files in this pull request and generated 4 comments.

Show a summary per file
File Description
pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/ Core semaphore and dual memory limiter implementations with utility methods
pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ Broker-side integration for topic listing memory control
pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ Proxy-side integration for topic listing memory control
pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/ TopicListMemoryLimiter with Prometheus and OpenTelemetry metrics
conf/*.conf Configuration files with new memory limiting parameters
Test files Comprehensive unit and integration tests for the new functionality

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.

@lhotari lhotari changed the title [feat] PIP-442: Add memory limits for CommandGetTopicsOfNamespace/CommandWatchTopicList [feat] PIP-442: Add memory limits for CommandGetTopicsOfNamespace Nov 8, 2025
@lhotari lhotari requested a review from Copilot November 8, 2025 11:11
Copy link

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

Copilot reviewed 39 out of 39 changed files in this pull request and generated 2 comments.


💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

@lhotari lhotari merged commit 3aaf345 into apache:master Nov 8, 2025
96 of 100 checks passed
lhotari added a commit that referenced this pull request Nov 11, 2025
lhotari added a commit that referenced this pull request Nov 28, 2025
@lhotari
Copy link
Member Author

lhotari commented Nov 28, 2025

Cherry-picked to branch-4.0. Mailing list discussion thread was https://lists.apache.org/thread/0n0nf2g7jm75lv3n8ymlxcvx7o2x4mnw

ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 1, 2025
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 1, 2025
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 19, 2025
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 19, 2025
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Dec 19, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants