@Denovo1998
Contributor

Main Issue: #24600

Motivation

The primary motivation for this proposal is to address the high memory consumption caused by the current per-subscription delayed message tracking mechanism. For topics with hundreds or thousands of subscriptions, the memory footprint for delayed messages becomes prohibitively large. Each delayed message's position is duplicated across every subscription's tracker, leading to a memory usage pattern of O(num_delayed_messages * num_subscriptions).

This excessive memory usage can cause:

  • Increased memory pressure on Pulsar brokers.
  • More frequent and longer Garbage Collection (GC) pauses, impacting broker performance.
  • Potential OutOfMemoryErrors, leading to broker instability.
  • Limited scalability for use cases that rely on many subscriptions per topic, such as IoT or large-scale microservices with shared subscriptions.

By optimizing the delayed message tracking to be more memory-efficient, we can enhance broker stability and scalability, allowing Pulsar to better support these critical use cases.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Copilot AI review requested due to automatic review settings November 1, 2025 12:41
@github-actions bot added the PIP and doc-not-needed (Your PR changes do not impact docs) labels Nov 1, 2025

Copilot AI left a comment

Pull Request Overview

This PR introduces PIP-448, a proposal to optimize delayed message tracking in Apache Pulsar by implementing a topic-level shared tracker instead of per-subscription trackers. The proposal aims to significantly reduce memory consumption for topics with multiple subscriptions.

Key changes:

  • Introduces a new topic-level InMemoryTopicDelayedDeliveryTrackerManager that stores delayed message metadata once per topic rather than once per subscription
  • Proposes lightweight InMemoryTopicDelayedDeliveryTrackerView proxies for each subscription that delegate to the shared manager
  • Reduces memory complexity from O(num_delayed_messages * num_subscriptions) to O(num_delayed_messages)
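To make the complexity claim concrete, here is a minimal sketch that counts how many index entries each layout would hold. The class and method names (`DelayedIndexFootprint`, `perSubscriptionEntries`, `sharedEntries`) are hypothetical and are not part of the PIP or of Pulsar; plain `long` entry ids stand in for message positions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical illustration only; these are not the classes proposed in PIP-448. */
class DelayedIndexFootprint {

    /** Builds one timestamp-ordered index: deliverAt -> entry ids (stand-ins for positions). */
    static TreeMap<Long, List<Long>> buildIndex(long[] deliverAtMillis) {
        TreeMap<Long, List<Long>> index = new TreeMap<>();
        for (int entryId = 0; entryId < deliverAtMillis.length; entryId++) {
            index.computeIfAbsent(deliverAtMillis[entryId], k -> new ArrayList<>())
                    .add((long) entryId);
        }
        return index;
    }

    /** Number of tracked entries in one index. */
    static long size(TreeMap<Long, List<Long>> index) {
        return index.values().stream().mapToLong(List::size).sum();
    }

    /** Per-subscription layout: sums the entries of one full index per subscription. */
    static long perSubscriptionEntries(int subscriptions, long[] deliverAtMillis) {
        long total = 0;
        for (int s = 0; s < subscriptions; s++) {
            total += size(buildIndex(deliverAtMillis));
        }
        return total;
    }

    /** Topic-level layout: one index per topic, independent of the subscription count. */
    static long sharedEntries(int subscriptions, long[] deliverAtMillis) {
        return size(buildIndex(deliverAtMillis));
    }
}
```

With 4 delayed messages and 1000 subscriptions, the per-subscription count is 4000 entries while the shared count stays at 4. Any per-subscription bookkeeping the real design needs is deliberately left out of this count.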


@Technoboy- added this to the 4.2.0 milestone Nov 6, 2025
@Technoboy- requested a review from coderzc November 14, 2025 08:28
@Technoboy-
Contributor

I think this is a good improvement. Is there a test or benchmark that demonstrates the results?

@Denovo1998
Contributor Author

I recently made some simple adjustments to OpenMessaging Benchmark to support the delayed message feature, while testing many PRs related to the delayed message module.

#24600 (comment)

For this PR, we need to monitor memory first. The most important thing to test, however, is whether a problem in one subscription affects the other subscriptions when a topic has a large number of them.

Any good suggestions for the result comparison part? Which indicators should we visualize?
@Technoboy- @codelipenghui @lhotari @coderzc


# High Level Design

The core idea is to introduce a new, opt-in `DelayedDeliveryTrackerFactory` that implements a shared, topic-level tracking strategy. This is achieved with two new components: a `TopicDelayedDeliveryTrackerManager` and a subscription-scoped `InMemoryTopicDelayedDeliveryTracker`.
Member

Why not reference DelayedDeliveryTracker at the interface level, which would also apply to BucketDelayedDeliveryTracker?

Contributor Author

Do you mean: why add a TopicDelayedDeliveryTrackerManager instead of implementing DelayedDeliveryTracker directly?

Member

@coderzc coderzc Dec 8, 2025

Oh, I see. You have designed a new data structure for the InMemoryTopicDelayedDeliveryTrackerManager, so it cannot be adapted to other DelayedDeliveryTracker implementations.

Contributor Author

Yes.

At the code level, the new TopicDelayedDeliveryTrackerManager already exposes createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers) returning the DelayedDeliveryTracker interface:

public interface TopicDelayedDeliveryTrackerManager extends AutoCloseable {
    DelayedDeliveryTracker createOrGetTracker(AbstractPersistentDispatcherMultipleConsumers dispatcher);
    // ...
}

so the dispatcher only depends on the DelayedDeliveryTracker interface and not on the concrete in-memory implementation. The InMemoryTopicDelayedDeliveryTracker is just one implementation that acts as a proxy to the shared topic-level manager in this PIP.

In the PIP text I focused on the in-memory implementation (e.g. InMemoryTopicDelayedDeliveryTracker*) because this proposal explicitly keeps BucketDelayedDeliveryTracker out of scope (see the “Out of Scope” section). The goal of PIP-448 is to address the memory footprint of the legacy in-memory tracker first, without changing any semantics of the persistent, bucket-based tracker.
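As a rough illustration of the proxy relationship described above, the following sketch shows a topic-level manager that owns the only index and hands out lightweight per-subscription views. Everything here is an assumption made for illustration: the names `SharedTrackerSketch`, `TopicManager`, and `SubscriptionView` are hypothetical, a `long` entry id stands in for a position, the subscription is identified by name rather than by dispatcher, and filtering by a per-subscription mark-delete id is only one possible way to keep per-subscription state small. It does not use Pulsar's real interfaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical sketch; it does not mirror Pulsar's actual interfaces or signatures. */
class SharedTrackerSketch {

    /** Topic-level owner of the single delayed-message index. */
    static final class TopicManager {
        // deliverAt (millis) -> entry ids; storing an id twice is a no-op.
        private final TreeMap<Long, TreeSet<Long>> index = new TreeMap<>();
        private final Map<String, SubscriptionView> views = new HashMap<>();

        /** Returns the existing view for a subscription or creates one. */
        SubscriptionView createOrGetTracker(String subscription) {
            return views.computeIfAbsent(subscription, name -> new SubscriptionView(this));
        }

        void add(long entryId, long deliverAt) {
            index.computeIfAbsent(deliverAt, k -> new TreeSet<>()).add(entryId);
        }

        /** Entries due at or before 'now' that lie after the caller's mark-delete id. */
        List<Long> due(long markDeleteEntryId, long now) {
            List<Long> result = new ArrayList<>();
            for (TreeSet<Long> ids : index.headMap(now, true).values()) {
                result.addAll(ids.tailSet(markDeleteEntryId, false));
            }
            return result;
        }

        int storedEntries() {
            return index.values().stream().mapToInt(TreeSet::size).sum();
        }
    }

    /** Per-subscription proxy: holds one long of state and forwards everything else. */
    static final class SubscriptionView {
        private final TopicManager manager;
        private long markDeleteEntryId = -1;

        SubscriptionView(TopicManager manager) {
            this.manager = manager;
        }

        void addMessage(long entryId, long deliverAt) {
            manager.add(entryId, deliverAt);
        }

        List<Long> getScheduledMessages(long now) {
            return manager.due(markDeleteEntryId, now);
        }

        void updateMarkDelete(long entryId) {
            markDeleteEntryId = entryId;
        }
    }
}
```

In this sketch two views that both report the same delayed entry leave a single stored copy in the manager, and each view sees only the due entries beyond its own mark-delete id.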


### `InMemoryTopicDelayedDeliveryTracker` (New Class)
This class implements the `DelayedDeliveryTracker` interface for a single subscription.
* **Role**: Acts as a lightweight proxy, forwarding all operations (e.g., `addMessage`, `getScheduledMessages`) to the shared `InMemoryTopicDelayedDeliveryTrackerManager`.
Member

@coderzc coderzc Dec 8, 2025

Would it be possible to use a special cursor to replay delayed messages and build the shared delayed message tracker from that, instead of relying on the subscription-level DelayedDeliveryTracker to forward messages?

We can check and poll the scheduled message IDs from the shared delayed message tracker to add to the replay queue (MessageRedeliveryController) of all subscriptions.
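A minimal sketch of how that polling idea might look, under stated assumptions: the class `SharedReplaySketch` and its methods are hypothetical, a `TreeSet<Long>` per subscription stands in for the replay queue (it is not the real MessageRedeliveryController API), and the special cursor that would populate the shared tracker is not modeled.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of a topic-level poller that fans due entries out to subscriptions. */
class SharedReplaySketch {
    // Each element is {deliverAt, entryId}, ordered by deliverAt and then entryId.
    private final PriorityQueue<long[]> delayed = new PriorityQueue<>(
            Comparator.comparingLong((long[] e) -> e[0]).thenComparingLong(e -> e[1]));

    // Stand-in for each subscription's replay queue.
    private final Map<String, TreeSet<Long>> replayQueues = new HashMap<>();

    void addSubscription(String name) {
        replayQueues.putIfAbsent(name, new TreeSet<>());
    }

    /** Called once per delayed message, for example by a single topic-level reader. */
    void trackDelayed(long entryId, long deliverAt) {
        delayed.add(new long[] {deliverAt, entryId});
    }

    /** Moves every entry due at or before 'now' into all replay queues; returns the count. */
    int dispatchDue(long now) {
        int moved = 0;
        while (!delayed.isEmpty() && delayed.peek()[0] <= now) {
            long entryId = delayed.poll()[1];
            for (TreeSet<Long> queue : replayQueues.values()) {
                queue.add(entryId);
            }
            moved++;
        }
        return moved;
    }

    Set<Long> replayQueue(String name) {
        return replayQueues.get(name);
    }
}
```

One trade-off visible even in this sketch: once an entry is fanned out, it is again held once per subscription in the replay queues, so the memory saving applies only to entries that are not yet due.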
