Skip to content

Commit 54da0c8

Browse files
authored
[fix][broker] BacklogMessageAge is not reset when cursor mdPosition is on an open ledger (apache#24915)
1 parent 39bb675 commit 54da0c8

File tree

2 files changed

+44
-13
lines changed

2 files changed

+44
-13
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3877,6 +3877,8 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
38773877
oldestMarkDeleteCursorInfo.getCursor().getName(),
38783878
checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(),
38793879
oldestMarkDeleteCursorInfo.getVersion()));
3880+
} else {
3881+
TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.set(this, null);
38803882
}
38813883

38823884
return CompletableFuture.completedFuture(null);

pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BacklogQuotaManagerTest.java

Lines changed: 42 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@
4646
import java.util.concurrent.CyclicBarrier;
4747
import java.util.concurrent.atomic.AtomicBoolean;
4848
import lombok.Cleanup;
49+
import org.apache.bookkeeper.mledger.ManagedCursor;
4950
import org.apache.bookkeeper.mledger.Position;
51+
import org.apache.bookkeeper.mledger.impl.ManagedCursorContainer;
5052
import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl;
5153
import org.apache.commons.lang3.tuple.Pair;
5254
import org.apache.pulsar.broker.BrokerTestUtil;
@@ -151,6 +153,7 @@ void setup() throws Exception {
151153
config.setBacklogQuotaCheckIntervalInSeconds(TIME_TO_CHECK_BACKLOG_QUOTA);
152154
config.setManagedLedgerMaxEntriesPerLedger(MAX_ENTRIES_PER_LEDGER);
153155
config.setManagedLedgerMinLedgerRolloverTimeMinutes(0);
156+
config.setManagedLedgerDefaultMarkDeleteRateLimit(1000);
154157
config.setAllowAutoTopicCreationType(TopicType.NON_PARTITIONED);
155158
config.setSystemTopicEnabled(true);
156159
config.setTopicLevelPoliciesEnabled(true);
@@ -704,35 +707,43 @@ public void backlogsAgeMetricsNoPreciseWithoutBacklogQuota() throws Exception {
704707
final String topic1 = "persistent://prop/ns-quota/topic2" + UUID.randomUUID();
705708

706709
final String subName1 = "c1";
707-
final int numMsgs = 5;
710+
final int numMsgs = 7;
708711

709712
Consumer<byte[]> consumer1 = client.newConsumer().topic(topic1).subscriptionName(subName1)
710713
.acknowledgmentGroupTime(0, SECONDS)
711714
.subscribe();
712715
Producer<byte[]> producer = createProducer(client, topic1);
713716

714717
byte[] content = new byte[1024];
718+
// 1. Send messages
719+
// The manager ledger max entries is 5, so we can send 7 messages to make sure we have multiple ledgers
720+
// When send msg 4, the ledger closed.
721+
// Second: 1 2 3 4 5 6 7
722+
// msg idx: [0 1 2 3 4] [5 6]
715723
for (int i = 0; i < numMsgs; i++) {
716-
Thread.sleep(3000); // Guarantees if we use wrong message in age, to show up in failed test
717-
producer.send(content);
724+
Thread.sleep(1000);
725+
MessageId send = producer.send(content);
718726
}
727+
long lastLedgerCloseTime = System.currentTimeMillis() - 2000;
719728

729+
// 2. Receive msg-0 and ack it.
730+
String c1MarkDeletePositionBefore =
731+
admin.topics().getInternalStats(topic1).cursors.get(subName1).markDeletePosition;
720732
Message<byte[]> oldestMessage = consumer1.receive();
721733
consumer1.acknowledge(oldestMessage);
722-
log.info("Moved subscription 1, by 1 message");
723-
724-
// Unload topic to trigger the ledger close
725-
unloadAndLoadTopic(topic1, producer);
726-
long unloadTime = System.currentTimeMillis();
727-
waitForQuotaCheckToRunTwice();
734+
c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange(topic1, subName1,
735+
c1MarkDeletePositionBefore);
736+
log.info("Moved subscription 1, by 1 message {}", oldestMessage.getMessageId());
728737

729-
Metrics metrics = prometheusMetricsClient.getMetrics();
738+
// 3. Expected the oldestBacklogMessageAgeSeconds is based on last ledger close time
739+
long expectedMessageAgeSeconds =
740+
MILLISECONDS.toSeconds(System.currentTimeMillis() - lastLedgerCloseTime);
741+
PersistentTopic topicRef = (PersistentTopic) pulsar.getBrokerService().getTopicReference(topic1).get();
742+
topicRef.updateOldPositionInfo();
730743
TopicStats topicStats = getTopicStats(topic1);
731-
732-
long expectedMessageAgeSeconds = MILLISECONDS.toSeconds(System.currentTimeMillis() - unloadTime);
733744
assertThat(topicStats.getOldestBacklogMessageAgeSeconds())
734745
.isCloseTo(expectedMessageAgeSeconds, within(1L));
735-
746+
Metrics metrics = prometheusMetricsClient.getMetrics();
736747
Metric backlogAgeMetric =
737748
metrics.findSingleMetricByNameAndLabels("pulsar_storage_backlog_age_seconds",
738749
Pair.of("topic", topic1));
@@ -741,6 +752,24 @@ public void backlogsAgeMetricsNoPreciseWithoutBacklogQuota() throws Exception {
741752
entry("namespace", namespace),
742753
entry("topic", topic1));
743754
assertThat((long) backlogAgeMetric.value).isCloseTo(expectedMessageAgeSeconds, within(2L));
755+
756+
// 4. Move consumer to `end - 1`, then OldestBacklogMessageAgeSeconds should be `-1`, because the
757+
// second ledger is not closed yet.
758+
for (int i = 1; i < numMsgs - 1; i++) {
759+
Message<byte[]> msg = consumer1.receive();
760+
consumer1.acknowledge(msg);
761+
}
762+
waitForMarkDeletePositionToChange(topic1, subName1, c1MarkDeletePositionBefore);
763+
ManagedCursorContainer cursors = (ManagedCursorContainer) topicRef.getManagedLedger().getCursors();
764+
ManagedCursor subCursor = cursors.get(subName1);
765+
Awaitility.await().pollInterval(100, MILLISECONDS).atMost(5, SECONDS).until(
766+
() -> subCursor.getMarkDeletedPosition().equals(subCursor.getPersistentMarkDeletedPosition()));
767+
topicRef.updateOldPositionInfo();
768+
topicStats = getTopicStats(topic1, true);
769+
assertThat(topicStats.getSubscriptions().get(subName1).getMsgBacklog())
770+
.isEqualTo(1L);
771+
assertThat(topicStats.getOldestBacklogMessageAgeSeconds())
772+
.isEqualTo(-1L);
744773
}
745774
}
746775

0 commit comments

Comments
 (0)