Conversation

@poorbarcode
Contributor

Motivation & Modifications

Start a PIP: Add an epoch of cursor to discard outdated reading

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: x

@poorbarcode poorbarcode self-assigned this Jun 2, 2023
@poorbarcode poorbarcode added this to the 3.1.0 milestone Jun 2, 2023
@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Jun 2, 2023
@tisonkun tisonkun changed the title PIP-269: Add an epoch of cursor to discard outdated reading [feat][doc] PIP-269. Add an epoch of cursor to discard outdated reading Jun 2, 2023
@poorbarcode
Contributor Author

@tisonkun

Thanks for updating the title of this PR.

@asafm
Contributor

asafm commented Jun 4, 2023

@jiazhai Please note: under the new PIP process, PMC members can approve a PIP (which is a PR) only after it has been voted on in the mailing list. Can you undo your approval, please?

@asafm
Contributor

asafm commented Jun 4, 2023

Per the new PIP process, as you can see here, the title should start with [pip][design] PIP- so that a search can show only PIPs.

Contributor

@asafm asafm left a comment

Very nice.
This area is super delicate. We need someone very experienced with the code to give some proofreading here.

pip/pip-269.md Outdated
The Cursor is used to read data from ledgers, there are three scenarios for reading messages(we can call these scenarios Read-Scenario):
- **Sequentially Read**: reading some entries backward from the `read position` and updating the read position to the next position to be read when reading is finished.
- Read entries from `read position`(include), after the read is complete, set `read position` to the next position of the last entry of this read, then the next round of **Sequentially Read**.
- **Message Replay Read**: Normally consumers will acknowledge messages all they received, but sometimes the consumer can not handle messages and asks the broker to redeliver these messages(these messages are always earlier than `read position`). E.g. call `consumer.negativeAcknowledge`; close a consumer and there are some messages held by this consumer, these messages will be redelivered to other consumers. Message Dispatcher will cache the position of messages which should be redelivered, and Cursor will read messages which were cached in the memory of Message Dispatcher in the next round.
Contributor

When the reading is done from the "cache" of negatively acknowledged messages, will it update the read position?

Contributor Author

The read position of the Cursor is a marker. Reads at or above this marker are called Sequentially Read, and reads below this marker are called Message Replay Read (these messages have already been read once by Sequentially Read). Specified Read does not care about this marker (in fact, Specified Read does not even care about the Cursor).

In general, Message Replay Read does not modify the read position, except in the scenario described in the section "A compensation mechanism of Message Replay Read" below.
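
To make the marker rule concrete, here is a minimal sketch. It is not code from the PIP: `ReadKindSketch`, `Pos`, and `classify` are hypothetical names, and `Pos` is a simplified stand-in for a ledger position ordered by ledger id and then entry id.

```java
// Hypothetical sketch: Pos is a simplified stand-in, not Pulsar's Position class.
final class ReadKindSketch {
    enum ReadKind { SEQUENTIALLY_READ, MESSAGE_REPLAY_READ }

    /** Simplified position: ordered by ledger id, then by entry id. */
    static final class Pos implements Comparable<Pos> {
        final long ledgerId;
        final long entryId;

        Pos(long ledgerId, long entryId) {
            this.ledgerId = ledgerId;
            this.entryId = entryId;
        }

        @Override
        public int compareTo(Pos other) {
            int byLedger = Long.compare(ledgerId, other.ledgerId);
            return byLedger != 0 ? byLedger : Long.compare(entryId, other.entryId);
        }
    }

    /** Reads at or above the read position are sequential; reads below it are replays. */
    static ReadKind classify(Pos target, Pos readPosition) {
        return target.compareTo(readPosition) >= 0
                ? ReadKind.SEQUENTIALLY_READ
                : ReadKind.MESSAGE_REPLAY_READ;
    }
}
```

Specified Read is deliberately left out of the sketch, since it does not consult the read position at all.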

pip/pip-269.md Outdated

# High-Level Design

**For the Goal-1**: Cursor Epoch
Contributor

Ok. First, let me try to phrase your idea in my words. Tell me if I'm correct:

You're basically adding a variable to a cursor, named (my name, of course) readPositionExternalResetCount. This variable will be increased whenever the read position is about to be reset to some value by an external action such as reset-subscription, rewind, etc.

Every time a read sequence begins, it starts by recording readPositionExternalResetCount. When the reading has finished and the read position is about to be updated, it verifies that no external reset has happened by checking that readPositionExternalResetCount has not increased relative to the recorded value. If it has, a reset action has started, which means we can stop and drop the read sequence we started. If not, no reset was done, so we continue and update the read position.
Please note that even this is not transactional and not fully safe.
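
A rough sketch of that flow, as I understand it. This is an illustration only, not the PIP's code: the class and method names are made up, and the read position is reduced to a single `long`.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch of the check-then-update idea; names are illustrative only.
final class ResetCountSketch {
    private final AtomicLong readPositionExternalResetCount = new AtomicLong();
    private volatile long readPosition; // simplified: one long instead of ledgerId:entryId

    /** Called by an external action such as reset-subscription or rewind. */
    void externalReset(long newReadPosition) {
        readPositionExternalResetCount.incrementAndGet();
        readPosition = newReadPosition;
    }

    /** A read sequence records the count when it begins. */
    long beginRead() {
        return readPositionExternalResetCount.get();
    }

    /** Returns true if the read position was updated, false if the read was dropped. */
    boolean finishRead(long recordedCount, long nextReadPosition) {
        if (readPositionExternalResetCount.get() != recordedCount) {
            return false; // a reset started in the meantime: drop this read
        }
        // The check and the write are not atomic: a reset can still slip in here.
        readPosition = nextReadPosition;
        return true;
    }

    long readPosition() {
        return readPosition;
    }
}
```

The comment inside `finishRead` marks the gap that makes this version not fully safe.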

What do I suggest:

  1. Rename the variable. Epoch in computer science mostly refers to how many seconds have passed since 1970-01-01. See https://en.wikipedia.org/wiki/Epoch_(computing)
  2. Checking whether the count has increased and, if not, updating the read position is not fully safe. It is better to guard the read position update with a lock, since you're taking an action based on the count. I would take a lock on the read position and, inside that lock, update the count if needed and the read position if needed.
  3. I wouldn't use an exception for flow control. It's an expensive operation, especially for latency-sensitive systems. Can we return a value instead, like a Result of some sort?
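
Suggestions 2 and 3 combined could look roughly like the sketch below. Everything here is hypothetical (class name, method names, the `ReadResult` enum) and the read position is again simplified to a `long`; the point is only that the count check and the position update happen under one lock, and that an outdated read is reported through a return value rather than an exception.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch: lock-guarded count and read position, result instead of exception.
final class GuardedCursorSketch {
    enum ReadResult { APPLIED, DISCARDED_OUTDATED }

    private final ReentrantLock lock = new ReentrantLock();
    private long resetCount;   // guarded by lock
    private long readPosition; // guarded by lock

    long beginRead() {
        lock.lock();
        try {
            return resetCount;
        } finally {
            lock.unlock();
        }
    }

    void externalReset(long newReadPosition) {
        lock.lock();
        try {
            resetCount++;
            readPosition = newReadPosition;
        } finally {
            lock.unlock();
        }
    }

    /** Check and update happen atomically with respect to externalReset. */
    ReadResult finishRead(long recordedCount, long nextReadPosition) {
        lock.lock();
        try {
            if (recordedCount != resetCount) {
                return ReadResult.DISCARDED_OUTDATED;
            }
            readPosition = nextReadPosition;
            return ReadResult.APPLIED;
        } finally {
            lock.unlock();
        }
    }

    long readPosition() {
        lock.lock();
        try {
            return readPosition;
        } finally {
            lock.unlock();
        }
    }
}
```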

Contributor Author

> Tell me if I'm correct

Correct.

> Rename variable. Epoch in computer science is mostly for how many seconds have passed since 1970-01-01. See https://en.wikipedia.org/wiki/Epoch_(computing)

Pulsar and ZooKeeper have similar concepts named epoch, so epoch may be easier to understand? I have also corrected the document: the epoch is also tied to changes of the mark deleted position (in other words, if the mark deleted position is changed by Cursor Reset, the epoch also increases).

> Checking count has increased and if not update read position is not fully safe. Better guard update read position with a lock, since you're taking an action based on the count. I would take a lock on read position, and in that lock update count if needed and read position if needed.

Yes, this is consistent with the current design. The original document may not have been clear enough, so I have improved it.

> I wouldn't use an exception for flow control. It's an expensive operation, especially for latency sensitive systems. Return value like Result of some sort - can we do that?

Good suggestion. I have already replaced the exception with a return value of -1.

Contributor

> Pulsar and Zookeeper have similar concepts named epoch, so epoch may be easier to understand? I have also corrected the document: epoch is also related to the change of mark deleted position(in other words, if the mark deleted position is changed by Cursor Reset, the epoch also increases)

The ZK epoch seems more like a revision, version, or sequence ID.
Given your explanation, it seems you need a revision/version/sequence ID for the cursor state. How about cursorStateRevisionNum?

Revision is:

> The act of changing or correcting something, or the thing that has been changed or corrected: [ U ] The Senate is expected to act on tax revision.

Contributor Author

Good suggestion. I have already renamed this variable to cursorStateRevisionNum.

pip/pip-269.md Outdated
- Every **Sequentially Read** or **Message Replay Read** carries the current Cursor Epoch.
- Discard the reading and prevent the `read position` change if the epoch carried by the reading is smaller than the current Cursor Epoch when the reading complete

**For the Goal-2**: Two-phase of Cursor Epoch increment
Contributor

Lost you here.
Why do you need to use the same epoch?
I wonder, why not use a lock for the cursor and dispatcher?
I guess I need to dive deeper.

Contributor Author

> Lost you here.

I have rewritten the doc to make it easier to understand.

> Why do you need to use same epoch.

I could not fully understand this question.

> I wonder why not a lock perhaps for cursor and dispatcher.
> I guess I need to dive more perhaps.

  • A lock is better than synchronized because it works better with async logic.
  • A lock plus a variable is better than a lock alone, because the variable helps correct tasks that are already waiting in the lock's queue.
  • Two locks (one for the Cursor and one for the Dispatcher) keep each lock's scope smaller. For example:
    • lock(Cursor), set resetting to true, release lock(Cursor). Other actions are then rejected because a reset is in progress (fast fail).
    • lock(Dispatcher), then the Dispatcher does its own processing.

Since more than one component calls Cursor Position Reset (see the section Cursor Position Reset, which I have rewritten), we can't make the lock too wide; that would increase the risk of deadlock.
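
The two-lock flow above could be sketched roughly as follows. This is only an illustration under assumptions: all names are hypothetical, the read position is a plain `long`, the Dispatcher's work is reduced to a counter, and the final `finishReset` step is my assumption about how the flag gets cleared, not something stated in the PIP.

```java
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical sketch of the two-lock reset with a fast-fail flag.
final class TwoLockResetSketch {
    private final ReentrantLock cursorLock = new ReentrantLock();
    private final ReentrantLock dispatcherLock = new ReentrantLock();
    private boolean resetting;     // guarded by cursorLock
    private long readPosition;     // guarded by cursorLock
    private int dispatcherRewinds; // guarded by dispatcherLock

    /** lock(Cursor), set resetting to true, release lock(Cursor). */
    boolean beginReset() {
        cursorLock.lock();
        try {
            if (resetting) {
                return false; // another reset is already in progress
            }
            resetting = true;
            return true;
        } finally {
            cursorLock.unlock();
        }
    }

    /** Other actions are rejected while a reset is in progress (fast fail). */
    boolean tryAdvanceReadPosition(long next) {
        cursorLock.lock();
        try {
            if (resetting) {
                return false;
            }
            readPosition = next;
            return true;
        } finally {
            cursorLock.unlock();
        }
    }

    /** lock(Dispatcher): the Dispatcher does its own work without holding the Cursor lock. */
    void dispatcherStep() {
        dispatcherLock.lock();
        try {
            dispatcherRewinds++;
        } finally {
            dispatcherLock.unlock();
        }
    }

    /** Assumed final step: apply the new position and clear the flag. */
    void finishReset(long newReadPosition) {
        cursorLock.lock();
        try {
            readPosition = newReadPosition;
            resetting = false;
        } finally {
            cursorLock.unlock();
        }
    }

    long readPosition() {
        cursorLock.lock();
        try {
            return readPosition;
        } finally {
            cursorLock.unlock();
        }
    }
}
```

Because the two locks are never held at the same time in this sketch, no lock-ordering problem can arise between the Cursor and the Dispatcher.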

pip/pip-269.md Outdated
public static class CursorEpochConflictException extends ManagedLedgerException {};
```

<strong>ManagedCursorImpl.java</strong>(maybe instead `synchronized` to a `lock`)
Contributor

Please, let's not add more code to ManagedCursorImpl - it becomes a dumpster.
Let's try to concentrate the same domain into one class.
First, I need to understand the high level design of goal 2 to be able to give good suggestions here.

Contributor Author

Good suggestion. I have already added a new component, CursorEpoch.java.

Contributor

@asafm asafm left a comment

Added responses and more comments.

@Anonymitaet
Member

Confirmed w/ @codelipenghui, for 3.1, the code will be frozen next week, and this PR will not be merged into 3.1.

@github-actions

github-actions bot commented Sep 4, 2023

The pr had no activity for 30 days, mark with Stale label.

@github-actions github-actions bot added the Stale label Sep 4, 2023
@Technoboy- Technoboy- modified the milestones: 3.2.0, 3.3.0 Dec 22, 2023
@zhanghaou
Contributor

What is the current progress of this pip? @poorbarcode

Labels

doc-required Your PR changes impact docs and you will update later. Stale type/PIP

8 participants