 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
 import org.apache.beam.sdk.schemas.utils.JsonUtils;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -77,8 +76,6 @@ public class KafkaReadSchemaTransformProvider
 
   public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
   public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
-  public static final Schema ERROR_SCHEMA =
-      Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
 
   final Boolean isTest;
   final Integer testTimeoutSecs;
@@ -98,6 +95,9 @@ protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
     return KafkaReadSchemaTransformConfiguration.class;
   }
 
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
   @Override
   protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
     final String inputSchema = configuration.getSchema();
@@ -114,14 +114,32 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
     consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 
     String format = configuration.getFormat();
-
-    if (format != null && format.equals("RAW")) {
-      if (inputSchema != null) {
-        throw new IllegalArgumentException(
-            "To read from Kafka in RAW format, you can't provide a schema.");
+    boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
+
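+    // RAW and schema-based (JSON/Avro) reads share a single code path below; only
+    // the Beam schema and the byte[]-to-Row mapper differ between the two.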
+    if ((format != null && format.equals("RAW")) || (!Strings.isNullOrEmpty(inputSchema))) {
+      SerializableFunction<byte[], Row> valueMapper;
+      Schema beamSchema;
+      if (format != null && format.equals("RAW")) {
+        if (inputSchema != null) {
+          throw new IllegalArgumentException(
+              "To read from Kafka in RAW format, you can't provide a schema.");
+        }
+        beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+        valueMapper = getRawBytesToRowFunction(beamSchema);
+      } else {
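+        // A schema was provided directly: derive the Beam schema and the
+        // byte[]-to-Row mapper from it, as JSON or Avro depending on the format.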
+        assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
+            : "To read from Kafka, a schema must be provided directly or though Confluent "
+                + "Schema Registry, but not both.";
+
+        beamSchema =
+            Objects.equals(format, "JSON")
+                ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+                : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
+        valueMapper =
+            Objects.equals(format, "JSON")
+                ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+                : AvroUtils.getAvroBytesToRowFunction(beamSchema);
       }
-      Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
-      SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);
       return new SchemaTransform() {
         @Override
         public PCollectionRowTuple expand(PCollectionRowTuple input) {
@@ -138,59 +156,23 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
           PCollection<byte[]> kafkaValues =
               input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
 
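+          // Schema for the error records emitted when a value fails to parse.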
+          Schema errorSchema = ErrorHandling.errorSchemaBytes();
           PCollectionTuple outputTuple =
               kafkaValues.apply(
-                  ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper))
+                  ParDo.of(
+                          new ErrorFn(
+                              "Kafka-read-error-counter", valueMapper, errorSchema, handleErrors))
                       .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
 
-          return PCollectionRowTuple.of(
-              "output",
-              outputTuple.get(OUTPUT_TAG).setRowSchema(rawSchema),
-              "errors",
-              outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
-        }
-      };
-    }
+          PCollectionRowTuple outputRows =
+              PCollectionRowTuple.of(
+                  "output", outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
 
-    if (inputSchema != null && !inputSchema.isEmpty()) {
-      assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
-          : "To read from Kafka, a schema must be provided directly or though Confluent "
-              + "Schema Registry, but not both.";
-
-      final Schema beamSchema =
-          Objects.equals(format, "JSON")
-              ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
-              : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
-      SerializableFunction<byte[], Row> valueMapper =
-          Objects.equals(format, "JSON")
-              ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
-              : AvroUtils.getAvroBytesToRowFunction(beamSchema);
-      return new SchemaTransform() {
-        @Override
-        public PCollectionRowTuple expand(PCollectionRowTuple input) {
-          KafkaIO.Read<byte[], byte[]> kafkaRead =
-              KafkaIO.readBytes()
-                  .withConsumerConfigUpdates(consumerConfigs)
-                  .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
-                  .withTopic(configuration.getTopic())
-                  .withBootstrapServers(configuration.getBootstrapServers());
-          if (isTest) {
-            kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
+          PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
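+          // Attach the error output under its user-configured name only when requested.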
+          if (handleErrors) {
+            outputRows = outputRows.and(configuration.getErrorHandling().getOutput(), errorOutput);
           }
-
-          PCollection<byte[]> kafkaValues =
-              input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
-
-          PCollectionTuple outputTuple =
-              kafkaValues.apply(
-                  ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper))
-                      .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-          return PCollectionRowTuple.of(
-              "output",
-              outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema),
-              "errors",
-              outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+          return outputRows;
         }
       };
     } else {
@@ -259,25 +241,38 @@ public List<String> outputCollectionNames() {
   }
 
   public static class ErrorFn extends DoFn<byte[], Row> {
-    private SerializableFunction<byte[], Row> valueMapper;
-    private Counter errorCounter;
+    private final SerializableFunction<byte[], Row> valueMapper;
+    private final Counter errorCounter;
     private Long errorsInBundle = 0L;
-
-    public ErrorFn(String name, SerializableFunction<byte[], Row> valueMapper) {
+    private final boolean handleErrors;
+    private final Schema errorSchema;
+
+    public ErrorFn(
+        String name,
+        SerializableFunction<byte[], Row> valueMapper,
+        Schema errorSchema,
+        boolean handleErrors) {
       this.errorCounter = Metrics.counter(KafkaReadSchemaTransformProvider.class, name);
       this.valueMapper = valueMapper;
+      this.handleErrors = handleErrors;
+      this.errorSchema = errorSchema;
     }
 
     @ProcessElement
     public void process(@DoFn.Element byte[] msg, MultiOutputReceiver receiver) {
+      Row mappedRow = null;
       try {
-        receiver.get(OUTPUT_TAG).output(valueMapper.apply(msg));
+        mappedRow = valueMapper.apply(msg);
       } catch (Exception e) {
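+        // With no error output configured, fail fast instead of dropping the record.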
+        if (!handleErrors) {
+          throw new RuntimeException(e);
+        }
         errorsInBundle += 1;
         LOG.warn("Error while parsing the element", e);
-        receiver
-            .get(ERROR_TAG)
-            .output(Row.withSchema(ERROR_SCHEMA).addValues(e.toString(), msg).build());
+        receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, msg, e));
+      }
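+      // Emit outside the try block so a failure in output() cannot be mistaken
+      // for a parse error.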
+      if (mappedRow != null) {
+        receiver.get(OUTPUT_TAG).output(mappedRow);
       }
     }
 
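For reference, the dead-letter routing that ErrorFn implements is the standard Beam multi-output pattern: parse inside a try, send failures to a secondary TupleTag instead of throwing. Below is a minimal, self-contained sketch of that pattern under illustrative names (DeadLetterSketch and its input values are not part of this change); it uses only the standard DoFn, TupleTag, and ParDo APIs.

import java.util.Arrays;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

public class DeadLetterSketch {
  // Anonymous subclasses let Beam infer the output element types.
  static final TupleTag<Integer> PARSED = new TupleTag<Integer>() {};
  static final TupleTag<String> FAILED = new TupleTag<String>() {};

  public static void main(String[] args) {
    Pipeline p = Pipeline.create();
    PCollectionTuple results =
        p.apply(Create.of(Arrays.asList("1", "2", "oops")))
            .apply(
                ParDo.of(
                        new DoFn<String, Integer>() {
                          @ProcessElement
                          public void process(@Element String msg, MultiOutputReceiver out) {
                            try {
                              out.get(PARSED).output(Integer.parseInt(msg));
                            } catch (NumberFormatException e) {
                              // Route the bad record to the error output instead of failing.
                              out.get(FAILED).output(msg + ": " + e);
                            }
                          }
                        })
                    .withOutputTags(PARSED, TupleTagList.of(FAILED)));
    // results.get(PARSED) holds the good rows; results.get(FAILED) the dead letters.
    p.run().waitUntilFinish();
  }
}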