remove usage of buffer accumulator from Kafka custom consumer #6357

Merged
dlvenable merged 1 commit into opensearch-project:main from graytaylor0:KafkabufferImprovements on Dec 16, 2025

@graytaylor0 graytaylor0 commented Dec 15, 2025

Description

This change removes the BufferAccumulator in favor of calling buffer.writeAll directly. BufferAccumulator does not propagate TimeoutException; instead, it backs off and retries the buffer write indefinitely for as long as the timeouts persist. That blocks the consumer thread that is processing the records, so poll is not called on the consumer. Once max.poll.interval.ms expires, the consumer can be removed from the group, which triggers Kafka rebalances.
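To illustrate the difference, here is a minimal, self-contained sketch. It is not the Data Prepper code: `SketchBuffer`, `FlakyBuffer`, `writeWithInternalRetry`, `tryWrite`, and `pollsUntilWritten` are hypothetical names, and no Kafka client is involved. It also assumes that, after a timeout, the consumer keeps the batch and tries again on the next poll iteration, which this PR description does not spell out.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.TimeoutException;

public class DirectWriteSketch {

    /** Hypothetical stand-in for the pipeline buffer; only writeAll is modeled. */
    interface SketchBuffer<T> {
        void writeAll(Collection<T> records, int timeoutMillis) throws TimeoutException;
    }

    /** Test buffer that times out a fixed number of times, then accepts writes. */
    static final class FlakyBuffer implements SketchBuffer<String> {
        private int failuresLeft;
        final List<String> stored = new ArrayList<>();

        FlakyBuffer(int failures) {
            this.failuresLeft = failures;
        }

        @Override
        public void writeAll(Collection<String> records, int timeoutMillis) throws TimeoutException {
            if (failuresLeft > 0) {
                failuresLeft--;
                throw new TimeoutException("buffer full");
            }
            stored.addAll(records);
        }
    }

    /**
     * Accumulator-style write: the retry loop lives inside the call, so the
     * caller never sees the timeout and cannot poll between attempts.
     * Returns the number of attempts made inside this single call.
     */
    static int writeWithInternalRetry(SketchBuffer<String> buffer, List<String> batch, int timeoutMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            try {
                buffer.writeAll(batch, timeoutMillis);
                return attempts;
            } catch (TimeoutException e) {
                // A real implementation would back off here; omitted in this sketch.
            }
        }
    }

    /** Direct write: a single attempt that reports the timeout to the caller. */
    static boolean tryWrite(SketchBuffer<String> buffer, List<String> batch, int timeoutMillis) {
        try {
            buffer.writeAll(batch, timeoutMillis);
            return true;
        } catch (TimeoutException e) {
            return false;
        }
    }

    /**
     * Simulated consumer loop: each iteration represents one poll call.
     * Returns how many polls happened before the batch was written, or -1
     * if the batch was still pending after maxPolls iterations.
     */
    static int pollsUntilWritten(SketchBuffer<String> buffer, List<String> batch, int timeoutMillis, int maxPolls) {
        for (int polls = 1; polls <= maxPolls; polls++) {
            // A real consumer would call poll() here, which counts toward max.poll.interval.ms.
            if (tryWrite(buffer, batch, timeoutMillis)) {
                return polls;
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> batch = List.of("r1", "r2");
        System.out.println("attempts inside one blocking call: "
                + writeWithInternalRetry(new FlakyBuffer(3), batch, 100));
        System.out.println("polls until written with direct writes: "
                + pollsUntilWritten(new FlakyBuffer(3), batch, 100, 10));
    }
}
```

In the first variant, every retry happens inside one call, so nothing else runs on the consumer thread until the buffer accepts the batch. In the second, control returns to the loop after each timeout, so the loop can reach its next poll.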

Check List

  • New functionality includes testing.
  • New functionality has a documentation issue. Please link to it in this PR.
    • New functionality has javadoc added
  • Commits are signed with a real name per the DCO

By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.

Signed-off-by: Taylor Gray <tylgry@amazon.com>
@graytaylor0 graytaylor0 force-pushed the KafkabufferImprovements branch from b8ffd80 to 021f5b8 Compare December 15, 2025 22:00
} else {
bufferAccumulator.flush();
}
buffer.writeAll(eventRecords, BUFFER_WRITE_TIMEOUT);
Collaborator

I think the "topicMetrics.getNumberOfBufferSizeOverflows().increment();" needs to be modified as well to keep track of how many times we got this timeout exception. Maybe the check for "SizeOverflowException" can be removed as well?

Member Author

Well, we are still tracking SizeOverflowException, but only in the code below where we check the exception type. If you look at the buffer code, you can see that we already have a metric for TimeoutException. Under the hood, the BufferAccumulator was just calling buffer.writeAll(), so it's the same code path as far as metrics go.

@dlvenable dlvenable merged commit 530cb13 into opensearch-project:main Dec 16, 2025
48 of 50 checks passed
@dlvenable dlvenable added this to the v2.14 milestone Dec 16, 2025
wandna-amazon pushed a commit to wandna-amazon/data-prepper that referenced this pull request Jan 8, 2026
…arch-project#6357)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Nathan Wand <wandna@amazon.com>
simonelbaz pushed a commit to simonelbaz/data-prepper that referenced this pull request Jan 31, 2026
…arch-project#6357)

Signed-off-by: Taylor Gray <tylgry@amazon.com>
Signed-off-by: Simon ELBAZ <elbazsimon9@gmail.com>
3 participants