4646import java .util .concurrent .CyclicBarrier ;
4747import java .util .concurrent .atomic .AtomicBoolean ;
4848import lombok .Cleanup ;
49+ import org .apache .bookkeeper .mledger .ManagedCursor ;
4950import org .apache .bookkeeper .mledger .Position ;
51+ import org .apache .bookkeeper .mledger .impl .ManagedCursorContainer ;
5052import org .apache .bookkeeper .mledger .impl .ManagedLedgerImpl ;
5153import org .apache .commons .lang3 .tuple .Pair ;
5254import org .apache .pulsar .broker .BrokerTestUtil ;
@@ -151,6 +153,7 @@ void setup() throws Exception {
151153 config .setBacklogQuotaCheckIntervalInSeconds (TIME_TO_CHECK_BACKLOG_QUOTA );
152154 config .setManagedLedgerMaxEntriesPerLedger (MAX_ENTRIES_PER_LEDGER );
153155 config .setManagedLedgerMinLedgerRolloverTimeMinutes (0 );
156+ config .setManagedLedgerDefaultMarkDeleteRateLimit (1000 );
154157 config .setAllowAutoTopicCreationType (TopicType .NON_PARTITIONED );
155158 config .setSystemTopicEnabled (true );
156159 config .setTopicLevelPoliciesEnabled (true );
@@ -704,35 +707,43 @@ public void backlogsAgeMetricsNoPreciseWithoutBacklogQuota() throws Exception {
704707 final String topic1 = "persistent://prop/ns-quota/topic2" + UUID .randomUUID ();
705708
706709 final String subName1 = "c1" ;
707- final int numMsgs = 5 ;
710+ final int numMsgs = 7 ;
708711
709712 Consumer <byte []> consumer1 = client .newConsumer ().topic (topic1 ).subscriptionName (subName1 )
710713 .acknowledgmentGroupTime (0 , SECONDS )
711714 .subscribe ();
712715 Producer <byte []> producer = createProducer (client , topic1 );
713716
714717 byte [] content = new byte [1024 ];
718+ // 1. Send messages
719+ // The manager ledger max entries is 5, so we can send 7 messages to make sure we have multiple ledgers
720+ // When send msg 4, the ledger closed.
721+ // Second: 1 2 3 4 5 6 7
722+ // msg idx: [0 1 2 3 4] [5 6]
715723 for (int i = 0 ; i < numMsgs ; i ++) {
716- Thread .sleep (3000 ); // Guarantees if we use wrong message in age, to show up in failed test
717- producer .send (content );
724+ Thread .sleep (1000 );
725+ MessageId send = producer .send (content );
718726 }
727+ long lastLedgerCloseTime = System .currentTimeMillis () - 2000 ;
719728
729+ // 2. Receive msg-0 and ack it.
730+ String c1MarkDeletePositionBefore =
731+ admin .topics ().getInternalStats (topic1 ).cursors .get (subName1 ).markDeletePosition ;
720732 Message <byte []> oldestMessage = consumer1 .receive ();
721733 consumer1 .acknowledge (oldestMessage );
722- log .info ("Moved subscription 1, by 1 message" );
723-
724- // Unload topic to trigger the ledger close
725- unloadAndLoadTopic (topic1 , producer );
726- long unloadTime = System .currentTimeMillis ();
727- waitForQuotaCheckToRunTwice ();
734+ c1MarkDeletePositionBefore = waitForMarkDeletePositionToChange (topic1 , subName1 ,
735+ c1MarkDeletePositionBefore );
736+ log .info ("Moved subscription 1, by 1 message {}" , oldestMessage .getMessageId ());
728737
729- Metrics metrics = prometheusMetricsClient .getMetrics ();
738+ // 3. Expected the oldestBacklogMessageAgeSeconds is based on last ledger close time
739+ long expectedMessageAgeSeconds =
740+ MILLISECONDS .toSeconds (System .currentTimeMillis () - lastLedgerCloseTime );
741+ PersistentTopic topicRef = (PersistentTopic ) pulsar .getBrokerService ().getTopicReference (topic1 ).get ();
742+ topicRef .updateOldPositionInfo ();
730743 TopicStats topicStats = getTopicStats (topic1 );
731-
732- long expectedMessageAgeSeconds = MILLISECONDS .toSeconds (System .currentTimeMillis () - unloadTime );
733744 assertThat (topicStats .getOldestBacklogMessageAgeSeconds ())
734745 .isCloseTo (expectedMessageAgeSeconds , within (1L ));
735-
746+ Metrics metrics = prometheusMetricsClient . getMetrics ();
736747 Metric backlogAgeMetric =
737748 metrics .findSingleMetricByNameAndLabels ("pulsar_storage_backlog_age_seconds" ,
738749 Pair .of ("topic" , topic1 ));
@@ -741,6 +752,24 @@ public void backlogsAgeMetricsNoPreciseWithoutBacklogQuota() throws Exception {
741752 entry ("namespace" , namespace ),
742753 entry ("topic" , topic1 ));
743754 assertThat ((long ) backlogAgeMetric .value ).isCloseTo (expectedMessageAgeSeconds , within (2L ));
755+
756+ // 4. Move consumer to `end - 1`, then OldestBacklogMessageAgeSeconds should be `-1`, because the
757+ // second ledger is not closed yet.
758+ for (int i = 1 ; i < numMsgs - 1 ; i ++) {
759+ Message <byte []> msg = consumer1 .receive ();
760+ consumer1 .acknowledge (msg );
761+ }
762+ waitForMarkDeletePositionToChange (topic1 , subName1 , c1MarkDeletePositionBefore );
763+ ManagedCursorContainer cursors = (ManagedCursorContainer ) topicRef .getManagedLedger ().getCursors ();
764+ ManagedCursor subCursor = cursors .get (subName1 );
765+ Awaitility .await ().pollInterval (100 , MILLISECONDS ).atMost (5 , SECONDS ).until (
766+ () -> subCursor .getMarkDeletedPosition ().equals (subCursor .getPersistentMarkDeletedPosition ()));
767+ topicRef .updateOldPositionInfo ();
768+ topicStats = getTopicStats (topic1 , true );
769+ assertThat (topicStats .getSubscriptions ().get (subName1 ).getMsgBacklog ())
770+ .isEqualTo (1L );
771+ assertThat (topicStats .getOldestBacklogMessageAgeSeconds ())
772+ .isEqualTo (-1L );
744773 }
745774 }
746775
0 commit comments