Search before asking
Version
latest version
Minimal reproduce step
@Test(dataProvider = "topicName")
public void testSkipMessages(String topicName) throws Exception {
final String subName = topicName;
assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());
final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
// Force to create a topic
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 0);
assertEquals(admin.topics().getList("prop-xyz/ns1"),
List.of("persistent://prop-xyz/ns1/" + topicName));
// create consumer and subscription
@Cleanup
PulsarClient client = PulsarClient.builder()
.serviceUrl(pulsar.getWebServiceAddress())
.statsInterval(0, TimeUnit.SECONDS)
.build();
AtomicInteger total = new AtomicInteger();
Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName)
.messageListener(new MessageListener<byte[]>() {
@SneakyThrows
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
if (total.get() %2 !=0){
consumer.acknowledge(msg);
}
total.incrementAndGet();
}
})
.subscriptionName(subName)
.subscriptionType(SubscriptionType.Exclusive).subscribe();
assertEquals(admin.topics().getSubscriptions(persistentTopicName), List.of(subName));
publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 100);
TimeUnit.SECONDS.sleep(3);
TopicStats topicStats = admin.topics().getStats(persistentTopicName);
long msgBacklog = topicStats.getSubscriptions().get(subName).getMsgBacklog();
log.info("back={}",msgBacklog);
int skipNumber = 20;
admin.topics().skipMessages(persistentTopicName, subName, skipNumber);
topicStats = admin.topics().getStats(persistentTopicName);
assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), msgBacklog - skipNumber);
}
When I artificially created 50 hollow messages and skipped 20 messages, only 14 messages were actually skipped.
What did you expect to see?
The expected number of skipped messages is equal to the actual number of skipped messages.
What did you see instead?
The actual number of skips is less than the expected number of skips.

Anything else?
|
} else { |
|
if (log.isDebugEnabled()) { |
|
log.debug("[{}] deletePosition {} moved ahead without clearing deleteMsgs {} for cursor {}", |
|
ledger.getName(), markDeletePosition, r.lowerEndpoint(), name); |
|
} |
|
} |
|
return true; |
|
} finally { |
|
if (r.lowerEndpoint() instanceof PositionImplRecyclable) { |
|
((PositionImplRecyclable) r.lowerEndpoint()).recycle(); |
|
((PositionImplRecyclable) r.upperEndpoint()).recycle(); |
|
} |
|
} |
|
}, recyclePositionRangeConverter); |
The reason for this issue is that the recycle() function reuses objects, causing the object referenced by r to change during runtime. When the loop count is greater than 8, the else block is entered, leading to an incorrect calculation of the message count.
I think there are two methods to fix this issue.
The first method is to perform a copy before assignment, similar to the following:
public void setStartPosition(PositionImpl startPosition) {
PositionImpl cp = new PositionImpl(startPosition.ledgerId, startPosition.entryId);
this.startPosition = cp;
}
public void setEndPosition(PositionImpl endPosition) {
PositionImpl cp = new PositionImpl(endPosition.ledgerId, endPosition.entryId);
this.endPosition = cp;
}
// state.endPosition = r.lowerEndpoint();
state.setEndPosition(r.lowerEndpoint());
// state.startPosition = r.upperEndpoint();
state.setStartPosition(r.upperEndpoint());
The second method is to remove recyclePositionRangeConverter.

any other suggestions for a better solution?
Are you willing to submit a PR?
Search before asking
Version
latest version
Minimal reproduce step
When I artificially created 50 hollow messages and skipped 20 messages, only 14 messages were actually skipped.
What did you expect to see?
The expected number of skipped messages is equal to the actual number of skipped messages.
What did you see instead?
The actual number of skips is less than the expected number of skips.

Anything else?
pulsar/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedCursorImpl.java
Lines 1772 to 1785 in e956db7
The reason for this issue is that the
recycle()function reuses objects, causing the object referenced byrto change during runtime. When the loop count is greater than 8, the else block is entered, leading to an incorrect calculation of the message count.I think there are two methods to fix this issue.
The first method is to perform a copy before assignment, similar to the following:
The second method is to remove
recyclePositionRangeConverter.any other suggestions for a better solution?
Are you willing to submit a PR?