Commit 0e8e54c

[YAML] Fix error handling for KafkaSchemaTransforms (#29261) (#29289)
1 parent: 71e9895 · commit: 0e8e54c

9 files changed

Lines changed: 148 additions & 90 deletions


sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/providers/ErrorHandling.java

Lines changed: 16 additions & 0 deletions
@@ -53,6 +53,12 @@ public static Schema errorSchema(Schema inputSchema) {
         Schema.Field.of("error_message", Schema.FieldType.STRING));
   }
 
+  public static Schema errorSchemaBytes() {
+    return Schema.of(
+        Schema.Field.of("failed_row", Schema.FieldType.BYTES),
+        Schema.Field.of("error_message", Schema.FieldType.STRING));
+  }
+
   @SuppressWarnings({
     "nullness" // TODO(https://github.com/apache/beam/issues/20497)
   })
@@ -62,4 +68,14 @@ public static Row errorRecord(Schema errorSchema, Row inputRow, Throwable th) {
         .withFieldValue("error_message", th.getMessage())
         .build();
   }
+
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
+  public static Row errorRecord(Schema errorSchema, byte[] inputBytes, Throwable th) {
+    return Row.withSchema(errorSchema)
+        .withFieldValue("failed_row", inputBytes)
+        .withFieldValue("error_message", th.getMessage())
+        .build();
+  }
 }
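
For orientation, a minimal sketch (not part of this commit) of what the new byte-oriented helpers produce. The payload bytes and exception are illustrative only:

    import java.nio.charset.StandardCharsets;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
    import org.apache.beam.sdk.values.Row;

    public class ErrorRecordSketch {
      public static void main(String[] args) {
        // Schema with "failed_row" (BYTES) and "error_message" (STRING).
        Schema errorSchema = ErrorHandling.errorSchemaBytes();
        // A payload that hypothetically failed to parse.
        byte[] failedPayload = "not-valid-json".getBytes(StandardCharsets.UTF_8);
        // The new overload stores the raw bytes rather than a parsed Row.
        Row errorRow =
            ErrorHandling.errorRecord(
                errorSchema, failedPayload, new RuntimeException("parse failure"));
        System.out.println(errorRow);
      }
    }

Unlike errorSchema(Schema), which embeds the parsed input row, the bytes variant can represent records that never parsed in the first place, which is exactly the Kafka read case below.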

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformConfiguration.java

Lines changed: 7 additions & 0 deletions
@@ -24,6 +24,7 @@
 import org.apache.beam.sdk.schemas.AutoValueSchema;
 import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
 import org.apache.beam.sdk.schemas.annotations.SchemaFieldDescription;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
 
 /**
@@ -105,6 +106,10 @@ public static Builder builder() {
   /** Sets the topic from which to read. */
   public abstract String getTopic();
 
+  @SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
+  @Nullable
+  public abstract ErrorHandling getErrorHandling();
+
   /** Builder for the {@link KafkaReadSchemaTransformConfiguration}. */
   @AutoValue.Builder
   public abstract static class Builder {
@@ -127,6 +132,8 @@ public abstract static class Builder {
     /** Sets the topic from which to read. */
     public abstract Builder setTopic(String value);
 
+    public abstract Builder setErrorHandling(ErrorHandling errorHandling);
+
     /** Builds a {@link KafkaReadSchemaTransformConfiguration} instance. */
     public abstract KafkaReadSchemaTransformConfiguration build();
   }
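
A hedged sketch of wiring the new field. It assumes ErrorHandling follows the usual AutoValue pattern (ErrorHandling.builder().setOutput(...).build()) and that the remaining builder setters mirror the getters of this class; the broker, topic, and schema values are hypothetical:

    // Hypothetical values throughout; "errors" names the error output PCollection.
    String jsonSchemaString =
        "{\"type\":\"object\",\"properties\":{\"name\":{\"type\":\"string\"}}}";
    KafkaReadSchemaTransformConfiguration config =
        KafkaReadSchemaTransformConfiguration.builder()
            .setBootstrapServers("localhost:9092") // assumed setter, mirroring getBootstrapServers()
            .setTopic("my-topic")
            .setFormat("JSON")
            .setSchema(jsonSchemaString)
            .setErrorHandling(ErrorHandling.builder().setOutput("errors").build())
            .build();

Leaving setErrorHandling unset keeps getErrorHandling() null, so ErrorHandling.hasOutput(...) reports false and, as the provider change below shows, parse failures then fail the pipeline instead of being captured.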

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaReadSchemaTransformProvider.java

Lines changed: 61 additions & 66 deletions
@@ -43,10 +43,9 @@
 import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
 import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
 import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
+import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
 import org.apache.beam.sdk.schemas.utils.JsonUtils;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.FinishBundle;
-import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
@@ -77,8 +76,6 @@ public class KafkaReadSchemaTransformProvider
 
   public static final TupleTag<Row> OUTPUT_TAG = new TupleTag<Row>() {};
   public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
-  public static final Schema ERROR_SCHEMA =
-      Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
 
   final Boolean isTest;
   final Integer testTimeoutSecs;
@@ -98,6 +95,9 @@ protected Class<KafkaReadSchemaTransformConfiguration> configurationClass() {
     return KafkaReadSchemaTransformConfiguration.class;
   }
 
+  @SuppressWarnings({
+    "nullness" // TODO(https://github.com/apache/beam/issues/20497)
+  })
   @Override
   protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configuration) {
     final String inputSchema = configuration.getSchema();
@@ -114,14 +114,32 @@ protected SchemaTransform from(KafkaReadSchemaTransformConfiguration configurati
     consumerConfigs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
 
     String format = configuration.getFormat();
-
-    if (format != null && format.equals("RAW")) {
-      if (inputSchema != null) {
-        throw new IllegalArgumentException(
-            "To read from Kafka in RAW format, you can't provide a schema.");
+    boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
+
+    if ((format != null && format.equals("RAW")) || (!Strings.isNullOrEmpty(inputSchema))) {
+      SerializableFunction<byte[], Row> valueMapper;
+      Schema beamSchema;
+      if (format != null && format.equals("RAW")) {
+        if (inputSchema != null) {
+          throw new IllegalArgumentException(
+              "To read from Kafka in RAW format, you can't provide a schema.");
+        }
+        beamSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
+        valueMapper = getRawBytesToRowFunction(beamSchema);
+      } else {
+        assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
+            : "To read from Kafka, a schema must be provided directly or though Confluent "
+                + "Schema Registry, but not both.";
+
+        beamSchema =
+            Objects.equals(format, "JSON")
+                ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
+                : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
+        valueMapper =
+            Objects.equals(format, "JSON")
+                ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
+                : AvroUtils.getAvroBytesToRowFunction(beamSchema);
       }
-      Schema rawSchema = Schema.builder().addField("payload", Schema.FieldType.BYTES).build();
-      SerializableFunction<byte[], Row> valueMapper = getRawBytesToRowFunction(rawSchema);
       return new SchemaTransform() {
         @Override
         public PCollectionRowTuple expand(PCollectionRowTuple input) {
@@ -138,59 +156,23 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
           PCollection<byte[]> kafkaValues =
               input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
 
+          Schema errorSchema = ErrorHandling.errorSchemaBytes();
           PCollectionTuple outputTuple =
               kafkaValues.apply(
-                  ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper))
+                  ParDo.of(
+                          new ErrorFn(
+                              "Kafka-read-error-counter", valueMapper, errorSchema, handleErrors))
                       .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
 
-          return PCollectionRowTuple.of(
-              "output",
-              outputTuple.get(OUTPUT_TAG).setRowSchema(rawSchema),
-              "errors",
-              outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
-        }
-      };
-    }
+          PCollectionRowTuple outputRows =
+              PCollectionRowTuple.of(
+                  "output", outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema));
 
-    if (inputSchema != null && !inputSchema.isEmpty()) {
-      assert Strings.isNullOrEmpty(configuration.getConfluentSchemaRegistryUrl())
-          : "To read from Kafka, a schema must be provided directly or though Confluent "
-              + "Schema Registry, but not both.";
-
-      final Schema beamSchema =
-          Objects.equals(format, "JSON")
-              ? JsonUtils.beamSchemaFromJsonSchema(inputSchema)
-              : AvroUtils.toBeamSchema(new org.apache.avro.Schema.Parser().parse(inputSchema));
-      SerializableFunction<byte[], Row> valueMapper =
-          Objects.equals(format, "JSON")
-              ? JsonUtils.getJsonBytesToRowFunction(beamSchema)
-              : AvroUtils.getAvroBytesToRowFunction(beamSchema);
-      return new SchemaTransform() {
-        @Override
-        public PCollectionRowTuple expand(PCollectionRowTuple input) {
-          KafkaIO.Read<byte[], byte[]> kafkaRead =
-              KafkaIO.readBytes()
-                  .withConsumerConfigUpdates(consumerConfigs)
-                  .withConsumerFactoryFn(new ConsumerFactoryWithGcsTrustStores())
-                  .withTopic(configuration.getTopic())
-                  .withBootstrapServers(configuration.getBootstrapServers());
-          if (isTest) {
-            kafkaRead = kafkaRead.withMaxReadTime(Duration.standardSeconds(testTimeoutSecs));
+          PCollection<Row> errorOutput = outputTuple.get(ERROR_TAG).setRowSchema(errorSchema);
+          if (handleErrors) {
+            outputRows = outputRows.and(configuration.getErrorHandling().getOutput(), errorOutput);
           }
-
-          PCollection<byte[]> kafkaValues =
-              input.getPipeline().apply(kafkaRead.withoutMetadata()).apply(Values.create());
-
-          PCollectionTuple outputTuple =
-              kafkaValues.apply(
-                  ParDo.of(new ErrorFn("Kafka-read-error-counter", valueMapper))
-                      .withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
-
-          return PCollectionRowTuple.of(
-              "output",
-              outputTuple.get(OUTPUT_TAG).setRowSchema(beamSchema),
-              "errors",
-              outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
+          return outputRows;
         }
       };
     } else {
@@ -259,25 +241,38 @@ public List<String> outputCollectionNames() {
   }
 
   public static class ErrorFn extends DoFn<byte[], Row> {
-    private SerializableFunction<byte[], Row> valueMapper;
-    private Counter errorCounter;
+    private final SerializableFunction<byte[], Row> valueMapper;
+    private final Counter errorCounter;
     private Long errorsInBundle = 0L;
-
-    public ErrorFn(String name, SerializableFunction<byte[], Row> valueMapper) {
+    private final boolean handleErrors;
+    private final Schema errorSchema;
+
+    public ErrorFn(
+        String name,
+        SerializableFunction<byte[], Row> valueMapper,
+        Schema errorSchema,
+        boolean handleErrors) {
      this.errorCounter = Metrics.counter(KafkaReadSchemaTransformProvider.class, name);
      this.valueMapper = valueMapper;
+      this.handleErrors = handleErrors;
+      this.errorSchema = errorSchema;
     }
 
     @ProcessElement
     public void process(@DoFn.Element byte[] msg, MultiOutputReceiver receiver) {
+      Row mappedRow = null;
       try {
-        receiver.get(OUTPUT_TAG).output(valueMapper.apply(msg));
+        mappedRow = valueMapper.apply(msg);
       } catch (Exception e) {
+        if (!handleErrors) {
+          throw new RuntimeException(e);
+        }
        errorsInBundle += 1;
        LOG.warn("Error while parsing the element", e);
-        receiver
-            .get(ERROR_TAG)
-            .output(Row.withSchema(ERROR_SCHEMA).addValues(e.toString(), msg).build());
+        receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, msg, e));
+      }
+      if (mappedRow != null) {
+        receiver.get(OUTPUT_TAG).output(mappedRow);
       }
     }
 
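
The net behavior change: the unconditional "errors" output with the ad-hoc ERROR_SCHEMA ("error" string plus nullable "row" bytes) is gone. A parse failure now rethrows as a RuntimeException unless errorHandling is configured, in which case the failed bytes flow to the output named by getErrorHandling().getOutput(). A test-style sketch of consuming both outputs, assumed to live in the org.apache.beam.sdk.io.kafka package (since from(...) is protected) and reusing the config built in the previous sketch:

    Pipeline pipeline = Pipeline.create();
    PCollectionRowTuple result =
        PCollectionRowTuple.empty(pipeline)
            .apply(new KafkaReadSchemaTransformProvider().from(config));

    // Successfully parsed records, with the schema derived from the configuration.
    PCollection<Row> rows = result.get("output");
    // Present only because the sketch set errorHandling.output to "errors"; each
    // Row carries "failed_row" (the raw bytes) and "error_message".
    PCollection<Row> deadLetters = result.get("errors");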

sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaWriteSchemaTransformProvider.java

Lines changed: 41 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,13 +36,14 @@
3636
import org.apache.beam.sdk.schemas.transforms.SchemaTransform;
3737
import org.apache.beam.sdk.schemas.transforms.SchemaTransformProvider;
3838
import org.apache.beam.sdk.schemas.transforms.TypedSchemaTransformProvider;
39+
import org.apache.beam.sdk.schemas.transforms.providers.ErrorHandling;
3940
import org.apache.beam.sdk.schemas.utils.JsonUtils;
4041
import org.apache.beam.sdk.transforms.DoFn;
41-
import org.apache.beam.sdk.transforms.DoFn.ProcessElement;
4242
import org.apache.beam.sdk.transforms.ParDo;
4343
import org.apache.beam.sdk.transforms.SerializableFunction;
4444
import org.apache.beam.sdk.transforms.SimpleFunction;
4545
import org.apache.beam.sdk.values.KV;
46+
import org.apache.beam.sdk.values.PCollection;
4647
import org.apache.beam.sdk.values.PCollectionRowTuple;
4748
import org.apache.beam.sdk.values.PCollectionTuple;
4849
import org.apache.beam.sdk.values.Row;
@@ -67,8 +68,6 @@ public class KafkaWriteSchemaTransformProvider
6768
public static final TupleTag<Row> ERROR_TAG = new TupleTag<Row>() {};
6869
public static final TupleTag<KV<byte[], byte[]>> OUTPUT_TAG =
6970
new TupleTag<KV<byte[], byte[]>>() {};
70-
public static final Schema ERROR_SCHEMA =
71-
Schema.builder().addStringField("error").addNullableByteArrayField("row").build();
7271
private static final Logger LOG =
7372
LoggerFactory.getLogger(KafkaWriteSchemaTransformProvider.class);
7473

@@ -100,25 +99,38 @@ static final class KafkaWriteSchemaTransform extends SchemaTransform implements
10099
}
101100

102101
public static class ErrorCounterFn extends DoFn<Row, KV<byte[], byte[]>> {
103-
private SerializableFunction<Row, byte[]> toBytesFn;
104-
private Counter errorCounter;
102+
private final SerializableFunction<Row, byte[]> toBytesFn;
103+
private final Counter errorCounter;
105104
private Long errorsInBundle = 0L;
105+
private final boolean handleErrors;
106+
private final Schema errorSchema;
106107

107-
public ErrorCounterFn(String name, SerializableFunction<Row, byte[]> toBytesFn) {
108+
public ErrorCounterFn(
109+
String name,
110+
SerializableFunction<Row, byte[]> toBytesFn,
111+
Schema errorSchema,
112+
boolean handleErrors) {
108113
this.toBytesFn = toBytesFn;
109-
errorCounter = Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
114+
this.errorCounter = Metrics.counter(KafkaWriteSchemaTransformProvider.class, name);
115+
this.handleErrors = handleErrors;
116+
this.errorSchema = errorSchema;
110117
}
111118

112119
@ProcessElement
113120
public void process(@DoFn.Element Row row, MultiOutputReceiver receiver) {
121+
KV<byte[], byte[]> output = null;
114122
try {
115-
receiver.get(OUTPUT_TAG).output(KV.of(new byte[1], toBytesFn.apply(row)));
123+
output = KV.of(new byte[1], toBytesFn.apply(row));
116124
} catch (Exception e) {
125+
if (!handleErrors) {
126+
throw new RuntimeException(e);
127+
}
117128
errorsInBundle += 1;
118129
LOG.warn("Error while processing the element", e);
119-
receiver
120-
.get(ERROR_TAG)
121-
.output(Row.withSchema(ERROR_SCHEMA).addValues(e.toString(), row.toString()).build());
130+
receiver.get(ERROR_TAG).output(ErrorHandling.errorRecord(errorSchema, row, e));
131+
}
132+
if (output != null) {
133+
receiver.get(OUTPUT_TAG).output(output);
122134
}
123135
}
124136

@@ -129,6 +141,9 @@ public void finish() {
129141
}
130142
}
131143

144+
@SuppressWarnings({
145+
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
146+
})
132147
@Override
133148
public PCollectionRowTuple expand(PCollectionRowTuple input) {
134149
Schema inputSchema = input.get("input").getSchema();
@@ -145,13 +160,17 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
145160
toBytesFn = AvroUtils.getRowToAvroBytesFunction(inputSchema);
146161
}
147162

163+
boolean handleErrors = ErrorHandling.hasOutput(configuration.getErrorHandling());
148164
final Map<String, String> configOverrides = configuration.getProducerConfigUpdates();
165+
Schema errorSchema = ErrorHandling.errorSchema(inputSchema);
149166
PCollectionTuple outputTuple =
150167
input
151168
.get("input")
152169
.apply(
153170
"Map rows to Kafka messages",
154-
ParDo.of(new ErrorCounterFn("Kafka-write-error-counter", toBytesFn))
171+
ParDo.of(
172+
new ErrorCounterFn(
173+
"Kafka-write-error-counter", toBytesFn, errorSchema, handleErrors))
155174
.withOutputTags(OUTPUT_TAG, TupleTagList.of(ERROR_TAG)));
156175

157176
outputTuple
@@ -167,8 +186,11 @@ public PCollectionRowTuple expand(PCollectionRowTuple input) {
167186
.withKeySerializer(ByteArraySerializer.class)
168187
.withValueSerializer(ByteArraySerializer.class));
169188

189+
// TODO: include output from KafkaIO Write once updated from PDone
190+
PCollection<Row> errorOutput =
191+
outputTuple.get(ERROR_TAG).setRowSchema(ErrorHandling.errorSchema(errorSchema));
170192
return PCollectionRowTuple.of(
171-
"errors", outputTuple.get(ERROR_TAG).setRowSchema(ERROR_SCHEMA));
193+
handleErrors ? configuration.getErrorHandling().getOutput() : "errors", errorOutput);
172194
}
173195
}
174196

@@ -227,6 +249,10 @@ public abstract static class KafkaWriteSchemaTransformConfiguration implements S
227249
@Nullable
228250
public abstract Map<String, String> getProducerConfigUpdates();
229251

252+
@SchemaFieldDescription("This option specifies whether and where to output unwritable rows.")
253+
@Nullable
254+
public abstract ErrorHandling getErrorHandling();
255+
230256
public static Builder builder() {
231257
return new AutoValue_KafkaWriteSchemaTransformProvider_KafkaWriteSchemaTransformConfiguration
232258
.Builder();
@@ -242,6 +268,8 @@ public abstract static class Builder {
242268

243269
public abstract Builder setProducerConfigUpdates(Map<String, String> producerConfigUpdates);
244270

271+
public abstract Builder setErrorHandling(ErrorHandling errorHandling);
272+
245273
public abstract KafkaWriteSchemaTransformConfiguration build();
246274
}
247275
}
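
The write path mirrors the read path, with one wrinkle flagged by the new TODO: KafkaIO.Write still returns PDone, so the transform can only return the error output. Its name now follows the configuration, with "errors" remaining the fallback when errorHandling is unset. A hedged sketch under the same same-package assumption; the setter names are assumed to mirror the getters in this file, and rowsToWrite stands for any PCollection<Row> with an attached schema:

    KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration writeConfig =
        KafkaWriteSchemaTransformProvider.KafkaWriteSchemaTransformConfiguration.builder()
            .setBootstrapServers("localhost:9092") // assumed setter
            .setTopic("my-topic")                  // assumed setter
            .setFormat("AVRO")                     // assumed setter
            .setErrorHandling(ErrorHandling.builder().setOutput("failed_writes").build())
            .build();

    // rowsToWrite: any schema'd PCollection<Row> (placeholder in this sketch).
    PCollectionRowTuple writeResult =
        PCollectionRowTuple.of("input", rowsToWrite)
            .apply(new KafkaWriteSchemaTransformProvider().from(writeConfig));

    // Rows that could not be serialized; the output name comes from errorHandling.
    PCollection<Row> failedWrites = writeResult.get("failed_writes");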
