Describe the bug
Consuming a message with the Reader interface immediately after seeking can return the wrong message. This only seems to happen in a fairly specific situation (details below). Adding a short pause after the seek fixes it, which makes me wonder whether it could also happen in other situations.
To Reproduce
IntelliJ / Maven project is attached: PulsarSeek.zip
Adjust the SERVICE_URL as needed. I am using Pulsar 2.7.1 in Docker.
```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;

public class SeekTest
{
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TEST_TOPIC = "persistent://public/default/seek_test";

    public static void main(String[] args) throws Exception
    {
        PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();
        Producer<byte[]> producer = client.newProducer()
                .topic(TEST_TOPIC)
                .producerName("producer")
                .create();

        // Produce 100 messages and record the message IDs
        System.out.print("Producing 100 messages... ");
        ArrayList<MessageId> messageIds = new ArrayList<>();
        for (int i = 0; i < 100; i++)
        {
            // Message payload is the loop counter
            byte[] bytes = Integer.toString(i).getBytes(StandardCharsets.UTF_8);
            MessageId id = producer.send(bytes);
            messageIds.add(id);
        }
        System.out.println("Done");

        // Reader starts at the latest position (inclusive); we then seek back to message 50
        Reader<byte[]> reader = client.newReader()
                .topic(TEST_TOPIC)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .create();
        System.out.println("Seeking to message 50...");
        reader.seek(messageIds.get(50));
        // Called immediately after seek(); the result is intentionally ignored
        reader.hasMessageAvailable();
        Message<byte[]> message = reader.readNext();
        System.out.println("Got MessageId: " + message.getMessageId());
        System.out.println("Got message data: " + new String(message.getData(), StandardCharsets.UTF_8));

        reader.closeAsync();
        producer.closeAsync();
        client.closeAsync();
    }
}
```
Expected behavior
It should print "50", but it actually prints "99".
Output:
```
Producing 100 messages... Done
Seeking to message 50...
Got MessageId: 21:99:-1:0
Got message data: 99
```
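To read the printed ID, it helps to split it into its parts. The sketch below is a hypothetical stdlib-only helper (`PrintedMessageId` is not part of the Pulsar API), and it assumes the Java client prints message IDs as `ledgerId:entryId:partitionIndex`, with a trailing `:batchIndex` for batch message IDs. Under that assumption, `21:99:-1:0` is entry 99 of ledger 21 on a non-partitioned topic, which matches the payload "99" (the last message produced) rather than message 50.

```java
// Hypothetical helper, not part of the Pulsar client API.
// Assumes printed IDs look like ledgerId:entryId:partitionIndex[:batchIndex].
final class PrintedMessageId
{
    final long ledgerId;
    final long entryId;
    final int partitionIndex; // -1 for a non-partitioned topic
    final int batchIndex;     // -1 when the printed ID has no batch index

    private PrintedMessageId(long ledgerId, long entryId, int partitionIndex, int batchIndex)
    {
        this.ledgerId = ledgerId;
        this.entryId = entryId;
        this.partitionIndex = partitionIndex;
        this.batchIndex = batchIndex;
    }

    static PrintedMessageId parse(String text)
    {
        String[] parts = text.trim().split(":");
        if (parts.length != 3 && parts.length != 4)
        {
            throw new IllegalArgumentException("Unexpected message ID format: " + text);
        }
        long ledger = Long.parseLong(parts[0]);
        long entry = Long.parseLong(parts[1]);
        int partition = Integer.parseInt(parts[2]);
        int batch = parts.length == 4 ? Integer.parseInt(parts[3]) : -1;
        return new PrintedMessageId(ledger, entry, partition, batch);
    }

    public static void main(String[] args)
    {
        // The ID printed by the reproduction above
        PrintedMessageId id = parse("21:99:-1:0");
        // Prints: ledger=21 entry=99 partition=-1 batchIndex=0
        System.out.println("ledger=" + id.ledgerId + " entry=" + id.entryId
                + " partition=" + id.partitionIndex + " batchIndex=" + id.batchIndex);
    }
}
```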
Additional context
There are a few things that seem necessary to cause it to fail:
- Create the Reader with `startMessageIdInclusive()` and `startMessageId(MessageId.latest)`
- Call `hasMessageAvailable()` after `seek()`
If you add a short pause between `seek()` and `hasMessageAvailable()`, it produces the correct result ("50"). This part is a bit worrying, since it looks like a race condition. Could it cause other problems that are harder to reproduce?
This can also be reproduced by seeking to a timestamp instead of a MessageId.
In my application, I avoid this issue by creating the Reader with an initial position (startMessageId() / startMessageFromRollbackDuration()) instead of using seek() and this works fine.