Skip to content

[BEAM-12076] Adds a Kafka external read transform for reading with metadata#15028

Merged
chamikaramj merged 6 commits intoapache:masterfrom
chamikaramj:py_kafka_with_metadata
Jul 1, 2021
Merged

[BEAM-12076] Adds a Kafka external read transform for reading with metadata#15028
chamikaramj merged 6 commits intoapache:masterfrom
chamikaramj:py_kafka_with_metadata

Conversation

@chamikaramj
Copy link
Copy Markdown
Contributor

@chamikaramj chamikaramj commented Jun 17, 2021

Adds a Kafka external read transform for reading with metadata.
Also updates Python Kafka wrapper to support reading with metadata.


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Choose reviewer(s) and mention them in a comment (R: @username).
  • Format the pull request title like [BEAM-XXX] Fixes bug in ApproximateQuantiles, where you replace BEAM-XXX with the appropriate JIRA issue, if applicable. This will automatically link the pull request to the issue.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

ValidatesRunner compliance status (on master branch)

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- Build Status Build Status --- Build Status ---
Java Build Status Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Build Status
Python --- Build Status
Build Status
Build Status
Build Status
Build Status
--- Build Status ---
XLang Build Status Build Status Build Status --- Build Status ---

Examples testing status on various runners

Lang ULR Dataflow Flink Samza Spark Twister2
Go --- --- --- --- --- --- ---
Java --- Build Status
Build Status
Build Status
--- --- --- --- ---
Python --- --- --- --- --- --- ---
XLang --- --- --- --- --- --- ---

Post-Commit SDK/Transform Integration Tests Status (on master branch)

Go Java Python
Build Status Build Status Build Status
Build Status
Build Status

Pre-Commit Tests Status (on master branch)

--- Java Python Go Website Whitespace Typescript
Non-portable Build Status
Build Status
Build Status
Build Status
Build Status
Build Status Build Status Build Status Build Status
Portable --- Build Status Build Status --- --- ---

See .test-infra/jenkins/README for trigger phrase, status and link of all Jenkins jobs.

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests

See CI.md for more information about GitHub Actions CI.

@codecov
Copy link
Copy Markdown

codecov bot commented Jun 17, 2021

Codecov Report

Merging #15028 (7906f96) into master (5fb31eb) will increase coverage by 0.02%.
The diff coverage is 15.38%.

Impacted file tree graph

@@            Coverage Diff             @@
##           master   #15028      +/-   ##
==========================================
+ Coverage   83.77%   83.79%   +0.02%     
==========================================
  Files         439      441       +2     
  Lines       59052    59509     +457     
==========================================
+ Hits        49469    49866     +397     
- Misses       9583     9643      +60     
Impacted Files Coverage Δ
...ython/apache_beam/examples/kafkataxi/kafka_taxi.py 0.00% <0.00%> (ø)
sdks/python/apache_beam/io/kafka.py 80.00% <100.00%> (+0.83%) ⬆️
sdks/python/apache_beam/io/mongodbio.py 92.88% <0.00%> (-2.76%) ⬇️
...on/apache_beam/runners/portability/spark_runner.py 67.34% <0.00%> (-2.22%) ⬇️
sdks/python/apache_beam/pvalue.py 91.35% <0.00%> (-1.67%) ⬇️
...pache_beam/runners/interactive/interactive_beam.py 74.72% <0.00%> (-1.10%) ⬇️
sdks/python/apache_beam/internal/metrics/metric.py 86.17% <0.00%> (-1.07%) ⬇️
...m/runners/portability/spark_uber_jar_job_server.py 84.32% <0.00%> (-1.06%) ⬇️
...ython/apache_beam/io/gcp/bigquery_read_internal.py 58.46% <0.00%> (-0.75%) ⬇️
...ks/python/apache_beam/runners/worker/data_plane.py 90.60% <0.00%> (-0.61%) ⬇️
... and 26 more

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 5fb31eb...7906f96. Read the comment docs.

@chamikaramj chamikaramj force-pushed the py_kafka_with_metadata branch from d27fd99 to 040b1eb Compare June 21, 2021 14:38
@chamikaramj chamikaramj changed the title Adds a schema-friendly KafkaIO read transform that reads records with metadata [BEAM-12076] Adds a Kafka external read transform for reading from Kafka with metadata Jun 21, 2021
@chamikaramj chamikaramj changed the title [BEAM-12076] Adds a Kafka external read transform for reading from Kafka with metadata [BEAM-12076] Adds a Kafka external read transform for reading with metadata Jun 21, 2021
@chamikaramj
Copy link
Copy Markdown
Contributor Author

R: @ihji

CC: @boyuanzz @TheNeuralBit @youngoli

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Python PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Java PreCommit

Copy link
Copy Markdown
Contributor

@ihji ihji left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. Looks great overall 😄

Just a few minor comments about naming. They're okay to be ignored if you don't agree.

private final Read<K, V> read;

ExternalWithMetadata(Read<K, V> read) {
super("KafkaIO.Read");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TypedWithoutMetadata also sets the same ptransform name KafkaIO.Read:

TypedWithoutMetadata(Read<K, V> read) {
      super("KafkaIO.Read");
      this.read = read;
    }

Should we use different names for easier debugging?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

* record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
* inference supports generics.
*/
static class ExternalKafkaRecord {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ExternalKafkaRecord is somewhat misleading. I would prefer to use something more intuitive (ex. RowCodedKafkaRecord, KafkaRowWithMetadata, etc.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to "ByteArrayKafkaRecord".

// Using the transform name in the URN so that the corresponding transform can be easily
// identified.
public static final String URN_WITH_METADATA =
"beam:external:java:kafkaio:externalwithmetadata:v1";
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it necessary to use external again in externalwithmetadata? external is already in the URN.

How about typedwithmetadata or rowwithmetadata?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to "RowsWithMetadata".

Copy link
Copy Markdown
Contributor Author

@chamikaramj chamikaramj left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks!

* record for external transforms. TODO(BEAM-7345): use regular KafkaRecord class when Beam Schema
* inference supports generics.
*/
static class ExternalKafkaRecord {
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to "ByteArrayKafkaRecord".

private final Read<K, V> read;

ExternalWithMetadata(Read<K, V> read) {
super("KafkaIO.Read");
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

// Using the transform name in the URN so that the corresponding transform can be easily
// identified.
public static final String URN_WITH_METADATA =
"beam:external:java:kafkaio:externalwithmetadata:v1";
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed to "RowsWithMetadata".

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Java PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Portable_Python PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Java PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Python PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Java PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Python PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run PythonDocker PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Portable_Python PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Java PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Python PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Portable_Python PreCommit

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Portable_Python PreCommit failures seems to be unrelated (Execution failed for task ':sdks:go:installDependencies').

@chamikaramj
Copy link
Copy Markdown
Contributor Author

Run Portable_Python PreCommit

@chamikaramj chamikaramj merged commit d6088d3 into apache:master Jul 1, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants