Skip to content

Commit 29778d5

Browse files
Apurva007poorbarcode
authored andcommitted
[improve][broker]Find the target position at most once, during expiring messages for a topic, even though there are many subscriptions (apache#24622)
(cherry picked from commit 84205eb)
1 parent eef82db commit 29778d5

File tree

7 files changed

+171
-23
lines changed

7 files changed

+171
-23
lines changed

managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpFindNewest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -250,6 +250,9 @@ private Position findNextValidPosition(Position searchPosition, Exception except
250250
return nextPosition;
251251
}
252252

253+
/**
254+
* Find the largest entry that matches the given predicate.
255+
*/
253256
public void find() {
254257
if (cursor != null ? cursor.hasMoreEntries(searchPosition) : ledger.hasMoreEntries(searchPosition)) {
255258
ledger.asyncReadEntry(searchPosition, this, null);

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/MessageExpirer.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,19 @@
2525
@InterfaceStability.Evolving
2626
public interface MessageExpirer {
2727

28+
/**
29+
* Mark-deletes the largest position that is less than or equal to {@code position}.
30+
*/
2831
boolean expireMessages(Position position);
2932

33+
/**
34+
* Mark-deletes the latest message whose publish timestamp is less than the result of the expression
35+
* "{@link System#currentTimeMillis()} - {@code messageTTLInSeconds}".
36+
*/
3037
boolean expireMessages(int messageTTLInSeconds);
3138

39+
/**
40+
* Async implementation of {@link #expireMessages(int)}.
41+
*/
3242
CompletableFuture<Boolean> expireMessagesAsync(int messageTTLInSeconds);
3343
}

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitor.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.apache.bookkeeper.mledger.ManagedLedgerException.NonRecoverableLedgerException;
3535
import org.apache.bookkeeper.mledger.Position;
3636
import org.apache.bookkeeper.mledger.PositionFactory;
37+
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
3738
import org.apache.bookkeeper.mledger.proto.MLDataFormats;
3839
import org.apache.pulsar.broker.service.MessageExpirer;
3940
import org.apache.pulsar.client.impl.MessageImpl;
@@ -145,6 +146,20 @@ private void checkExpiryByLedgerClosureTime(ManagedCursor cursor, int messageTTL
145146
public boolean expireMessages(Position messagePosition) {
146147
// If it's beyond last position of this topic, do nothing.
147148
Position topicLastPosition = this.topic.getLastPosition();
149+
ManagedLedger managedLedger = cursor.getManagedLedger();
150+
if (managedLedger instanceof ManagedLedgerImpl ml) {
151+
// Confirm the position is valid.
152+
Optional<MLDataFormats.ManagedLedgerInfo.LedgerInfo> ledgerInfoOptional =
153+
ml.getOptionalLedgerInfo(messagePosition.getLedgerId());
154+
if (ledgerInfoOptional.isPresent()) {
155+
if (messagePosition.getEntryId() >= 0
156+
&& ledgerInfoOptional.get().getEntries() - 1 >= messagePosition.getEntryId()) {
157+
findEntryComplete(messagePosition, null);
158+
return true;
159+
}
160+
}
161+
}
162+
// Fall back to the slower solution if the fast path above did not apply: the managed ledger is not an instance of ManagedLedgerImpl, the ledger is unknown, or the entry id is out of range.
148163
if (topicLastPosition.compareTo(messagePosition) < 0) {
149164
if (log.isDebugEnabled()) {
150165
log.debug("[{}][{}] Ignore expire-message scheduled task, given position {} is beyond "

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 70 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
import org.apache.bookkeeper.mledger.PositionFactory;
8989
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
9090
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer.CursorInfo;
91+
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
9192
import org.apache.bookkeeper.mledger.util.Futures;
9293
import org.apache.commons.collections4.CollectionUtils;
9394
import org.apache.commons.lang3.StringUtils;
@@ -2115,19 +2116,82 @@ private CompletableFuture<Void> checkShadowReplication() {
21152116
@Override
21162117
public void checkMessageExpiry() {
21172118
int messageTtlInSeconds = topicPolicies.getMessageTTLInSeconds().get();
2118-
if (messageTtlInSeconds != 0) {
2119+
if (messageTtlInSeconds <= 0) {
2120+
return;
2121+
}
2122+
2123+
ManagedLedger managedLedger = getManagedLedger();
2124+
if (managedLedger instanceof ManagedLedgerImpl ml) {
2125+
checkMessageExpiryWithSharedPosition(ml, messageTtlInSeconds);
2126+
} else {
2127+
// Fall back to the slower solution if the managed ledger is not an instance of ManagedLedgerImpl: each
2128+
// subscription finds its own position and handles expiry itself.
2129+
checkMessageExpiryWithoutSharedPosition(messageTtlInSeconds);
2130+
}
2131+
}
2132+
2133+
private void checkMessageExpiryWithoutSharedPosition(int messageTtlInSeconds) {
2134+
subscriptions.forEach((__, sub) -> {
2135+
if (!isCompactionSubscription(sub.getName())
2136+
&& (additionalSystemCursorNames.isEmpty()
2137+
|| !additionalSystemCursorNames.contains(sub.getName()))) {
2138+
sub.expireMessagesAsync(messageTtlInSeconds);
2139+
}
2140+
});
2141+
replicators.forEach((__, replicator)
2142+
-> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds));
2143+
shadowReplicators.forEach((__, replicator)
2144+
-> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds));
2145+
}
2146+
2147+
private void checkMessageExpiryWithSharedPosition(ManagedLedgerImpl ml, int messageTtlInSeconds) {
2148+
// Find the target position only once, then use it to expire messages for all subscriptions and replicators.
2149+
ManagedCursor cursor = ml.getCursors().getCursorWithOldestPosition().getCursor();
2150+
PersistentMessageFinder finder = new PersistentMessageFinder(topic, cursor, brokerService.getPulsar()
2151+
.getConfig().getManagedLedgerCursorResetLedgerCloseTimestampMaxClockSkewMillis());
2152+
// Find the target position.
2153+
long expiredMessageTimestamp = System.currentTimeMillis() - TimeUnit.SECONDS.toMillis(messageTtlInSeconds);
2154+
CompletableFuture<Position> positionToMarkDelete = new CompletableFuture<>();
2155+
finder.findMessages(expiredMessageTimestamp, new AsyncCallbacks.FindEntryCallback() {
2156+
@Override
2157+
public void findEntryComplete(Position position, Object ctx) {
2158+
positionToMarkDelete.complete(position);
2159+
}
2160+
2161+
@Override
2162+
public void findEntryFailed(ManagedLedgerException exception, Optional<Position> failedReadPosition,
2163+
Object ctx) {
2164+
log.error("[{}] Error finding expired position, failed reading position is {}", topic,
2165+
failedReadPosition.orElse(null), exception);
2166+
// The error has already been logged here, so complete with null to avoid logging it again in the next step.
2167+
positionToMarkDelete.complete(null);
2168+
}
2169+
});
2170+
positionToMarkDelete.thenAccept(position -> {
2171+
if (position == null) {
2172+
// Nothing to expire: no position was found, or the find failed and was already logged.
2173+
return;
2174+
}
2175+
// Expire messages by position, which is more efficient.
21192176
subscriptions.forEach((__, sub) -> {
21202177
if (!isCompactionSubscription(sub.getName())
21212178
&& (additionalSystemCursorNames.isEmpty()
2122-
|| !additionalSystemCursorNames.contains(sub.getName()))) {
2123-
sub.expireMessagesAsync(messageTtlInSeconds);
2179+
|| !additionalSystemCursorNames.contains(sub.getName()))) {
2180+
// The variable "position" is the position to be mark-deleted.
2181+
// The method "expireMessages(position)" mark-deletes the target position if the
2182+
// position is valid; otherwise, it mark-deletes the previous valid position.
2183+
// So we pass it the position to be mark-deleted directly.
2184+
sub.expireMessages(position);
21242185
}
21252186
});
21262187
replicators.forEach((__, replicator)
2127-
-> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds));
2188+
-> ((PersistentReplicator) replicator).expireMessages(position));
21282189
shadowReplicators.forEach((__, replicator)
2129-
-> ((PersistentReplicator) replicator).expireMessagesAsync(messageTtlInSeconds));
2130-
}
2190+
-> ((PersistentReplicator) replicator).expireMessages(position));
2191+
}).exceptionally(ex -> {
2192+
log.error("[{}] Failed to expire messages by position", topic, ex);
2193+
return null;
2194+
});
21312195
}
21322196

21332197
@Override

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/persistent/PersistentMessageExpiryMonitorTest.java renamed to pulsar-broker/src/test/java/org/apache/bookkeeper/mledger/impl/PersistentMessageExpiryMonitorTest.java

Lines changed: 66 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,28 +16,31 @@
1616
* specific language governing permissions and limitations
1717
* under the License.
1818
*/
19-
package org.apache.pulsar.broker.service.persistent;
19+
package org.apache.bookkeeper.mledger.impl;
2020

2121
import static org.mockito.ArgumentMatchers.any;
2222
import static org.mockito.ArgumentMatchers.anyBoolean;
23+
import static org.mockito.ArgumentMatchers.anyLong;
2324
import static org.mockito.Mockito.doAnswer;
25+
import static org.mockito.Mockito.spy;
2426
import static org.testng.AssertJUnit.assertEquals;
2527
import java.util.Map;
2628
import java.util.concurrent.CompletableFuture;
2729
import java.util.concurrent.CountDownLatch;
30+
import java.util.concurrent.TimeUnit;
2831
import java.util.concurrent.atomic.AtomicInteger;
2932
import lombok.extern.slf4j.Slf4j;
33+
import org.apache.bookkeeper.client.api.ReadHandle;
3034
import org.apache.bookkeeper.mledger.AsyncCallbacks;
3135
import org.apache.bookkeeper.mledger.Position;
32-
import org.apache.bookkeeper.mledger.impl.ManagedCursorImpl;
33-
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
3436
import org.apache.pulsar.broker.BrokerTestUtil;
37+
import org.apache.pulsar.broker.service.persistent.PersistentMessageExpiryMonitor;
38+
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
3539
import org.apache.pulsar.client.api.MessageId;
3640
import org.apache.pulsar.client.api.Producer;
3741
import org.apache.pulsar.client.api.ProducerConsumerBase;
3842
import org.apache.pulsar.client.api.Schema;
3943
import org.awaitility.Awaitility;
40-
import org.mockito.Mockito;
4144
import org.testng.Assert;
4245
import org.testng.annotations.AfterClass;
4346
import org.testng.annotations.BeforeClass;
@@ -59,6 +62,11 @@ protected void cleanup() throws Exception {
5962
super.internalCleanup();
6063
}
6164

65+
@Override
66+
protected void doInitConf() throws Exception {
67+
conf.setMessageExpiryCheckIntervalInMinutes(60);
68+
}
69+
6270
/***
6371
* Confirm the anti-concurrency mechanism "expirationCheckInProgressUpdater" works.
6472
*/
@@ -76,7 +84,7 @@ void testConcurrentlyExpireMessages() throws Exception {
7684
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
7785
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
7886
ManagedCursorImpl cursor = (ManagedCursorImpl) ml.getCursors().get(cursorName);
79-
ManagedCursorImpl spyCursor = Mockito.spy(cursor);
87+
ManagedCursorImpl spyCursor = spy(cursor);
8088

8189
// Make the mark-deleting delay.
8290
CountDownLatch firstFindingCompleted = new CountDownLatch(1);
@@ -98,14 +106,6 @@ void testConcurrentlyExpireMessages() throws Exception {
98106
calledFindPositionCount.incrementAndGet();
99107
return invocationOnMock.callRealMethod();
100108
}).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), any(), any(), anyBoolean());
101-
doAnswer(invocationOnMock -> {
102-
calledFindPositionCount.incrementAndGet();
103-
return invocationOnMock.callRealMethod();
104-
}).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any(), anyBoolean());
105-
doAnswer(invocationOnMock -> {
106-
calledFindPositionCount.incrementAndGet();
107-
return invocationOnMock.callRealMethod();
108-
}).when(spyCursor).asyncFindNewestMatching(any(), any(), any(), any());
109109

110110
// Sleep 2s to make "find(1s)" get a position.
111111
Thread.sleep(2000);
@@ -138,4 +138,57 @@ void testConcurrentlyExpireMessages() throws Exception {
138138
producer.close();
139139
admin.topics().delete(topicName);
140140
}
141+
142+
/***
143+
* Verify that the position-finding task executes only once for multiple subscriptions of a topic.
144+
*/
145+
@Test(invocationCount = 2)
146+
void testTopicExpireMessages() throws Exception {
147+
// Create topic.
148+
final String topicName = BrokerTestUtil.newUniqueName("persistent://public/default/tp");
149+
admin.topics().createNonPartitionedTopic(topicName);
150+
PersistentTopic persistentTopic =
151+
(PersistentTopic) pulsar.getBrokerService().getTopic(topicName, false).join().get();
152+
final String cursorName1 = "s1";
153+
final String cursorName2 = "s2";
154+
Producer<String> producer = pulsarClient.newProducer(Schema.STRING).topic(topicName).create();
155+
admin.topics().createSubscriptionAsync(topicName, cursorName1, MessageId.earliest);
156+
admin.topics().createSubscriptionAsync(topicName, cursorName2, MessageId.earliest);
157+
admin.topicPolicies().setMessageTTL(topicName, 1);
158+
Awaitility.await().untilAsserted(() -> {
159+
assertEquals(1, persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().get().intValue());
160+
});
161+
ManagedLedgerImpl ml = (ManagedLedgerImpl) persistentTopic.getManagedLedger();
162+
ml.getConfig().setMaxEntriesPerLedger(2);
163+
ml.getConfig().setMinimumRolloverTime(1, TimeUnit.MILLISECONDS);
164+
long firstLedger = ml.currentLedger.getId();
165+
System.out.println("maxEntriesPerLedger 1 : " + ml.getConfig().getMaxEntriesPerLedger());
166+
// Trigger the creation of 3 ledgers.
167+
for (int i = 0; i < 5; i++) {
168+
producer.send("" + i);
169+
Thread.sleep(100);
170+
}
171+
System.out.println("maxEntriesPerLedger 2 : " + ml.getConfig().getMaxEntriesPerLedger());
172+
assertEquals(3, ml.getLedgersInfo().size());
173+
// Inject a spy to count the reads of the first ledger that start at entry 0.
174+
AtomicInteger accessedCount = new AtomicInteger();
175+
ReadHandle readHandle = ml.getLedgerHandle(firstLedger).get();
176+
ReadHandle spyReadHandle = spy(readHandle);
177+
doAnswer(invocationOnMock -> {
178+
long startEntry = (long) invocationOnMock.getArguments()[0];
179+
if (startEntry == 0) {
180+
accessedCount.incrementAndGet();
181+
}
182+
return invocationOnMock.callRealMethod();
183+
}).when(spyReadHandle).readAsync(anyLong(), anyLong());
184+
ml.ledgerCache.put(firstLedger, CompletableFuture.completedFuture(spyReadHandle));
185+
// Verify: the first ledger will be accessed only once after expiry for two subscriptions.
186+
persistentTopic.checkMessageExpiry();
187+
Thread.sleep(2000);
188+
assertEquals(1, accessedCount.get());
189+
190+
// cleanup.
191+
producer.close();
192+
admin.topics().delete(topicName);
193+
}
141194
}

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/PersistentTopicE2ETest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1119,12 +1119,12 @@ public void testMessageExpiryWithFewExpiredBacklog() throws Exception {
11191119
rolloverPerIntervalStats();
11201120
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
11211121

1122-
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs));
1122+
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs - 1));
11231123
runMessageExpiryCheck();
11241124

11251125
assertEquals(subRef.getNumberOfEntriesInBacklog(false), numMsgs);
11261126

1127-
Thread.sleep(TimeUnit.SECONDS.toMillis(messageTTLSecs / 2));
1127+
Thread.sleep(TimeUnit.SECONDS.toMillis(1 + messageTTLSecs / 2));
11281128
runMessageExpiryCheck();
11291129

11301130
assertEquals(subRef.getNumberOfEntriesInBacklog(false), 0);

pulsar-broker/src/test/java/org/apache/pulsar/broker/stats/PrometheusMetricsTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -801,11 +801,14 @@ public void testPerTopicExpiredStat() throws Exception {
801801
p2.close();
802802
// Let the message expire
803803
for (String topic : topicList) {
804+
// The TTL value cannot be set to a negative value; the minimum value is 1.
804805
PersistentTopic persistentTopic = (PersistentTopic) pulsar.getBrokerService()
805806
.getTopicIfExists(topic).get().get();
806-
persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(-1);
807-
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(-1);
807+
persistentTopic.getBrokerService().getPulsar().getConfiguration().setTtlDurationDefaultInSeconds(1);
808+
persistentTopic.getHierarchyTopicPolicies().getMessageTTLInSeconds().updateBrokerValue(1);
808809
}
810+
// Wait 2 seconds so that the messages expire.
811+
Thread.sleep(2000);
809812
pulsar.getBrokerService().forEachTopic(Topic::checkMessageExpiry);
810813
//wait for checkMessageExpiry
811814
PersistentSubscription sub = (PersistentSubscription)

0 commit comments

Comments
 (0)