Description
Search before reporting
- I searched in the issues and found nothing similar.
Read release policy
- I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
User environment
- Broker version: 3.3.7
- Client library type: Java
- Client library version: 3.3.7
- Client Java version: 17
Issue Description
This issue is spun off from a comment on another issue: #20635 (comment)
Even though that issue was resolved, we still hit a problem when
- the topic uses ALWAYS_COMPATIBLE schema compatibility
- and an Avro message is published with an incompatible schema, so that it fails to deserialize into a Specific Avro Record
In that case the Pulsar client listener tries to send the message to the DLQ topic, but schema validation fails on the produce. Message handling is then reset, and the client again fails to deserialize the message and again fails to send it to the DLQ topic. This repeats endlessly, so the listener thread is effectively blocked from accepting further traffic.
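The loop described above can be modelled in a few lines of plain Java. This is only a sketch of the control flow, not the Pulsar client code: the class `DlqLoopSketch` and all its methods are hypothetical, a string that must parse as a long stands in for the Avro payload, and the `maxDeliveries` cap exists only so the example terminates (the behaviour reported here has no such cap). The one detail taken from the stack trace below is that the DLQ produce path validates the payload by decoding it against the same schema, so it fails for the same reason the listener did.

```java
public class DlqLoopSketch {

    /** Stand-in for decoding the payload against the reader schema (hypothetical). */
    public static void validate(String payload) {
        try {
            Long.parseLong(payload);
        } catch (NumberFormatException e) {
            throw new IllegalStateException("Found string, expecting long", e);
        }
    }

    /** Stand-in for the DLQ produce: it validates first, so a bad payload is rejected. */
    public static boolean sendToDlq(String payload) {
        try {
            validate(payload);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    /**
     * Returns the number of deliveries needed before the message is either
     * processed or moved to the DLQ, giving up after maxDeliveries.
     */
    public static int deliveriesUntilResolved(String payload, int maxDeliveries) {
        int deliveries = 0;
        while (deliveries < maxDeliveries) {
            deliveries++;
            try {
                validate(payload);      // listener-side deserialization
                return deliveries;      // message processed normally
            } catch (IllegalStateException e) {
                if (sendToDlq(payload)) {
                    return deliveries;  // message moved to the DLQ
                }
                // DLQ send failed for the same reason: the message comes back again
            }
        }
        return deliveries;              // cap reached, message never resolved
    }

    public static void main(String[] args) {
        System.out.println(deliveriesUntilResolved("42", 10));         // prints 1
        System.out.println(deliveriesUntilResolved("not-a-long", 10)); // prints 10
    }
}
```

In this model the only ways out of the loop are a successful decode or a successful DLQ send, and an incompatible payload can never satisfy either one.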
Error messages
Exception from the loop produced with the example project linked below:
14:12:23.152 [pulsar-client-internal-14-1] WARN o.a.pulsar.client.impl.ConsumerImpl - [persistent://public/default/test-topic] [test-subscription] [Lr0Ki] Failed to send DLQ message to dlq-topic for message id 3:0:-1
org.apache.pulsar.client.api.SchemaSerializationException: org.apache.avro.AvroTypeException: Found string, expecting long
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:92)
at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:71)
at org.apache.pulsar.client.impl.schema.AbstractStructSchema$WrappedVersionedSchema.decode(AbstractStructSchema.java:157)
at org.apache.pulsar.client.api.Schema.validate(Schema.java:63)
at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$beforeSend$1(TypedMessageBuilderImpl.java:80)
at java.base/java.util.Optional.orElseGet(Optional.java:364)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.beforeSend(TypedMessageBuilderImpl.java:79)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.getMessage(TypedMessageBuilderImpl.java:270)
at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:113)
at org.apache.pulsar.client.impl.ConsumerImpl.lambda$processPossibleToDLQ$50(ConsumerImpl.java:2229)
at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.AvroTypeException: Found string, expecting long
at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:198)
at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:181)
at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:308)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:168)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:78)
at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:40)
at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:89)
... 17 common frames omitted
Reproducing the issue
Here is a repository that recreates this issue: https://github.com/tonisojandu/pulsar-dlq-schema-issue-example
Additional information
For our purposes, we have fixed this problem for now with this change in our fork of the Pulsar client:
#21294
However, I understand from direct communication that this is probably not the right approach for all use cases.
Are you willing to submit a PR?
- I'm willing to submit a PR!