
Race condition with Reader.seek() #10671

@megfigura

Describe the bug
Reading a message with the Reader interface immediately after seeking can return the wrong message. This only seems to happen in a fairly specific situation (details below). Adding a short pause fixes it, so I wonder whether it could also happen in other situations.

To Reproduce
IntelliJ / Maven project is attached: PulsarSeek.zip

Adjust the SERVICE_URL as needed. I am using Pulsar 2.7.1 in Docker.

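import java.nio.charset.StandardCharsets;
import java.util.ArrayList;

import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;

public class SeekTest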
{
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String TEST_TOPIC = "persistent://public/default/seek_test";

    public static void main(String[] args) throws Exception
    {
        PulsarClient client = PulsarClient.builder().serviceUrl(SERVICE_URL).build();

        Producer<byte[]> producer = client.newProducer()
                .topic(TEST_TOPIC)
                .producerName("producer")
                .create();

        // Produce 100 messages and record the message IDs
        System.out.print("Producing 100 messages...  ");
        ArrayList<MessageId> messageIds = new ArrayList<>();
        for (int i = 0; i < 100; i++)
        {
            // Message is the loop counter
            byte[] bytes = Integer.toString(i).getBytes(StandardCharsets.UTF_8);

            MessageId id = producer.send(bytes);
            messageIds.add(id);
        }
        System.out.println("Done");


        Reader<byte[]> reader = client.newReader()
                .topic(TEST_TOPIC)
                .startMessageId(MessageId.latest)
                .startMessageIdInclusive()
                .create();

        System.out.println("Seeking to message 50...");
        reader.seek(messageIds.get(50));
        reader.hasMessageAvailable();
        Message<byte[]> message = reader.readNext();
        System.out.println("Got MessageId: " + message.getMessageId());
        System.out.println("Got message data: " + new String(message.getData(), StandardCharsets.UTF_8));

        reader.closeAsync();
        producer.closeAsync();
        client.closeAsync();
    }
}

Expected behavior
It should print "50", but it actually prints "99".

Output:

Producing 100 messages...  Done
Seeking to message 50...
Got MessageId: 21:99:-1:0
Got message data: 99

Additional context
There are a few things that seem necessary to cause it to fail:

  • Create Reader with startMessageIdInclusive() and startMessageId(MessageId.latest)
  • Call hasMessageAvailable() after seek()

If you add a short pause between seek() and hasMessageAvailable(), it produces the correct result ("50"). This part is a bit worrying since it seems to be some type of race condition. Could it cause other problems that are harder to reproduce?
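For reference, the pause described above could be sketched roughly as follows. The class name `SeekWithPause`, the helper `seekAndRead`, and the 100 ms value are illustrative assumptions, not part of the attached project; the issue only says "a short pause", and a sleep only hides the race rather than fixing it.

```java
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.Reader;

public class SeekWithPause
{
    // Assumed value: the issue does not state how long the pause needs to be.
    private static final long PAUSE_MILLIS = 100;

    // Hypothetical helper: seek, wait briefly, then read the next message.
    // Returns null if the reader reports that nothing is available.
    static Message<byte[]> seekAndRead(Reader<byte[]> reader, MessageId target) throws Exception
    {
        reader.seek(target);

        // Workaround only: give the seek time to settle before
        // hasMessageAvailable() is called.
        Thread.sleep(PAUSE_MILLIS);

        if (!reader.hasMessageAvailable())
        {
            return null;
        }
        return reader.readNext();
    }
}
```

In the reproduction above, this would stand in for the seek() / hasMessageAvailable() / readNext() sequence, e.g. `seekAndRead(reader, messageIds.get(50))`.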

This can also be reproduced by seeking to a timestamp instead of a MessageId.

In my application, I avoid this issue by creating the Reader with an initial position (startMessageId() / startMessageFromRollbackDuration()) instead of using seek(), and this works fine.
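A minimal sketch of that workaround, assuming the same client and topic as in the reproduction. The class name `ReaderFromPosition` and both helper methods are hypothetical names used only for illustration. As far as I can tell, the reader builder expects either a start message ID or a rollback duration, but not both, so the second helper does not call startMessageId().

```java
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;

public class ReaderFromPosition
{
    // Hypothetical helper: open a Reader positioned at a known MessageId,
    // so no seek() call is needed afterwards.
    static Reader<byte[]> openAt(PulsarClient client, String topic, MessageId start)
            throws PulsarClientException
    {
        return client.newReader()
                .topic(topic)
                .startMessageId(start)
                .startMessageIdInclusive() // include the message at 'start' itself
                .create();
    }

    // Hypothetical helper: open a Reader positioned by time instead,
    // starting 'minutes' before now.
    static Reader<byte[]> openFromRollback(PulsarClient client, String topic, long minutes)
            throws PulsarClientException
    {
        return client.newReader()
                .topic(topic)
                .startMessageFromRollbackDuration(minutes, TimeUnit.MINUTES)
                .create();
    }
}
```

With the reproduction above, `openAt(client, TEST_TOPIC, messageIds.get(50))` followed by readNext() would take the place of the seek() call.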
