PIP-150: Support read the message of startMessageId position on the broker side #14883

@nodece

Discussion thread: https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl

Motivation

Currently, the Pulsar client supports setting a startMessageId for a Consumer or Reader, and also supports reading the message at the startMessageId position itself.

Assume the topic holds four messages with IDs 1, 2, 3, and 4:

  • When we set earliest as the startMessageId value, the first message we get is message ID 1
  • When we set latest as the startMessageId value, we can't get any of the existing messages

Sometimes we want the first read to return message ID 4, the latest message. Today the client offers only one way to do this:

Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topicName)
        .subscriptionName(subscriptionName)
        .startMessageId(MessageId.latest)
        .startMessageIdInclusive()
        .create();

// With startMessageIdInclusive() enabled, this call also performs the seek.
reader.hasMessageAvailable();
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);

Calling reader.hasMessageAvailable() before reader.readNext() returns the correct message (ID 4), because hasMessageAvailable() includes a seek action when startMessageIdInclusive() is enabled.

This approach is confusing. Doing this on the broker side would make things easier.

Goal

This PIP proposes supporting reads of the message at the startMessageId position on the broker side:

  • Add to Consumer
  • Add to Reader

Implementation

Protocol

Add a start_message_id_inclusive field to CommandSubscribe to determine whether to read the message at the startMessageId position:

message CommandSubscribe {
    // some fields

    // If true, the subscription starts reading at the start message id position itself.
    optional bool start_message_id_inclusive = 20 [default = false];
}

ManagedCursorImpl

Add a check in org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition.

We only need to handle the case where the startMessageId is MessageId.latest. If start_message_id_inclusive is true, we take the latest position in the ledger as the readPosition value; otherwise, if start_message_id_inclusive is false, we take the position after the latest position as the readPosition value.
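The decision described above can be sketched as follows. This is a minimal, self-contained illustration and not the actual ManagedCursorImpl code: the Position class, its next() helper, and the initialReadPosition method are hypothetical names introduced here, and next() assumes the following entry lives in the same ledger, which a real managed ledger does not guarantee.

```java
public class ReadPositionSketch {

    /** Hypothetical stand-in for a managed-ledger position (ledger ID plus entry ID). */
    static final class Position {
        final long ledgerId;
        final long entryId;

        Position(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        /** Simplification: assumes the next entry is in the same ledger. */
        Position next() {
            return new Position(ledgerId, entryId + 1);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    /**
     * Chooses the initial read position for a subscription whose
     * startMessageId is MessageId.latest.
     */
    static Position initialReadPosition(Position latest, boolean startMessageIdInclusive) {
        // Inclusive: start on the latest entry itself.
        // Exclusive: start on the entry after it, so only new messages are read.
        return startMessageIdInclusive ? latest : latest.next();
    }

    public static void main(String[] args) {
        Position latest = new Position(7, 3); // e.g. where message ID 4 is stored
        System.out.println(initialReadPosition(latest, true));  // prints 7:3
        System.out.println(initialReadPosition(latest, false)); // prints 7:4
    }
}
```

With the flag set, the first read returns the latest existing message; without it, the cursor waits for the next message to be published, which matches the current behavior.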

Client

The Consumer and Reader set the start_message_id_inclusive value in the CommandSubscribe command.

Compatibility

This feature is both backward and forward compatible, which means users can use any client version with any broker version.

Note that users can still read the message at the latest position by calling reader.hasMessageAvailable() before reader.readNext(), but this call can be omitted when using both the new client and the new broker.
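The four client/broker combinations can be enumerated in a small sketch. It is only a model of the compatibility argument, under two assumptions that the proposal does not state explicitly: an old client never sets the field, so a new broker sees the default (false), and an old broker skips the field it does not know. The class and method names are hypothetical.

```java
public class CompatibilitySketch {

    /**
     * Returns true when the broker itself positions the cursor on the latest message.
     *
     * @param newClient the client sends start_message_id_inclusive = true
     * @param newBroker the broker reads start_message_id_inclusive
     */
    static boolean brokerStartsAtLatestInclusive(boolean newClient, boolean newBroker) {
        // Old client: field absent, broker falls back to the default (false).
        // Old broker: field unknown and skipped (assumed behavior).
        return newClient && newBroker;
    }

    /** True when hasMessageAvailable() is still needed before readNext(). */
    static boolean needsClientSideSeek(boolean newClient, boolean newBroker) {
        return !brokerStartsAtLatestInclusive(newClient, newBroker);
    }

    public static void main(String[] args) {
        boolean[] versions = {false, true};
        for (boolean newClient : versions) {
            for (boolean newBroker : versions) {
                System.out.printf("newClient=%b newBroker=%b needsClientSideSeek=%b%n",
                        newClient, newBroker, needsClientSideSeek(newClient, newBroker));
            }
        }
    }
}
```

Only the combination of a new client and a new broker can drop the extra call; every other combination keeps working through the existing client-side seek.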
