Skip to content

Conversation

@lhotari
Copy link
Member

@lhotari lhotari commented Sep 25, 2024

Fixes #23307
Fixes #21199
Fixes #15705
Fixes #21656
Fixes #20899
Fixes #20885

Implementation for PIP-379: Key_Shared Draining Hashes for Improved Message Ordering

Mailing list discussion about this PR

4.0.x docs for the implementation: https://pulsar.apache.org/docs/4.0.x/concepts-messaging/#preserving-order-of-message-delivery-by-key

Motivation

See PIP-379: Key_Shared Draining Hashes for Improved Message Ordering

PIP-379 aims to address several issues with the current implementation and introduce a more efficient mechanism for managing message ordering.

Problem:
The current Key_Shared implementation faces challenges including:

  1. Complex management of "recently joined consumers"
  2. Incomplete fulfillment of ordering guarantees
  3. Unnecessary message blocking
  4. Poor observability

PIP-379 introduces a "draining hashes" concept to efficiently manage
message ordering by tracking affected hashes when consumer assignments
change.

Benefits:

  1. Improved message ordering guarantees
  2. Reduced unnecessary message blocking
  3. Better scalability and performance
  4. Enhanced observability

This proposal would replace the existing "recently joined consumers"
mechanism, addressing its limitations while providing a more robust
solution.

Modifications

See PIP-379: Key_Shared Draining Hashes for Improved Message Ordering for high level design with simplified code examples.

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@lhotari lhotari added the category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost label Sep 25, 2024
@lhotari lhotari added this to the 4.0.0 milestone Sep 25, 2024
@lhotari lhotari self-assigned this Sep 25, 2024
@github-actions github-actions bot added the doc-required Your PR changes impact docs and you will update later. label Sep 25, 2024
@lhotari lhotari force-pushed the lh-pip-379-implementation branch 2 times, most recently from 923f379 to 48b2022 Compare October 1, 2024 11:40
@lhotari lhotari marked this pull request as ready for review October 1, 2024 11:41
@lhotari lhotari force-pushed the lh-pip-379-implementation branch 2 times, most recently from 1855f33 to a6d9bf3 Compare October 1, 2024 23:51
Copy link
Contributor

@codelipenghui codelipenghui left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The change looks good.
Just left a few minor comments.

Copy link
Contributor

@nicoloboschi nicoloboschi left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, nice work

@lhotari lhotari merged commit 3d0625b into apache:master Oct 8, 2024
// deduplication for readMoreEntriesAsync calls
if (readMoreEntriesAsyncRequested.compareAndSet(false, true)) {
topic.getBrokerService().executor().execute(() -> {
readMoreEntriesAsyncRequested.set(false);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, Lari, sorry to bother you, But when I read the source code, I'm confused about this:

  • It uses an atomic flag readMoreEntriesAsyncRequested to "block" duplicate requests
  • But resets the flag right when the task starts, not after completion
  • Since topic.getBrokerService().executor() is a multi-threaded pool (has multiple core threads):
    This lets new requests jump in immediately
  • Multiple readMoreEntries() can run concurrently

could you explain how this work? I would be so much appreciate!

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@3pacccccc The intention isn't to prevent concurrent execution. The short comment "deduplication for readMoreEntriesAsync calls" explains the intention. In Pulsar code, readMoreEntries and readMoreEntriesAsync is called in multiple locations to ensure that all entries that were made available in a way or another would get dispatched. That's why the deduplication makes sense since it will continue to provide that guarantee while reducing the extra overhead of unnecessary concurrent or subsequent calls to readMoreEntries.
Since in most (or all) dispatcher implementations the method "readMoreEntries" itself is a synchronous method, it will serialize the execution eventually. I agree that this isn't an optimal solution how this is handled in dispatchers, but changing it wasn't the intention when the deduplication was added in this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lhotari OK, I get it, thank you so much!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

category/reliability The function does not work properly in certain specific environments or failures. e.g. data lost doc-required Your PR changes impact docs and you will update later. ready-to-test

10 participants