-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[pip][design] PIP-269. Add an epoch of cursor to discard outdated reading #20469
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
|
Thanks for update the title of this PR |
|
@jiazhai Please note - due to new PIP process, PMC members can only approve a PIP (which is a PR) once voted in mailing list. Can you undo please? |
|
Per the PIP process (new) as you see here title should start with |
asafm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Very nice.
This area is super delicate. We need someone very experienced with the code to give some proofreading here.
pip/pip-269.md
Outdated
| The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario): | ||
| - **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished. | ||
| - Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**. | ||
| - **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In those instances where the reading will be done from the "cache" of negatively acknowledged messages, it will update the read position?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The read position of the Cursor is a marker, reads above or equal to this marker are called Sequentially Read, and reads below this marker are called Message Replay Read(these messages have been read once by Sequentially Read before), and Specified Read does not care about this marker(in fact, Specified Read does not even care about Cursor).
In general, Message Replay Read does not modify the read position, except in the scenario A compensation mechanism of Message Replay Read below.
pip/pip-269.md
Outdated
|
|
||
| # High-Level Design | ||
|
|
||
| **For the Goal-1**: Cursor Epoch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ok. First, let me try to phrase your idea in my words. Tell me if I'm correct:
You're basically adding a variable to a cursor, named (my name, of course) readPositionExternalResetCount. This variable will be increase when ever the read position is about to be reset to some value, initiated by an external action like reset-subscription, rewind, etc.
Every time a read sequence begins, it starts by recording the readPositionExternalResetCount. When the reading has finished and read position is about to be updated, it verifies no external reset has been done by verifying the readPositionExternalResetCount was not increased relative to the recorded value. If it has, this means a reset action has started, which means we can drop and stop the reading sequence we have started. If not, no reset was done, continue and update read position.
Please note even this is not transactional and fully safe.
What do I suggest:
- Rename variable. Epoch in computer science is mostly for how many seconds has passed since 1970-01-01. See https://en.wikipedia.org/wiki/Epoch_(computing)
- Checking count has increased and if not update read position is not fully safe. Better guard update read position with a lock, since you're taking an action based on the count. I would take a lock on read position, and in that lock update count if needed and read position if needed.
- I wouldn't use an exception for flow control. It's an expensive operation, especially for latency sensitive systems. Return value like Result of some sort - can we do that?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tell me if I'm correct
Correct.
Rename variable. Epoch in computer science is mostly for how many seconds have passed since 1970-01-01. See https://en.wikipedia.org/wiki/Epoch_(computing)
Pulsar and Zookeeper have similar concepts named epoch, so epoch may be easier to understand? I have also corrected the document: epoch is also related to the change of mark deleted position(in other words, if the mark deleted position is changed by Cursor Reset, the epoch also increases)
Checking count has increased and if not update read position is not fully safe. Better guard update read position with a lock, since you're taking an action based on the count. I would take a lock on read position, and in that lock update count if needed and read position if needed.
Yes, this is consistent with the current design, the original document may not be clearly enough, I improved it
I wouldn't use an exception for flow control. It's an expensive operation, especially for latency sensitive systems. Return value like Result of some sort - can we do that?
Good suggestion, already instead of return -1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pulsar and Zookeeper have similar concepts named epoch, so epoch may be easier to understand? I have also corrected the document: epoch is also related to the change of mark deleted position(in other words, if the mark deleted position is changed by Cursor Reset, the epoch also increases)
ZK epoch seems more like a revision or version or sequence ID.
Given your explanation, it seems you need a revision/version/sequence ID for the state of the cursor state. How about cursorStateRevisionNum?
Revision is:
The act of changing or correcting something, or the thing that has been changed or corrected: [ U ] The Senate is expected to act on tax revision.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion. already change this variable to cursorStateRevisionNum
pip/pip-269.md
Outdated
| - Every **Sequentially Read** or **Message Replay Read** carries the current Cursor Epoch. | ||
| - Discard the reading and prevent the `read position` change if the epoch carried by the reading is smaller than the current Cursor Epoch when the reading complete | ||
|
|
||
| **For the Goal-2**: Two-phase of Cursor Epoch increment |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lost you here.
Why do you need to use same epoch.
I wonder why not a lock perhaps for cursor and dispatcher.
I guess I need to dive more perhaps.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lost you here.
I have rewritten the doc to make it easier to understand.
Why do you need to use same epoch.
I could not fully understand this.
I wonder why not a lock perhaps for cursor and dispatcher.
I guess I need to dive more perhaps.
- The lock is better than
synchronizedbecause it is helpful for async logic. - A lock and a variable is better than only a lock because they can help correct the task already in the queue of lock.
- Two locks (one for Cursor and one for Dispatcher) can make the lock range smaller. For example:
- lock(Cursor), set
resettingtotrue, release lock(Cursor). Other actions will be rejected byresettingin progress(fast fail). - lock(Dispatcher), do itself progress.
- lock(Cursor), set
Since there is more than one component called Cursor Position Reset(see the section Cursor Position Reset, I have rewritten it), we can't make the lock too wide, it would increase the risk of deadlock.
pip/pip-269.md
Outdated
| public static class CursorEpochConflictException extends ManagedLedgerException {}; | ||
| ``` | ||
|
|
||
| <strong>ManagedCursorImpl.java</strong>(maybe instead `synchronized` to a `lock`) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, let's not add more code to ManagedCursorImpl - it becomes a dumpster.
Let's try to concentrate the same domain into one class.
First, I need to understand the high level design of goal 2 to be able to give good suggestions here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good suggestion. already added a new component CursorEpoch.java
asafm
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Added responses and more comments.
|
Confirmed w/ @codelipenghui, for 3.1, the code will be frozen next week, and this PR will not be merged into 3.1. |
|
The pr had no activity for 30 days, mark with Stale label. |
|
What is the current progress of this pip? @poorbarcode |
Motivation & Modifications
Start a PIP: Add an epoch of cursor to discard outdated reading
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: x