Skip to content

[FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.#19972

Merged
tisonkun merged 4 commits into
apache:masterfrom
streamnative:feature/new-seek-lifecycle
Aug 12, 2022
Merged

[FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint.#19972
tisonkun merged 4 commits into
apache:masterfrom
streamnative:feature/new-seek-lifecycle

Conversation

@syhily

@syhily syhily commented Jun 15, 2022

Copy link
Copy Markdown
Contributor

What is the purpose of the change

Brief change log

This task modifies the flink-connector-pulsar module, adds some new mechanisms to set the initial consuming position.

  • Change StartCursor, add new useful methods, and deprecate the confused fromMessageTime() method.
  • Change StopCursor, and add new useful methods.
  • Introduce a new SplitAssigner for assigning the splits among the Pulsar readers. Make the partition assignment logic clear and testable.
  • Change the start position seeking mechanism from Pulsar consumer API to Pulsar admin API. Don't reset the start position when the topic has a subscription.

Verifying this change

This change is already covered by existing tests, such as:

  • PulsarSourceITCase
  • PulsarSourceEnumeratorTest
  • PulsarOrderedPartitionSplitReaderTest

We add new tests for new partition assign logic:

  • NormalSplitAssignerTest
  • SharedSplitAssignerTest

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (yes)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduces a new feature? (yes)
  • If yes, how is the feature documented? (docs)

@flinkbot

flinkbot commented Jun 15, 2022

Copy link
Copy Markdown
Collaborator

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@syhily syhily changed the title [FLINK-27399][Connector/Pulsar] Modify start cursor and stop cursor, change initial position setting logic. [FLINK-27399][Connector/Pulsar] Change initial consuming position setting logic for better handle the checkpoint. Jun 15, 2022
Comment thread flink-python/pyflink/datastream/connectors/pulsar.py Outdated
@syhily syhily force-pushed the feature/new-seek-lifecycle branch 11 times, most recently from cbf393b to 1ae927b Compare July 8, 2022 18:34

@imaffe imaffe left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I reviewed until the NormlSplitAssigner. Will continue review tomorrow.

Comment thread docs/content.zh/docs/connectors/datastream/pulsar.md
@MartijnVisser MartijnVisser requested a review from PatrickRen July 11, 2022 07:31
@MartijnVisser

Copy link
Copy Markdown
Contributor

@PatrickRen Can you also have a look at this PR? I've understood that this PR should help resolve this blocker test stability FLINK-26721

@syhily syhily force-pushed the feature/new-seek-lifecycle branch from 1ae927b to 12049bf Compare July 11, 2022 22:19
@MartijnVisser MartijnVisser requested a review from tisonkun July 12, 2022 13:31
@MartijnVisser

Copy link
Copy Markdown
Contributor

@tisonkun I've understood from @wuchong that you might also want to help/have a look at this PR, therefore I've tagged you.

@tisonkun

Copy link
Copy Markdown
Member

@MartijnVisser Thank you. I'll review the patch in this week. Actually I ever try to request myself as a reviewer but forget several times >_<

@tisonkun tisonkun left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally looks good. The effective changes are:

  1. Changing SplitAssignmentState to SplitAssigner
  2. Create subscription via PulsarAdmin API instead of consumer API and seek.

These significant changes are reasonable from my perspective.

Comments inline.

@MartijnVisser

Copy link
Copy Markdown
Contributor

@syhily Can you resolve the latest review comments?

@syhily syhily force-pushed the feature/new-seek-lifecycle branch from 12049bf to 21eda6f Compare July 28, 2022 08:22
@syhily syhily requested review from a49a and imaffe and removed request for imaffe August 11, 2022 19:16
@syhily syhily force-pushed the feature/new-seek-lifecycle branch from 2baa322 to f807c4a Compare August 12, 2022 03:24
@tisonkun tisonkun requested review from tisonkun and removed request for a49a August 12, 2022 03:38
@syhily syhily force-pushed the feature/new-seek-lifecycle branch from f807c4a to 938be57 Compare August 12, 2022 05:18
@syhily syhily force-pushed the feature/new-seek-lifecycle branch from 938be57 to 7ea6b3c Compare August 12, 2022 07:31
@syhily

syhily commented Aug 12, 2022

Copy link
Copy Markdown
Contributor Author

@tisonkun Finally, the ci turns green.

@tisonkun tisonkun merged commit 18d21a0 into apache:master Aug 12, 2022
@syhily syhily deleted the feature/new-seek-lifecycle branch August 13, 2022 06:15
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171

(cherry picked from commit 18d21a0)
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
syhily added a commit to streamnative/flink that referenced this pull request Aug 13, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
syhily added a commit to streamnative/flink that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
tisonkun pushed a commit that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (#19972) (#20565)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
tisonkun pushed a commit that referenced this pull request Aug 14, 2022
…ting logic for better handle the checkpoint. (#19972) (#20564)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
huangxiaofeng10047 pushed a commit to huangxiaofeng10047/flink that referenced this pull request Nov 3, 2022
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
Rodger-zk pushed a commit to Rodger-zk/Rodger-zk-bkbase-flink that referenced this pull request Dec 3, 2024
…ting logic for better handle the checkpoint. (apache#19972)

* Change the initial start cursor and stop cursor to better handle the consuming behaviors.
* Create the initial subscription instead seek every time. This should fix the wrong position setting.
* Fix the wrong stop cursor, make sure it stops at the correct space
* Drop Consumer.seek() for apache/pulsar#16171
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

7 participants