[BEAM-12076] Adds a Kafka external read transform for reading with metadata #15028
Codecov Report
@@ Coverage Diff @@
## master #15028 +/- ##
==========================================
+ Coverage 83.77% 83.79% +0.02%
==========================================
Files 439 441 +2
Lines 59052 59509 +457
==========================================
+ Hits 49469 49866 +397
- Misses 9583 9643 +60
Force-pushed from d27fd99 to 040b1eb.
R: @ihji
Run Python PreCommit
Run Java PreCommit
ihji left a comment:
Thanks. Looks great overall 😄
Just a few minor comments about naming. They're okay to be ignored if you don't agree.
    private final Read<K, V> read;

    ExternalWithMetadata(Read<K, V> read) {
      super("KafkaIO.Read");
TypedWithoutMetadata also sets the same PTransform name, KafkaIO.Read:
    TypedWithoutMetadata(Read<K, V> read) {
      super("KafkaIO.Read");
      this.read = read;
    }
Should we use different names for easier debugging?
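To illustrate the reviewer's point, here is a minimal, self-contained sketch. `NamedTransform` is a hypothetical stand-in for Beam's `PTransform` and is not the Beam API; the only idea taken from the discussion is that the string passed to `super(...)` becomes the transform's name. The name "KafkaIO.ReadWithMetadata" is purely illustrative and not necessarily what the PR adopted.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Beam PTransform: it only stores a display name.
abstract class NamedTransform {
  private final String name;

  NamedTransform(String name) {
    this.name = name;
  }

  String getName() {
    return name;
  }
}

// Mirrors the existing transform, which registers itself as "KafkaIO.Read".
class TypedWithoutMetadataSketch extends NamedTransform {
  TypedWithoutMetadataSketch() {
    super("KafkaIO.Read");
  }
}

// Uses a distinct (illustrative) name so the two variants can be told apart.
class ExternalWithMetadataSketch extends NamedTransform {
  ExternalWithMetadataSketch() {
    super("KafkaIO.ReadWithMetadata");
  }
}

class TransformNamingSketch {
  // Collects the names as they might appear in a log or pipeline graph listing.
  static List<String> describe(List<NamedTransform> transforms) {
    List<String> names = new ArrayList<>();
    for (NamedTransform t : transforms) {
      names.add(t.getName());
    }
    return names;
  }

  public static void main(String[] args) {
    List<NamedTransform> pipeline = new ArrayList<>();
    pipeline.add(new TypedWithoutMetadataSketch());
    pipeline.add(new ExternalWithMetadataSketch());
    System.out.println(describe(pipeline));
  }
}
```

If both constructors passed "KafkaIO.Read", the two entries in the listing would be indistinguishable, which is the debugging concern raised above.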
     * record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
     * inference supports generics.
     */
    static class ExternalKafkaRecord {
I think ExternalKafkaRecord is somewhat misleading. I would prefer something more intuitive (e.g., RowCodedKafkaRecord or KafkaRowWithMetadata).
Changed to "ByteArrayKafkaRecord".
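The TODO in the snippet suggests why a separate record class exists: schema inference did not yet support the generic KafkaRecord, so a non-generic class is used for external transforms. A rough sketch of what a byte-array-based record could look like follows. The class name `ByteArrayRecordSketch` and all field names here are assumptions chosen for illustration; they are not copied from the PR.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical non-generic record: key and value stay as raw bytes, so no
// type parameters are needed. Field names are illustrative assumptions.
final class ByteArrayRecordSketch {
  final String topic;
  final int partition;
  final long offset;
  final long timestampMillis;
  final byte[] key; // may be null for records without a key
  final byte[] value;

  ByteArrayRecordSketch(
      String topic, int partition, long offset, long timestampMillis, byte[] key, byte[] value) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
    this.timestampMillis = timestampMillis;
    this.key = key;
    this.value = value;
  }

  // Decoding is left to the caller; this helper assumes a UTF-8 payload.
  String valueAsUtf8() {
    return value == null ? null : new String(value, StandardCharsets.UTF_8);
  }
}

class ByteArrayRecordDemo {
  public static void main(String[] args) {
    ByteArrayRecordSketch r =
        new ByteArrayRecordSketch(
            "events", 0, 42L, 1_600_000_000_000L, null, "hello".getBytes(StandardCharsets.UTF_8));
    System.out.println(r.topic + "/" + r.partition + "@" + r.offset + ": " + r.valueAsUtf8());
  }
}
```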
    // Using the transform name in the URN so that the corresponding transform can be easily
    // identified.
    public static final String URN_WITH_METADATA =
        "beam:external:java:kafkaio:externalwithmetadata:v1";
Is it necessary to use external again in externalwithmetadata? external is already in the URN.
How about typedwithmetadata or rowwithmetadata?
Changed to "RowsWithMetadata".
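The code comment in the snippet says the transform name is embedded in the URN so the corresponding transform is easy to identify. A small sketch of that convention, using the prefix and version suffix visible in the snippet, is shown below. The helper `urnFor` is hypothetical, and whether the final URN string in the PR was updated to match the renamed class is not stated in this thread.

```java
import java.util.Locale;

class KafkaUrnSketch {
  // Prefix and version suffix as they appear in the snippet under review.
  private static final String PREFIX = "beam:external:java:kafkaio:";
  private static final String VERSION = ":v1";

  // Hypothetical helper: derives a URN from a transform class name by
  // lowercasing it and placing it between the prefix and the version.
  static String urnFor(String transformName) {
    return PREFIX + transformName.toLowerCase(Locale.ROOT) + VERSION;
  }

  public static void main(String[] args) {
    // The name in the snippet under review and the name the author switched to.
    System.out.println(urnFor("ExternalWithMetadata"));
    System.out.println(urnFor("RowsWithMetadata"));
  }
}
```

With this convention the word "external" appears only once in the URN when the transform name itself does not repeat it, which is the redundancy the reviewer pointed out.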
The Portable_Python PreCommit failures seem to be unrelated (Execution failed for task ':sdks:go:installDependencies').
Adds a Kafka external read transform for reading with metadata.
Also updates the Python Kafka wrapper to support reading with metadata.