[feat] PIP-442: Add memory limits for CommandGetTopicsOfNamespace #24833
…fNamespace and CommandWatchTopicList
Codecov Report
Coverage Diff

| Metric | master | #24833 | +/- |
|---|---|---|---|
| Coverage | 74.28% | 74.32% | +0.04% |
| Complexity | 33900 | 34019 | +119 |
| Files | 1913 | 1920 | +7 |
| Lines | 149510 | 150062 | +552 |
| Branches | 17373 | 17404 | +31 |
| Hits | 111060 | 111538 | +478 |
| Misses | 29586 | 29635 | +49 |
| Partials | 8864 | 8889 | +25 |
Pull Request Overview
This PR implements PIP-442 to add memory limits for topic listing operations (CommandGetTopicsOfNamespace and CommandWatchTopicList). It introduces an asynchronous dual memory limiter that tracks separate limits for heap memory (topic list assembly) and direct memory (network buffers) to prevent broker/proxy memory exhaustion when handling large namespaces with thousands of topics.
- Introduces `AsyncSemaphore` and `AsyncDualMemoryLimiter` interfaces with comprehensive implementations
- Integrates memory limiting into broker and proxy topic listing operations with cancellation support
- Adds 6 new configuration parameters for heap/direct memory limits, timeouts, and queue sizes
- Exposes 26 new metrics (13 Prometheus + 13 OpenTelemetry) for monitoring memory usage and queue behavior
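To illustrate what an asynchronous semaphore with a bounded wait queue might look like, here is a minimal sketch. It is not the PR's `AsyncSemaphore`: the class name `SketchAsyncSemaphore`, its constructor parameters, and its method signatures are hypothetical, and timeouts are left out for brevity. Permits stand for bytes, and callers get a `CompletableFuture` back instead of blocking a thread.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch, not the PR's AsyncSemaphore implementation.
class SketchAsyncSemaphore {
    private static final class Waiter {
        final long permits;
        final CompletableFuture<Long> future = new CompletableFuture<>();
        Waiter(long permits) { this.permits = permits; }
    }

    private final long maxPermits;
    private final int maxQueueSize;
    private final Queue<Waiter> waiters = new ArrayDeque<>();
    private long available;

    SketchAsyncSemaphore(long maxPermits, int maxQueueSize) {
        this.maxPermits = maxPermits;
        this.maxQueueSize = maxQueueSize;
        this.available = maxPermits;
    }

    // Grants immediately when possible; otherwise queues the request in FIFO
    // order, or fails fast if it can never fit or the queue is already full.
    synchronized CompletableFuture<Long> acquire(long permits) {
        if (permits <= 0 || permits > maxPermits) {
            return CompletableFuture.failedFuture(
                    new IllegalArgumentException("invalid permit count: " + permits));
        }
        if (waiters.isEmpty() && permits <= available) {
            available -= permits;
            return CompletableFuture.completedFuture(permits);
        }
        if (waiters.size() >= maxQueueSize) {
            return CompletableFuture.failedFuture(new IllegalStateException("queue full"));
        }
        Waiter waiter = new Waiter(permits);
        waiters.add(waiter);
        return waiter.future;
    }

    // Returns permits and hands them to queued requests, oldest first.
    void release(long permits) {
        List<Waiter> granted = new ArrayList<>();
        synchronized (this) {
            available += permits;
            while (!waiters.isEmpty()) {
                Waiter next = waiters.peek();
                if (next.future.isDone()) {   // cancelled while queued: drop it
                    waiters.poll();
                    continue;
                }
                if (next.permits > available) {
                    break;                    // keep FIFO order, wait for more permits
                }
                waiters.poll();
                available -= next.permits;
                granted.add(next);
            }
        }
        // Complete outside the lock so caller callbacks never run while it is held.
        for (Waiter waiter : granted) {
            if (!waiter.future.complete(waiter.permits)) {
                release(waiter.permits);      // lost a race with cancellation: give permits back
            }
        }
    }

    synchronized long availablePermits() {
        return available;
    }
}
```

A request that does not fit waits in the queue until `release` frees enough permits. Rejecting new requests once the queue is full keeps the queue itself from becoming an unbounded memory consumer.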
Reviewed Changes
Copilot reviewed 40 out of 40 changed files in this pull request and generated 4 comments.
Show a summary per file
| File | Description |
|---|---|
| pulsar-common/src/main/java/org/apache/pulsar/common/semaphore/ | Core semaphore and dual memory limiter implementations with utility methods |
| pulsar-broker/src/main/java/org/apache/pulsar/broker/service/ | Broker-side integration for topic listing memory control |
| pulsar-proxy/src/main/java/org/apache/pulsar/proxy/server/ | Proxy-side integration for topic listing memory control |
| pulsar-broker-common/src/main/java/org/apache/pulsar/broker/topiclistlimit/ | TopicListMemoryLimiter with Prometheus and OpenTelemetry metrics |
| conf/*.conf | Configuration files with new memory limiting parameters |
| Test files | Comprehensive unit and integration tests for the new functionality |
- if there's a request for the same id, just use it. The client should use unique ids for different watchers
Pull Request Overview
Copilot reviewed 39 out of 39 changed files in this pull request and generated 2 comments.
Cherry-picked to branch-4.0. The mailing list discussion thread is https://lists.apache.org/thread/0n0nf2g7jm75lv3n8ymlxcvx7o2x4mnw
…ache#24833) (cherry picked from commit 3aaf345) (cherry picked from commit 116155a)
PIP-442
Motivation
See PIP-442.
Apache Pulsar brokers currently lack memory limits for topic listing operations (`CommandGetTopicsOfNamespace` and `CommandWatchTopicList`), creating a significant gap in the broker's otherwise comprehensive memory management framework. This can lead to:
While Pulsar has robust memory limits for message publishing (`maxMessagePublishBufferSizeInMB`), managed ledger operations (`managedLedgerMaxReadsInFlightSizeInMB`, `managedLedgerCacheSizeMB`), and concurrent requests (`maxConcurrentLookupRequest`), topic listing operations remain unbounded.
Modifications
This PR implements PIP-442 by introducing memory-aware flow control for topic listing operations:
Core Components
AsyncSemaphore Interface & Implementation
AsyncDualMemoryLimiter Interface & Implementation
AsyncDualMemoryLimiterUtil
TopicListMemoryLimiter
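To make the roles of these components concrete, here is a minimal sketch of the permit flow this PR describes for the broker: acquire a 1KB heap estimate, resize it to the actual topic list size, acquire direct memory permits for the response, and release everything afterwards. All names here (`SketchPool`, `SketchDualLimiter`, `SketchTopicListFlow`, `resize`) are hypothetical and are not the PR's API. As a simplification, this limiter fails a request that exceeds a limit, whereas the PR describes queueing with timeouts.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Hypothetical names throughout; a sketch, not the PR's AsyncDualMemoryLimiter.
enum SketchPool { HEAP, DIRECT }

class SketchDualLimiter {
    private final long[] limit = new long[2];
    private final long[] used = new long[2];

    SketchDualLimiter(long heapLimit, long directLimit) {
        limit[SketchPool.HEAP.ordinal()] = heapLimit;
        limit[SketchPool.DIRECT.ordinal()] = directLimit;
    }

    synchronized CompletableFuture<Long> acquire(SketchPool pool, long bytes) {
        return resize(pool, 0, bytes);
    }

    // Changes a holding from `held` to `wanted` bytes; on failure `held` stays reserved.
    synchronized CompletableFuture<Long> resize(SketchPool pool, long held, long wanted) {
        int i = pool.ordinal();
        if (used[i] - held + wanted > limit[i]) {
            return CompletableFuture.failedFuture(
                    new IllegalStateException(pool + " limit exceeded"));
        }
        used[i] += wanted - held;
        return CompletableFuture.completedFuture(wanted);
    }

    synchronized void release(SketchPool pool, long bytes) {
        used[pool.ordinal()] -= bytes;
    }

    synchronized long used(SketchPool pool) {
        return used[pool.ordinal()];
    }
}

class SketchTopicListFlow {
    static final long INITIAL_HEAP_ESTIMATE = 1024; // the 1KB estimate from the PR description

    static CompletableFuture<Integer> handleGetTopics(
            SketchDualLimiter limiter, Supplier<CompletableFuture<List<String>>> fetchTopics) {
        return limiter.acquire(SketchPool.HEAP, INITIAL_HEAP_ESTIMATE).thenCompose(estimate -> {
            AtomicLong heapHeld = new AtomicLong(estimate);
            return fetchTopics.get()
                    .thenCompose(topics -> {
                        long actual = estimateBytes(topics);
                        // Replace the estimate with the real size of the assembled list.
                        return limiter.resize(SketchPool.HEAP, estimate, actual)
                                .thenAccept(heapHeld::set)
                                .thenCompose(ignore -> limiter.acquire(SketchPool.DIRECT, actual))
                                .thenApply(directHeld -> {
                                    try {
                                        return serializeAndWrite(topics);
                                    } finally {
                                        limiter.release(SketchPool.DIRECT, directHeld);
                                    }
                                });
                    })
                    // Heap permits are returned whether the request succeeded or failed.
                    .whenComplete((result, error) ->
                            limiter.release(SketchPool.HEAP, heapHeld.get()));
        });
    }

    static long estimateBytes(List<String> topics) {
        long total = 0;
        for (String topic : topics) {
            total += topic.getBytes(StandardCharsets.UTF_8).length;
        }
        return total;
    }

    // Stand-in for response serialization and the network write; returns bytes "written".
    static int serializeAndWrite(List<String> topics) {
        return String.join("\n", topics).getBytes(StandardCharsets.UTF_8).length;
    }
}
```

Keeping heap and direct memory in separate pools means a burst of large topic lists being assembled cannot starve the budget for network buffers, and vice versa.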
Integration Points
Broker Integration:
- `ServerCnx.handleGetTopicsOfNamespace()`: acquires initial heap permits (1KB estimate), updates them to the actual size after topic retrieval, then acquires direct memory permits for response serialization
- `PulsarCommandSenderImpl`: acquires direct memory permits before serialization and releases them after write completion
- `TopicListMemoryLimiter` in `BrokerService` with configured limits
Proxy Integration:
- `LookupProxyHandler.handleGetTopicsOfNamespace()`: follows a similar permit acquisition flow
- `TopicListMemoryLimiter` in `ProxyService` with configured limits
Configuration
Added six new configuration parameters to `broker.conf`, `proxy.conf`, and `standalone.conf`:
Metrics
Added 12 new metrics per instance (broker/proxy) with both Prometheus and OpenTelemetry support:
Verifying this change
This change added tests and can be verified as follows:
- Added `PatternConsumerBackPressureMultipleConsumersTest`: Integration test that creates 8,192 partitioned topics and sends 500 concurrent `getTopicsOfNamespace` requests from 200 different client connections. The test intentionally reduces available direct memory to reproduce memory pressure scenarios and verifies that all requests complete successfully with memory limiting in place.
- Added `ProxyPatternConsumerBackPressureMultipleConsumersTest`: Extended version of the broker test that routes requests through the Pulsar Proxy to verify that memory limiting works correctly in proxy scenarios.
- Added `AsyncSemaphoreImplTest`: Comprehensive unit tests for the async semaphore implementation covering:
- Added `AsyncDualMemoryLimiterImplTest`: Unit tests for the dual memory limiter covering:
- Added `AsyncDualMemoryLimiterUtilTest`: Tests for utility methods covering:
- Verified metrics: Tests validate that all 26 metrics are properly registered and report correct values.
Documentation
- `doc`
- `doc-required`
- `doc-not-needed`
- `doc-complete`