[KafkaIO] Only update size metrics once per batch#36077
[KafkaIO] Only update size metrics once per batch#36077kennknowles merged 1 commit intoapache:masterfrom
Conversation
There was a problem hiding this comment.
Summary of Changes
Hello @sjvanrossum, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
This pull request introduces a significant performance improvement for KafkaIO by optimizing how record size metrics are updated. Instead of updating these metrics for each individual record, the system now aggregates the necessary data within a batch and performs a single, consolidated update. This change aims to reduce CPU overhead and enhance the overall efficiency of Kafka data processing within Apache Beam.
Highlights
- Performance Optimization: Updates to record size metrics (distribution and average) are now batched, significantly reducing CPU time previously spent on unbatched updates. This change improves efficiency by updating metrics once per batch rather than per individual record.
- KafkaIOUtils Refactoring: The
MovingAverageutility class inKafkaIOUtils.javahas been enhanced with a newupdatemethod that accepts a sum and count, enabling batch updates. The existing single-quantityupdatemethod was also refined. - ReadFromKafkaDoFn Metric Collection: The
ReadFromKafkaDoFnnow accumulates record size metrics (sum, count, min, max) for an entire batch of Kafka records before performing a single update to theavgRecordSizeandrawSizesmetrics. This replaces the previous per-record update approach.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | /gemini review |
Performs a code review for the current pull request in its current state. |
| Pull Request Summary | /gemini summary |
Provides a summary of the current pull request in its current state. |
| Comment | @gemini-code-assist | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | /gemini help |
Displays a list of available commands. |
Customization
To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
-
Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution. ↩
|
Assigning reviewers: R: @kennknowles for label java. Note: If you would like to opt out of this review, comment Available commands:
The PR bot will only process comments in the main thread (not review comments). |
kennknowles
left a comment
There was a problem hiding this comment.
Those are some surprising numbers. Are there integration tests we can run? (by touching files in .github/trigger_files corresponding to workflows to run)
|
I see we have a dedicated precommit. I am happy to merge and we can watch for any integration test failures. |




Unbatched updates for distribution of record sizes take up a surprising amount of CPU time, >3% CPU time at 50th percentile.
Unbatched updates for average record size takes up >0.3% CPU time at 50th percentile, which isn't huge but still worth fixing while we're at it.
Performing these updates once per batch reduces CPU time at 50th percentile to 0.2% for the distribution and 0.02% for the average.
Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:
addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, commentfixes #<ISSUE NUMBER>instead.CHANGES.mdwith noteworthy changes.See the Contributor Guide for more tips on how to make review process smoother.
To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md
GitHub Actions Tests Status (on master branch)
See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.