Skip to content

[fix][broker] Fix consumer stops receiving messages when with large backlogs processing#22454

Merged
Technoboy- merged 5 commits into
apache:masterfrom
Technoboy-:fix-22435
Apr 8, 2024
Merged

[fix][broker] Fix consumer stops receiving messages when with large backlogs processing#22454
Technoboy- merged 5 commits into
apache:masterfrom
Technoboy-:fix-22435

Conversation

@Technoboy-

Copy link
Copy Markdown
Contributor

Fixes #22435

Motivation

When the backlog size > managedLedgerCursorBackloggedThreshold (default 1000), the cursor will be set inactive first. so this will cause the non-durable cursor not to add to the waiting cursor. This leads to the consumer stopping to receive messages.

private void checkBackloggedCursor(PersistentSubscription subscription) {
// activate caught up cursor which include consumers
if (!subscription.getConsumers().isEmpty()
&& subscription.getCursor().getNumberOfEntries() < backloggedCursorThresholdEntries) {
subscription.getCursor().setActive();
} else {
subscription.getCursor().setInactive();
}
}

public void addWaitingCursor(ManagedCursorImpl cursor) {
if (cursor instanceof NonDurableCursorImpl) {
if (cursor.isActive()) {
this.waitingCursors.add(cursor);
}
} else {
this.waitingCursors.add(cursor);
}
}

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

@github-actions github-actions Bot added the doc-not-needed Your PR changes do not impact docs label Apr 7, 2024
@lhotari

lhotari commented Apr 8, 2024

Copy link
Copy Markdown
Member

Great work on this fix @Technoboy- ! Just one minor comment in #22454 (comment) .

@Technoboy- Technoboy- self-assigned this Apr 8, 2024
@Technoboy- Technoboy- added this to the 3.3.0 milestone Apr 8, 2024

@lhotari lhotari left a comment

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Good work @Technoboy

@Technoboy- Technoboy- merged commit 57a616e into apache:master Apr 8, 2024
Technoboy- added a commit to Technoboy-/pulsar that referenced this pull request Apr 8, 2024
Technoboy- added a commit that referenced this pull request Apr 8, 2024

if (dispatcher != null && dispatcher.getConsumers().isEmpty()) {
deactivateCursor();
topic.getManagedLedger().removeWaitingCursor(cursor);

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just wondering about this removal now after the PR has been merged. It seems that whatever isResetCursor means that it would be skipped in that case. @Technoboy- is that a problem?

mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 15, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 17, 2024
mukesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 19, 2024
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Apr 23, 2024
hanmz pushed a commit to hanmz/pulsar that referenced this pull request Feb 12, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Bug] Consumer Stops Receiving Messages with Large Backlogs Post-Processing

4 participants