[Bug] Consumer listener gets stuck in an endless loop when sending DLQ message with mismatched schema #24541

@tonisojandu

Search before reporting

  • I searched in the issues and found nothing similar.

Read release policy

  • I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.

User environment

  • Broker version: 3.3.7
  • Client library type: Java
  • Client library version: 3.3.7
  • Client Java version: 17

Issue Description

This issue is spun off from a comment on another issue: #20635 (comment)

Even though that issue has been resolved, we still hit a problem when

  • the topic uses ALWAYS_COMPATIBLE schema compatibility
  • and an Avro message is published with an incompatible schema, so that it fails to deserialize to a Specific Avro Record

In that case the Pulsar client listener tries to send the message to the DLQ topic, but schema verification fails on the produce. Message handling is then reset, which leads to another failed attempt to deserialize followed by another failed attempt to send to the DLQ topic. This cycle repeats endlessly, so the listener thread is effectively blocked from accepting further traffic.
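
The cycle described above can be modelled without any Pulsar dependency. The following is a toy sketch under stated assumptions, not Pulsar client code: `decode`, `sendToDlq`, and `deliver` are hypothetical names, a string that must parse as a long stands in for the Avro payload, and `maxCycles` exists only so that the demonstration terminates.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: none of these names come from the Pulsar client.
final class DlqLoopSketch {

    // Stand-in for Avro deserialization: the payload must parse as a long.
    static long decode(String payload) {
        return Long.parseLong(payload); // throws NumberFormatException on mismatch
    }

    // Stand-in for the DLQ produce, which validates the payload by decoding it again.
    static boolean sendToDlq(String payload, List<String> dlq) {
        try {
            decode(payload);
        } catch (NumberFormatException e) {
            return false; // validation failed, nothing reaches the DLQ
        }
        dlq.add(payload);
        return true;
    }

    // Returns the cycle in which the message settled, or -1 if it never did.
    static int deliver(String payload, int maxRedeliverCount, int maxCycles, List<String> dlq) {
        int failures = 0;
        for (int cycle = 1; cycle <= maxCycles; cycle++) {
            try {
                decode(payload);
                return cycle; // listener processed the message
            } catch (NumberFormatException e) {
                failures++;
            }
            if (failures > maxRedeliverCount && sendToDlq(payload, dlq)) {
                return cycle; // message parked in the DLQ
            }
            // Otherwise the message is redelivered and the cycle starts over.
        }
        return -1;
    }

    public static void main(String[] args) {
        List<String> dlq = new ArrayList<>();
        System.out.println("compatible payload settled in cycle " + deliver("42", 1, 50, dlq));
        System.out.println("mismatched payload result: " + deliver("abc", 1, 50, dlq));
        System.out.println("DLQ size: " + dlq.size());
    }
}
```

In this model the same check guards both the listener path and the DLQ path, so a payload that fails the first can never pass the second. That is the shape of the loop reported here: without the artificial `maxCycles` cap, the mismatched message would be retried indefinitely.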

Error messages

Exception from the loop produced with the example project linked below:

14:12:23.152 [pulsar-client-internal-14-1] WARN  o.a.pulsar.client.impl.ConsumerImpl - [persistent://public/default/test-topic] [test-subscription] [Lr0Ki] Failed to send DLQ message to dlq-topic for message id 3:0:-1
org.apache.pulsar.client.api.SchemaSerializationException: org.apache.avro.AvroTypeException: Found string, expecting long
        at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:92)
        at org.apache.pulsar.client.impl.schema.AbstractStructSchema.decode(AbstractStructSchema.java:71)
        at org.apache.pulsar.client.impl.schema.AbstractStructSchema$WrappedVersionedSchema.decode(AbstractStructSchema.java:157)
        at org.apache.pulsar.client.api.Schema.validate(Schema.java:63)
        at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:80)
        at org.apache.pulsar.client.impl.schema.AutoProduceBytesSchema.encode(AutoProduceBytesSchema.java:32)
        at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.lambda$beforeSend$1(TypedMessageBuilderImpl.java:80)
        at java.base/java.util.Optional.orElseGet(Optional.java:364)
        at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.beforeSend(TypedMessageBuilderImpl.java:79)
        at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.getMessage(TypedMessageBuilderImpl.java:270)
        at org.apache.pulsar.client.impl.TypedMessageBuilderImpl.sendAsync(TypedMessageBuilderImpl.java:113)
        at org.apache.pulsar.client.impl.ConsumerImpl.lambda$processPossibleToDLQ$50(ConsumerImpl.java:2229)
        at java.base/java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:718)
        at java.base/java.util.concurrent.CompletableFuture$Completion.run(CompletableFuture.java:482)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at org.apache.pulsar.shade.io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.avro.AvroTypeException: Found string, expecting long
        at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:308)
        at org.apache.avro.io.parsing.Parser.advance(Parser.java:86)
        at org.apache.avro.io.ResolvingDecoder.readLong(ResolvingDecoder.java:162)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:198)
        at org.apache.avro.specific.SpecificDatumReader.readField(SpecificDatumReader.java:181)
        at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:308)
        at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:248)
        at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:168)
        at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:180)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:161)
        at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:154)
        at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:78)
        at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:40)
        at org.apache.pulsar.client.impl.schema.reader.AbstractMultiVersionReader.read(AbstractMultiVersionReader.java:89)
        ... 17 common frames omitted

Reproducing the issue

Here is a repository that recreates this issue: https://github.com/tonisojandu/pulsar-dlq-schema-issue-example

Additional information

For our purposes, we have fixed this problem for now with this change in our fork of the Pulsar client:
#21294

However, I understand from direct communication that this is probably not the correct approach for all use cases.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Labels

  • triage/lhotari/important
  • type/bug