Skip to content

Conversation

@vinkal-chudgar
Copy link
Contributor

@vinkal-chudgar vinkal-chudgar commented Nov 2, 2025

Fixes #24908

Motivation

NonPersistentTopicTest.testMsgDropStat fails intermittently on CI:

java.lang.AssertionError: expected [true] but found [false]
    at org.apache.pulsar.client.api.NonPersistentTopicTest.testMsgDropStat(NonPersistentTopicTest.java:905)

Failure point: assertTrue(latch.await(5, TimeUnit.SECONDS)) times out because no publisher drop is observed within 5 seconds (no MessageIdImpl with entryId == -1).

Root cause (in the original test loop)

for (int i = 0; i < totalProduceMessages; i++) {
executor.submit(() -> {
try {
MessageId msgId = producer.send(msgData);
int count = messagesSent.incrementAndGet();
// process at least 20% of messages before signalling the latch
// a non-persistent message will return entryId as -1 when it has been dropped
// due to setMaxConcurrentNonPersistentMessagePerConnection limit
// also ensure that it has happened before the latch is signalled
if (count > totalProduceMessages * 0.2 && msgId != null
&& ((MessageIdImpl) msgId).getEntryId() == -1) {
latch.countDown();
}
Thread.sleep(10);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException(e);
}
});
}
assertTrue(latch.await(5, TimeUnit.SECONDS));

Why it flakes:

  • The test sets maxConcurrentNonPersistentMessagePerConnection = 1. In ServerCnx#handleSend, the broker drops only when nonPersistentPendingMessages > maxNonPersistentMessagePerConnection. With maxConcurrentNonPersistentMessagePerConnection = 1, the first concurrent send increments the counter from 0 to 1 and is accepted. The second concurrent send checks 1 > 1 (false), then increments the counter to 2 and is accepted. Only a third overlapping send sees 2 > 1 and is dropped. Therefore, at least three overlapping sends are required to trigger any drop. The original loop does not guarantee that level of overlap.
    if (producer.isNonPersistentTopic()) {
    // avoid processing non-persist message if reached max concurrent-message limit
    if (nonPersistentPendingMessages > maxNonPersistentPendingMessages) {
    final long producerId = send.getProducerId();
    final long sequenceId = send.getSequenceId();
    final long highestSequenceId = send.getHighestSequenceId();
    service.getTopicOrderedExecutor().executeOrdered(producer.getTopic().getName(), () -> {
    commandSender.sendSendReceiptResponse(producerId, sequenceId, highestSequenceId, -1, -1);
    });
    producer.recordMessageDrop(send.getNumMessages());
    return;
    } else {
    nonPersistentPendingMessages++;
    }
    }
  • The executor threads that call producer.send() are not coordinated to start together, so overlap of send() calls on the same connection is left to scheduler timing and is therefore probabilistic.
  • The loop uses Thread.sleep(10), introducing timing assumptions that vary under CI load.
  • Latch gating creates an additional race: The latch is decremented only when a drop is observed after 20% of messages have been sent. If drops occur before the 20% threshold and later sends do not drop, the latch never opens and the test times out at the await. (See NonPersistentTopicTest.java lines 0887-0894 for the latch condition and line 0905 for the timeout.)

This PR makes the publisher-drop trigger reliable without changing the test scenario or API usage.

Modifications

Test-only changes in NonPersistentTopicTest.testMsgDropStat. No production code or docs changed.

  • Replace the unsynchronized, sleep-based loop with a small synchronized batch of send() calls.
    Use a CyclicBarrier to start a fixed number of producer.send() calls at the same time, explicitly creating the required overlap without sleeps or ad-hoc timing. Execute the batch inside a bounded Awaitility wait until at least one send() returns a MessageIdImpl with entryId == -1, then assert that condition.

  • Changed maxConcurrentNonPersistentMessagePerConnection from 1 to 0

    conf.setMaxConcurrentNonPersistentMessagePerConnection(0);

    Rational: In ServerCnx#handleSend for non-persistent topics, a publish is dropped only when nonPersistentPendingMessages > maxNonPersistentMessagePerConnection. Lowering the limit from 1 to 0 reduces the overlap requirement from three in-flight sends to two, which, combined with the synchronized start, makes the publisher drop reliably detectable in this test.

Verifying this change

  • Make sure that the change passes the CI checks.

Personal CI Results

Tested in Personal CI fork: vinkal-chudgar#3

Status: All checks have passed (48 successful checks, 3 skipped)

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: vinkal-chudgar#3

…istent publisher drop

Reliably trigger and verify connection-level non-persistent drop while preserving the original scenario and API.

- Set maxConcurrentNonPersistentMessagePerConnection=0 in the test and restart the broker. Since ServerCnx drops when inFlight > max, setting the limit to 0 causes any second overlapping send on the same connection to be dropped (entryId == -1) and recorded.

- Replace the previous large task loop that used Thread.sleep and a 20% latch with a small, synchronized batch of producer.send() calls started together using a CyclicBarrier. This explicitly creates the required overlap without sleeps or ad hoc timing.

No production code changes. Test only.

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
@github-actions github-actions bot added the doc-not-needed Your PR changes do not impact docs label Nov 2, 2025
@vinkal-chudgar vinkal-chudgar marked this pull request as ready for review November 3, 2025 04:49
Copy link
Member

@lhotari lhotari left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR, I added some review comments to address.

- Replace completionLatch.await() with assertTrue(completionLatch.await(20, TimeUnit.SECONDS))
- Replace manual throwable check with assertNull(error.get(), "Concurrent send encountered an exception")

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
@vinkal-chudgar
Copy link
Contributor Author

@lhotari - Thank you for the review. I’ve addressed the comments. Could you please take a look when convenient?

@lhotari
Copy link
Member

lhotari commented Nov 4, 2025

/pulsarbot rerun-failure-checks

@codecov-commenter
Copy link

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 74.25%. Comparing base (f55d45a) to head (bfcd696).
⚠️ Report is 11 commits behind head on master.

Additional details and impacted files

Impacted file tree graph

@@              Coverage Diff              @@
##             master   #24929       +/-   ##
=============================================
+ Coverage     38.74%   74.25%   +35.51%     
- Complexity    13265    33511    +20246     
=============================================
  Files          1856     1913       +57     
  Lines        145165   149446     +4281     
  Branches      16848    17362      +514     
=============================================
+ Hits          56238   110967    +54729     
+ Misses        81385    29625    -51760     
- Partials       7542     8854     +1312     
Flag Coverage Δ
inttests 26.21% <ø> (-0.42%) ⬇️
systests 22.68% <ø> (-0.21%) ⬇️
unittests 73.79% <ø> (+38.90%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.
see 1411 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • 📦 JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

@lhotari lhotari added this to the 4.2.0 milestone Nov 4, 2025
@lhotari lhotari merged commit 60acfba into apache:master Nov 4, 2025
97 of 100 checks passed
lhotari pushed a commit that referenced this pull request Nov 4, 2025
…istent publisher drop (#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
lhotari pushed a commit that referenced this pull request Nov 4, 2025
…istent publisher drop (#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
lhotari pushed a commit that referenced this pull request Nov 4, 2025
…istent publisher drop (#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
lhotari pushed a commit that referenced this pull request Nov 4, 2025
…istent publisher drop (#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 6, 2025
…istent publisher drop (apache#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
(cherry picked from commit 1a2e12f)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 6, 2025
…istent publisher drop (apache#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
(cherry picked from commit 1a2e12f)
nodece pushed a commit to nodece/pulsar that referenced this pull request Nov 12, 2025
…istent publisher drop (apache#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
…istent publisher drop (apache#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
(cherry picked from commit d66e5ee)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Nov 13, 2025
…istent publisher drop (apache#24929)

Signed-off-by: Vinkal Chudgar <vinkal.chudgar@gmail.com>
(cherry picked from commit 60acfba)
(cherry picked from commit d66e5ee)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Flaky-test: NonPersistentTopicTest.testMsgDropStat

3 participants