Discussion thread: https://lists.apache.org/thread/n3drk2g2oy766qnbtx17knvtssy3tdyl
Motivation
Currently, the Pulsar client supports setting the startMessageId for Consumer and Reader, and also supports reading the message at the startMessageId position itself.
Assume we have four messages with IDs 1, 2, 3, and 4 in the topic:
- When we set earliest as the startMessageId value, we can get the message with ID 1
- When we set latest as the startMessageId value, we can't get any message
Sometimes we want the first read to return the message with ID 4. Currently the client offers only one way to do this:
Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topicName)
        .subscriptionName(subscriptionName)
        .startMessageId(MessageId.latest)
        .startMessageIdInclusive()
        .create();
// Must be called before readNext(): it triggers the seek to the latest message.
reader.hasMessageAvailable();
Message<byte[]> msg = reader.readNext(1, TimeUnit.SECONDS);
Calling reader.hasMessageAvailable() before reader.readNext() returns the correct message (ID 4), because hasMessageAvailable() performs a seek when startMessageIdInclusive() is enabled.
This approach is confusing. Handling the inclusive start position on the broker side would make this simpler.
Goal
This PIP proposes supporting reads of the message at the startMessageId position on the broker side:
- Add to Consumer
- Add to Reader
Implementation
Protocol
Add a start_message_id_inclusive field to CommandSubscribe to determine whether to read the message at the startMessageId position:
message CommandSubscribe {
    // some fields
    // If specified, the subscription will read the message from the start message id position.
    optional bool start_message_id_inclusive = 20 [default = false];
}
ManagedCursorImpl
Add a check in org.apache.bookkeeper.mledger.impl.ManagedCursorImpl#initializeCursorPosition.
Only the case where startMessageId is MessageId.latest needs handling. If start_message_id_inclusive is true, the latest position in the ledger is used as the readPosition value. If start_message_id_inclusive is false, the position after the latest position is used as the readPosition value.
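The check described above can be sketched as a small standalone model. This is only an illustration, not the actual ManagedCursorImpl code: the class StartPositionSketch, the Pos type, and the method initialReadPosition are hypothetical names, and the sketch assumes the next position is simply the next entry in the same ledger (ledger rollover is ignored).

```java
// Simplified model of the proposed check. All names here are hypothetical
// and do not mirror the real ManagedCursorImpl implementation.
final class StartPositionSketch {

    /** Stand-in for a managed-ledger position: (ledgerId, entryId). */
    static final class Pos {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        /** Next entry in the same ledger; ledger rollover is ignored in this sketch. */
        Pos next() {
            return new Pos(ledgerId, entryId + 1);
        }

        @Override
        public String toString() {
            return ledgerId + ":" + entryId;
        }
    }

    /**
     * Picks the initial readPosition for a cursor whose startMessageId is
     * MessageId.latest: the latest position itself when inclusive, otherwise
     * the position right after it.
     */
    static Pos initialReadPosition(Pos latest, boolean startMessageIdInclusive) {
        return startMessageIdInclusive ? latest : latest.next();
    }

    public static void main(String[] args) {
        // Assumption for the demo: the latest message sits at entry 3 of ledger 7.
        Pos latest = new Pos(7L, 3L);
        System.out.println("inclusive: " + initialReadPosition(latest, true));
        System.out.println("exclusive: " + initialReadPosition(latest, false));
    }
}
```

With the inclusive flag set, the cursor starts on the latest entry, so the first read returns it. Without the flag, the cursor starts one entry past it and waits for new messages.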
Client
The Consumer and Reader support setting the start_message_id_inclusive value in the CommandSubscribe command.
Compatibility
This feature is both backward and forward compatible, which means users can use any client version with any broker version.
Note that users can still read the message at the latest position by calling reader.hasMessageAvailable() before reader.readNext(), but this call can be omitted when using both the new client and the new broker.
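The client and broker combinations can be spelled out with a small sketch. It rests on two assumptions that the proposal does not state explicitly: an old client never sets the field, so the broker sees the declared default of false, and an old broker ignores the field it does not know. The class and method names are hypothetical.

```java
// Hypothetical model of the version matrix; not part of the Pulsar code base.
final class CompatibilitySketch {

    /** The broker-side inclusive read only takes effect when both sides are new. */
    static boolean brokerReadsLatestInclusive(boolean newClient, boolean newBroker) {
        return newClient && newBroker;
    }

    /** Otherwise the existing hasMessageAvailable() call is still required. */
    static boolean needsHasMessageAvailableCall(boolean newClient, boolean newBroker) {
        return !brokerReadsLatestInclusive(newClient, newBroker);
    }

    public static void main(String[] args) {
        boolean[] versions = {false, true};
        for (boolean newClient : versions) {
            for (boolean newBroker : versions) {
                System.out.println("newClient=" + newClient
                        + " newBroker=" + newBroker
                        + " needsHasMessageAvailableCall="
                        + needsHasMessageAvailableCall(newClient, newBroker));
            }
        }
    }
}
```

In every combination the existing client-side approach keeps working. Only the pairing of new client and new broker allows dropping the extra call.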