[Bug] When using the admin.topics().skipMessages API, the actual number of skipped messages may be less than the expected number when the value of skipNumber is greater than 8. #20262

@crossoverJie

Search before asking

  • I searched in the issues and found nothing similar.

Version

latest version

Minimal reproduce step

    @Test(dataProvider = "topicName")
    public void testSkipMessages(String topicName) throws Exception {
        final String subName = topicName;
        assertEquals(admin.topics().getList("prop-xyz/ns1"), new ArrayList<>());

        final String persistentTopicName = "persistent://prop-xyz/ns1/" + topicName;
        // Force to create a topic
        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 0);
        assertEquals(admin.topics().getList("prop-xyz/ns1"),
                List.of("persistent://prop-xyz/ns1/" + topicName));

        // create consumer and subscription
        @Cleanup
        PulsarClient client = PulsarClient.builder()
                .serviceUrl(pulsar.getWebServiceAddress())
                .statsInterval(0, TimeUnit.SECONDS)
                .build();
        AtomicInteger total = new AtomicInteger();
        Consumer<byte[]> consumer = client.newConsumer().topic(persistentTopicName)
                .messageListener(new MessageListener<byte[]>() {
                    @SneakyThrows
                    @Override
                    public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
                        // Acknowledge every other message so the backlog is left with holes
                        if (total.get() % 2 != 0) {
                            consumer.acknowledge(msg);
                        }
                        total.incrementAndGet();
                    }
                })
                .subscriptionName(subName)
                .subscriptionType(SubscriptionType.Exclusive).subscribe();

        assertEquals(admin.topics().getSubscriptions(persistentTopicName), List.of(subName));

        publishMessagesOnPersistentTopic("persistent://prop-xyz/ns1/" + topicName, 100);
        TimeUnit.SECONDS.sleep(3);
        TopicStats topicStats = admin.topics().getStats(persistentTopicName);
        long msgBacklog = topicStats.getSubscriptions().get(subName).getMsgBacklog();
        log.info("back={}", msgBacklog);
        int skipNumber = 20;
        admin.topics().skipMessages(persistentTopicName, subName, skipNumber);
        topicStats = admin.topics().getStats(persistentTopicName);
        assertEquals(topicStats.getSubscriptions().get(subName).getMsgBacklog(), msgBacklog - skipNumber);
    }

When I artificially created 50 holes in the backlog (individually acknowledged messages) and then skipped 20 messages, only 14 messages were actually skipped.

What did you expect to see?

The expected number of skipped messages is equal to the actual number of skipped messages.

What did you see instead?

The actual number of skips is less than the expected number of skips.

Anything else?

            } else {
                if (log.isDebugEnabled()) {
                    log.debug("[{}] deletePosition {} moved ahead without clearing deleteMsgs {} for cursor {}",
                            ledger.getName(), markDeletePosition, r.lowerEndpoint(), name);
                }
            }
            return true;
        } finally {
            if (r.lowerEndpoint() instanceof PositionImplRecyclable) {
                ((PositionImplRecyclable) r.lowerEndpoint()).recycle();
                ((PositionImplRecyclable) r.upperEndpoint()).recycle();
            }
        }
    }, recyclePositionRangeConverter);

The cause is that recycle() returns the position objects for reuse, so the objects referenced by the endpoints of r can change after they have been assigned. Once the loop has run more than 8 times, the else block is entered and the message count is calculated incorrectly.
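
To illustrate the kind of aliasing described above, here is a minimal, self-contained sketch. `PooledPosition` and its pool are hypothetical stand-ins written for this example and are not Pulsar's `PositionImplRecyclable`; the sketch only assumes that a recycled object can be handed out again and overwritten while an earlier reference to it is still held.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class RecycleAliasingDemo {
    /** Hypothetical poolable position; NOT Pulsar's PositionImplRecyclable. */
    static final class PooledPosition {
        private static final Deque<PooledPosition> POOL = new ArrayDeque<>();
        long ledgerId;
        long entryId;

        /** Reuses a pooled instance when one is available, otherwise allocates. */
        static PooledPosition get(long ledgerId, long entryId) {
            PooledPosition p = POOL.isEmpty() ? new PooledPosition() : POOL.pop();
            p.ledgerId = ledgerId;
            p.entryId = entryId;
            return p;
        }

        /** Returns this instance to the pool so a later get() can overwrite it. */
        void recycle() {
            POOL.push(this);
        }
    }

    /** Keeps a reference, recycles the object, then lets the pool reuse it. */
    static long entryIdSeenAfterReuse() {
        PooledPosition first = PooledPosition.get(1, 10);
        PooledPosition saved = first;   // a reference, not a copy
        first.recycle();                // the instance goes back to the pool
        PooledPosition.get(1, 99);      // the pool hands out the same instance again
        return saved.entryId;           // reflects the later values, not 10
    }

    public static void main(String[] args) {
        System.out.println("saved.entryId = " + entryIdSeenAfterReuse());
    }
}
```

In this sketch the saved reference ends up reporting the entry id written by the later `get` call, which is the same shape of problem as storing `r.lowerEndpoint()` and then recycling it.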

I think there are two methods to fix this issue.

The first method is to perform a copy before assignment, similar to the following:

        public void setStartPosition(PositionImpl startPosition) {
            PositionImpl cp = new PositionImpl(startPosition.ledgerId, startPosition.entryId);
            this.startPosition = cp;
        }

        public void setEndPosition(PositionImpl endPosition) {
            PositionImpl cp = new PositionImpl(endPosition.ledgerId, endPosition.entryId);
            this.endPosition = cp;
        }
	
        // state.endPosition = r.lowerEndpoint();
        state.setEndPosition(r.lowerEndpoint());

        // state.startPosition = r.upperEndpoint();
        state.setStartPosition(r.upperEndpoint());
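
As a rough check of the copy-before-assignment idea, the sketch below stores a fresh copy instead of the pooled reference. All class names here (`PooledPosition`, `State`) are hypothetical and only mimic the shape of the setters above; it is not the actual Pulsar code.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class DefensiveCopyDemo {
    /** Hypothetical poolable position; NOT Pulsar's PositionImplRecyclable. */
    static final class PooledPosition {
        private static final Deque<PooledPosition> POOL = new ArrayDeque<>();
        long ledgerId;
        long entryId;

        PooledPosition(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        /** Reuses a pooled instance when one is available, otherwise allocates. */
        static PooledPosition get(long ledgerId, long entryId) {
            if (POOL.isEmpty()) {
                return new PooledPosition(ledgerId, entryId);
            }
            PooledPosition p = POOL.pop();
            p.ledgerId = ledgerId;
            p.entryId = entryId;
            return p;
        }

        void recycle() {
            POOL.push(this);
        }
    }

    /** Hypothetical state holder whose setter copies instead of aliasing. */
    static final class State {
        PooledPosition endPosition;

        void setEndPosition(PooledPosition p) {
            // Copy the fields so later reuse of p cannot change our value.
            this.endPosition = new PooledPosition(p.ledgerId, p.entryId);
        }
    }

    static long entryIdSeenAfterReuse() {
        State state = new State();
        PooledPosition first = PooledPosition.get(1, 10);
        state.setEndPosition(first);    // stores a copy
        first.recycle();                // the original goes back to the pool
        PooledPosition.get(1, 99);      // the pool overwrites the original instance
        return state.endPosition.entryId;
    }

    public static void main(String[] args) {
        System.out.println("state.endPosition.entryId = " + entryIdSeenAfterReuse());
    }
}
```

The trade-off is one extra allocation per stored position, which gives back part of what pooling was meant to save.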

The second method is to remove recyclePositionRangeConverter.

Any other suggestions for a better solution?

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels: type/bug