Skip to content

[YAML] Fix error handling for KafkaSchemaTransforms#29261

Merged
robertwb merged 3 commits intoapache:masterfrom
Polber:jkinard/fix-kafka
Nov 3, 2023
Merged

[YAML] Fix error handling for KafkaSchemaTransforms#29261
robertwb merged 3 commits intoapache:masterfrom
Polber:jkinard/fix-kafka

Conversation

@Polber
Copy link
Copy Markdown
Contributor

@Polber Polber commented Nov 1, 2023

Add proper error handling to KafkaReadSchemaTransform and KafkaWriteSchemaTransform


Thank you for your contribution! Follow this checklist to help us incorporate your contribution quickly and easily:

  • Mention the appropriate issue in your description (for example: addresses #123), if applicable. This will automatically add a link to the pull request in the issue. If you would like the issue to automatically close on merging the pull request, comment fixes #<ISSUE NUMBER> instead.
  • Update CHANGES.md with noteworthy changes.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

See the Contributor Guide for more tips on how to make review process smoother.

To check the build health, please visit https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md

GitHub Actions Tests Status (on master branch)

Build python source distribution and wheels
Python tests
Java tests
Go tests

See CI.md for more information about GitHub Actions CI or the workflows README to see a list of phrases to trigger workflows.

@Polber
Copy link
Copy Markdown
Contributor Author

Polber commented Nov 1, 2023

R: @robertwb

@github-actions
Copy link
Copy Markdown
Contributor

github-actions bot commented Nov 2, 2023

Stopping reviewer notifications for this pull request: review requested by someone other than the bot, ceding control

@Polber
Copy link
Copy Markdown
Contributor Author

Polber commented Nov 2, 2023

R: @damccorm

Copy link
Copy Markdown
Contributor

@robertwb robertwb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm... Can we refactor this so as not to be so duplicative here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a quick refactor. There is probably a more elegant solution, but I'm trying to get this submitted asap with 2.52

Signed-off-by: Jeffrey Kinard <jeff@thekinards.com>
@Polber Polber force-pushed the jkinard/fix-kafka branch from e6e97f5 to 470f17f Compare November 2, 2023 00:53
@codecov
Copy link
Copy Markdown

codecov bot commented Nov 2, 2023

Codecov Report

Merging #29261 (49d2ee7) into master (e03988e) will decrease coverage by 0.04%.
Report is 21 commits behind head on master.
The diff coverage is 0.00%.

@@            Coverage Diff             @@
##           master   #29261      +/-   ##
==========================================
- Coverage   38.31%   38.28%   -0.04%     
==========================================
  Files         688      689       +1     
  Lines      101879   101990     +111     
==========================================
+ Hits        39040    39043       +3     
- Misses      61259    61367     +108     
  Partials     1580     1580              
Flag Coverage Δ
python 29.88% <0.00%> (-0.05%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files Coverage Δ
sdks/python/apache_beam/yaml/yaml_transform.py 0.00% <0.00%> (ø)

... and 6 files with indirect coverage changes

📣 We’re building smart automated test selection to slash your CI/CD build times. Learn more

Signed-off-by: Jeffrey Kinard <jeff@thekinards.com>
@Polber Polber requested a review from robertwb November 2, 2023 05:26
PCollection<byte[]> kafkaValues =
input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());

Schema errorSchema = ErrorHandling.errorSchema(ERROR_SCHEMA);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This probably shouldn't be an ErrorSchema of ERROR_SCHEMA.

We should add a errorSchema(FieldType) overload, where bytes is the thing to pass here. I know John Casey is looking at DLQ stuff where we might want to refactor this a bit more. (I suppose we're calling YAML stable, but the formatting of the error messages may have to change.)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, I did that initially, but I wan't sure if that would mess with our Row-based approach. I can refactor that and test

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#29274 (as this is in the docs, no need to cherry-pick that one).

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@robertwb How does it look now?

@Polber Polber force-pushed the jkinard/fix-kafka branch from 5ecf053 to fc607b4 Compare November 2, 2023 19:05
@Polber Polber requested a review from robertwb November 2, 2023 19:57
Signed-off-by: Jeffrey Kinard <jeff@thekinards.com>
@Polber Polber force-pushed the jkinard/fix-kafka branch from fc607b4 to 49d2ee7 Compare November 2, 2023 21:21
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants