What happened?
We encountered a bug when using KafkaIO with a custom Deserializer that implements Deserializer<Row>. We built this deserializer to deserialize byte arrays into Beam Rows with a schema we specified, and we configure KafkaIO with withValueDeserializerAndCoder.
This runs without any issues with the legacy KafkaIO implementation (ReadFromKafkaViaUnbounded). However, it fails when using ReadFromKafkaViaSDF. The error originates here.
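For context, the pipeline configuration looks roughly like the following sketch. The broker address, topic, schema fields, and MyRowDeserializer are illustrative placeholders (MyRowDeserializer stands in for our real schema-aware Deserializer<Row> implementation), not our actual code:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

// Illustrative schema for the Rows produced by the custom deserializer.
Schema schema = Schema.builder()
    .addStringField("name")
    .addInt64Field("id")
    .build();

Pipeline p = Pipeline.create();
p.apply(
    KafkaIO.<byte[], Row>read()
        .withBootstrapServers("localhost:9092")           // placeholder broker
        .withTopic("my-topic")                            // placeholder topic
        .withKeyDeserializer(ByteArrayDeserializer.class)
        // We pass BOTH the custom deserializer and an explicit coder here;
        // the bug is that the SDF read path drops the coder argument.
        .withValueDeserializerAndCoder(MyRowDeserializer.class, RowCoder.of(schema)));
```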
This happens because ReadFromKafkaViaSDF currently does not set the coder even when we explicitly provide both the deserializer and the coder via withValueDeserializerAndCoder. Since no coder is explicitly set, Beam falls back to inferring one from the deserializer (beam/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java, lines 1940 to 1943 at 800d434):

```java
private Coder<V> getValueCoder(CoderRegistry coderRegistry) {
  return (getValueCoder() != null)
      ? getValueCoder()
      : Preconditions.checkStateNotNull(getValueDeserializerProvider()).getCoder(coderRegistry);
}
```

This is typically not an issue when using Beam's built-in deserializers.
However, if we use a custom deserializer, such as foo_bar_Deserializer implements Deserializer<Row>, Beam is unable to infer the coder and throws an error.
- Legacy KafkaIO (ReadFromKafkaViaUnbounded) sets both the deserializer and the coder based on the input.
- SDF-based KafkaIO (ReadFromKafkaViaSDF) currently does not set the coder explicitly. Instead of passing the provided coder through and using it, it relies on inferring the coder from the Deserializer, which throws an error because there is no coder for Row in the registry.
To resolve this issue, we need to explicitly set the coder based on the input.
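To make the failure mode concrete, here is a self-contained sketch of the resolution order described above, using hypothetical stand-in names rather than Beam's actual classes: an explicitly supplied coder should win, and inference from the deserialized type should only run as a fallback.

```java
import java.util.HashMap;
import java.util.Map;

public class CoderResolution {
    // Minimal stand-in for Beam's CoderRegistry: maps a type name to a coder name.
    // Note there is no entry for "Row", mirroring why inference fails for Deserializer<Row>.
    static final Map<String, String> REGISTRY = new HashMap<>();
    static {
        REGISTRY.put("String", "StringUtf8Coder");
        REGISTRY.put("Long", "VarLongCoder");
    }

    /** Returns the explicit coder if set; otherwise tries to infer one from the registry. */
    static String resolveCoder(String explicitCoder, String deserializedType) {
        if (explicitCoder != null) {
            return explicitCoder; // explicit coder wins, as withValueDeserializerAndCoder intends
        }
        String inferred = REGISTRY.get(deserializedType);
        if (inferred == null) {
            throw new IllegalStateException("Unable to infer a coder for " + deserializedType);
        }
        return inferred;
    }

    public static void main(String[] args) {
        // Built-in type: inference works even with no explicit coder.
        System.out.println(resolveCoder(null, "String"));
        // Custom Row type with an explicit coder: resolution succeeds.
        System.out.println(resolveCoder("RowCoder", "Row"));
        // Custom Row type where the explicit coder was dropped (the SDF path): inference fails.
        try {
            resolveCoder(null, "Row");
        } catch (IllegalStateException e) {
            System.out.println("FAILED: " + e.getMessage());
        }
    }
}
```

The fix in ReadFromKafkaViaSDF corresponds to the first branch: carry the user-provided coder through so inference is never attempted for types the registry cannot handle.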
Issue Priority
Priority: 2 (default / most bugs should be filed as P2)
Issue Components