-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[feat][pip] PIP-448: Topic-level Delayed Message Tracker for Memory Optimization #24928
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull Request Overview
This PR introduces PIP-448, a proposal to optimize delayed message tracking in Apache Pulsar by implementing a topic-level shared tracker instead of per-subscription trackers. The proposal aims to significantly reduce memory consumption for topics with multiple subscriptions.
Key changes:
- Introduces a new topic-level
InMemoryTopicDelayedDeliveryTrackerManagerthat stores delayed message metadata once per topic rather than once per subscription - Proposes lightweight
InMemoryTopicDelayedDeliveryTrackerViewproxies for each subscription that delegate to the shared manager - Reduces memory complexity from O(num_delayed_messages * num_subscriptions) to O(num_delayed_messages)
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
|
I think this is a good improvement, but based on this improvement, is there a test or benchmark that can visually demonstrate the results? |
|
I recently made some simple adjustments to OpenMessaging Benchmark to support the delayed message feature, while testing many PRs related to the delayed message module. For this PR, we must first monitor memory, but the most important thing is to test whether a problem with one subscription under a large number of subscriptions affects other subscriptions. Any good suggestions for the result comparison part? Which indicators should we visualize? |
|
|
||
| # High Level Design | ||
|
|
||
| The core idea is to introduce a new, opt-in `DelayedDeliveryTrackerFactory` that implements a shared, topic-level tracking strategy. This is achieved with two new components: a `TopicDelayedDeliveryTrackerManager` and a subscription-scoped `InMemoryTopicDelayedDeliveryTracker`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not reference DelayedDeliveryTracker at the interface level, which would also apply to BucketDelayedDeliveryTracker?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What you mean is why not to implement DelayedDeliveryTracker directly, but to add a TopicDelayedDeliveryTrackerManager?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I see,you have designed a new data structure for the InMemoryTopicDelayedDeliveryTrackerManager, unable to adapt to other DelayedDeliveryTracker.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes.
At the code level, the new TopicDelayedDeliveryTrackerManager already exposes createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers) returning the DelayedDeliveryTracker interface:
public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable {
DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher);
// ...
}so the dispatcher only depends on the DelayedDeliveryTracker interface and not on the concrete in-memory implementation. The InMemoryTopicDelayedDeliveryTracker is just one implementation that acts as a proxy to the shared topic-level manager in this PIP.
In the PIP text I focused on the in-memory implementation (e.g. InMemoryTopicDelayedDeliveryTracker*) because this proposal explicitly keeps BucketDelayedDeliveryTracker out of scope (see the “Out of Scope” section). The goal of PIP-448 is to address the memory footprint of the legacy in-memory tracker first, without changing any semantics of the persistent, bucket-based tracker.
|
|
||
| ### `InMemoryTopicDelayedDeliveryTracker` (New Class) | ||
| This class implements the `DelayedDeliveryTracker` interface for a single subscription. | ||
| * **Role**: Acts as a lightweight proxy, forwarding all operations (e.g., `addMessage`, `getScheduledMessages`) to the shared `InMemoryTopicDelayedDeliveryTrackerManager`. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Whether it's possible to consider using a special cursor to replay delayed messages to build shared delayed message tracker, instead of relying on the subscription-level DelayedDeliveryTracker to forward messages?
We can check and poll the scheduled message IDs from the shared delayed message tracker to add to the replay queue (MessageRedeliveryController) of all subscriptions.
Main Issue: #24600
Motivation
The primary motivation for this proposal is to address the high memory consumption caused by the current per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages becomes prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of
O(num_delayed_messages * num_subscriptions).This excessive memory usage can cause:
By optimizing the delayed message tracking to be more memory-efficient, we can enhance broker stability and scalability, allowing Pulsar to better support these critical use cases.
Documentation
docdoc-requireddoc-not-neededdoc-complete