Description
Search before asking
- I searched in the issues and found nothing similar.
Motivation
Delayed message delivery currently performs reads from storage that could be avoided.
In Pulsar, there are two implementations of the delayed delivery tracker: InMemoryDelayedDeliveryTracker and BucketDelayedDeliveryTracker.
The implementation is selected with the delayedDeliveryTrackerFactoryClassName configuration key. The default setting chooses the in-memory implementation:
Lines 614 to 617 in b02d52c

    # Class name of the factory that implements the delayed deliver tracker.
    # If value is "org.apache.pulsar.broker.delayed.BucketDelayedDeliveryTrackerFactory",
    # will create bucket based delayed message index tracker.
    delayedDeliveryTrackerFactoryClassName=org.apache.pulsar.broker.delayed.InMemoryDelayedDeliveryTrackerFactory

The dispatcher contains an optimization for the BucketDelayedDeliveryTracker: when reading entries, it skips messages that the tracker has already indexed:
Lines 446 to 457 in 11a615e

    protected Predicate<Position> createReadEntriesSkipConditionForNormalRead() {
        Predicate<Position> skipCondition = null;
        // Filter out and skip read delayed messages exist in DelayedDeliveryTracker
        if (delayedDeliveryTracker.isPresent()) {
            final DelayedDeliveryTracker deliveryTracker = delayedDeliveryTracker.get();
            if (deliveryTracker instanceof BucketDelayedDeliveryTracker) {
                skipCondition = position -> ((BucketDelayedDeliveryTracker) deliveryTracker)
                        .containsMessage(position.getLedgerId(), position.getEntryId());
            }
        }
        return skipCondition;
    }

This already reduces reads when BucketDelayedDeliveryTracker is in use.
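
To illustrate the effect of such a skip condition, here is a minimal, self-contained sketch. It does not use Pulsar's classes: `SketchPosition` and `SkipConditionSketch` are hypothetical names introduced only for this example, and the set of indexed positions stands in for a tracker's `containsMessage` check.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical stand-in for a (ledgerId, entryId) position; not Pulsar's Position type.
final class SketchPosition {
    final long ledgerId;
    final long entryId;

    SketchPosition(long ledgerId, long entryId) {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof SketchPosition)) {
            return false;
        }
        SketchPosition other = (SketchPosition) o;
        return ledgerId == other.ledgerId && entryId == other.entryId;
    }

    @Override
    public int hashCode() {
        return 31 * Long.hashCode(ledgerId) + Long.hashCode(entryId);
    }
}

// Hypothetical helper showing how a skip condition reduces the set of entries to read.
final class SkipConditionSketch {
    // Returns the positions that still have to be read from storage.
    // A null skip condition means "skip nothing", mirroring the snippet above.
    static List<SketchPosition> positionsToRead(List<SketchPosition> candidates,
                                                Predicate<SketchPosition> skipCondition) {
        List<SketchPosition> toRead = new ArrayList<>();
        for (SketchPosition position : candidates) {
            if (skipCondition == null || !skipCondition.test(position)) {
                toRead.add(position);
            }
        }
        return toRead;
    }
}
```

With a skip condition backed by the tracker's index, every position that the tracker already knows about drops out of the read, which is where the storage savings come from.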
In contrast, the state of the InMemoryDelayedDeliveryTracker is cleared once all consumers have disconnected:
Lines 187 to 202 in 12b0579

    if (consumerList.isEmpty()) {
        if (havePendingRead || havePendingReplayRead) {
            // There is a pending read from previous run. We must wait for it to complete and then rewind
            shouldRewindBeforeReadingOrReplaying = true;
        } else {
            cursor.rewind();
            shouldRewindBeforeReadingOrReplaying = false;
        }
        redeliveryMessages.clear();
        delayedDeliveryTracker.ifPresent(tracker -> {
            // Don't clean up BucketDelayedDeliveryTracker, otherwise we will lose the bucket snapshot
            if (tracker instanceof InMemoryDelayedDeliveryTracker) {
                tracker.clear();
            }
        });
    }

Solution
It would be useful to also retain state in the InMemoryDelayedDeliveryTracker, and to skip reading a delayed message from storage when the tracker already knows the delivery time of that entry.
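
As a rough illustration of the idea, the following sketch keeps a position index alongside a delivery-time index so that a `containsMessage`-style lookup is possible for in-memory state. This is not the existing InMemoryDelayedDeliveryTracker implementation or its data layout: the class `RetainedDelayIndexSketch` and the method `pollDueMessages` are hypothetical names, and the sketch assumes the state is simply not cleared when consumers disconnect.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of an in-memory delayed message index that supports lookups by position.
final class RetainedDelayIndexSketch {
    // Delivery timestamp (millis) -> positions encoded as {ledgerId, entryId}.
    private final TreeMap<Long, List<long[]>> byDeliveryTime = new TreeMap<>();
    // ledgerId -> (entryId -> delivery timestamp), used to answer containsMessage.
    private final Map<Long, Map<Long, Long>> byPosition = new HashMap<>();

    // Tracks a delayed message. Returns false if the position is already tracked.
    boolean addMessage(long ledgerId, long entryId, long deliverAtMillis) {
        Map<Long, Long> entries = byPosition.computeIfAbsent(ledgerId, k -> new HashMap<>());
        if (entries.putIfAbsent(entryId, deliverAtMillis) != null) {
            return false;
        }
        byDeliveryTime.computeIfAbsent(deliverAtMillis, k -> new ArrayList<>())
                .add(new long[] {ledgerId, entryId});
        return true;
    }

    // True if the delivery time of this entry is already known, so a read could skip it.
    boolean containsMessage(long ledgerId, long entryId) {
        Map<Long, Long> entries = byPosition.get(ledgerId);
        return entries != null && entries.containsKey(entryId);
    }

    // Removes and returns all positions whose delivery time is <= nowMillis.
    List<long[]> pollDueMessages(long nowMillis) {
        List<long[]> due = new ArrayList<>();
        Iterator<Map.Entry<Long, List<long[]>>> it =
                byDeliveryTime.headMap(nowMillis, true).entrySet().iterator();
        while (it.hasNext()) {
            for (long[] position : it.next().getValue()) {
                Map<Long, Long> entries = byPosition.get(position[0]);
                entries.remove(position[1]);
                if (entries.isEmpty()) {
                    byPosition.remove(position[0]);
                }
                due.add(position);
            }
            it.remove();
        }
        return due;
    }
}
```

A skip condition could then call `containsMessage` for each position in the same way the dispatcher already does for the bucket-based tracker. Once a message becomes due it leaves the index, so it is read and dispatched normally.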
Alternatives
No response
Anything else?
No response
Are you willing to submit a PR?
- I'm willing to submit a PR!