@3pacccccc
Contributor

Fixes #24541

Motivation

When a consumer with a dead-letter policy (DLQ) encounters a message with a schema mismatch (e.g., producer sends Long but consumer expects String), and the message exceeds the maximum redelivery count, the following infinite loop occurs:

  1. The consumer tries to send the message to the DLQ.
  2. The DLQ send fails with a SchemaSerializationException due to the schema mismatch.
  3. The consumer returns false from processPossibleToDLQ, causing the message to be redelivered.
  4. The message is redelivered, exceeds the redelivery count again, and the process repeats indefinitely.

Here is an example that reproduces the issue:

    public void sendDeadLetterTopicWithMismatchSchemaProducer() throws Exception {
        String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
        admin.namespaces().createNamespace(namespace);
        // don't enforce schema validation
        admin.namespaces().setSchemaValidationEnforced(namespace, false);
        // set schema compatibility strategy to always compatible
        admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);
        String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
                + "/sendDeadLetterTopicWithMismatchSchemaProducer");

        // create topics
        admin.topics().createNonPartitionedTopic(topic);
        admin.topics().createNonPartitionedTopic(topic + "-DLQ");
        admin.topics().createNonPartitionedTopic(topic + "-RETRY");

        final int maxRedeliverCount = 1;
        final String subscriptionName = "my-subscription";
        Consumer<String> consumer = pulsarClient.newConsumer(Schema.AVRO(String.class))
                .topic(topic)
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(subscriptionName)
                .deadLetterPolicy(DeadLetterPolicy.builder()
                        .deadLetterTopic(topic + "-DLQ")
                        .retryLetterTopic(topic + "-RETRY")
                        .maxRedeliverCount(maxRedeliverCount)
                        .build())
                .negativeAckRedeliveryDelay(1, TimeUnit.SECONDS)
                .messageListener((Consumer::negativeAcknowledge))
                .subscribe();

        Producer<Long> producer = pulsarClient.newProducer(Schema.AVRO(Long.class)).topic(topic).create();
        producer.send(1234567890L);
    }

And here is what happens behind the scenes in this scenario:

Participants: producer, broker, consumer

  1. producer: sends a message with the AVRO(Long) schema
  2. broker: delivers the message to the consumer
  3. consumer: receives the message and nacks it
  4. broker: delivers the message to the consumer
  5. consumer: receives the message and nacks it

Note: the infinite loop starts here.

  6. broker: delivers the message to the consumer
  7. consumer: maxRedeliverCount in deadLetterPolicy is exceeded, so the message is sent to deadLetterTopic
  8. consumer: the send to deadLetterTopic fails with SchemaSerializationException due to the mismatched schema, and the result is false in ConsumerImpl.processPossibleToDLQ
  9. consumer: the message is handed back to the broker for redelivery
  10. steps 6 to 9 repeat indefinitely

Modifications

  • Catch SchemaSerializationException in processPossibleToDLQ: When this exception occurs during DLQ send, mark the message as "processed" (i.e., complete with true) to prevent redelivery and break the loop.

  • Improve logging:

    • Fix ambiguous debug log format from {}/{} to {}:{} for ledger and entry ID.

    • Unify and clarify error logs in processPossibleToDLQ to better distinguish between ProducerQueueIsFullError and other exceptions.
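
The first modification can be illustrated with a toy model. The sketch below is not Pulsar code: SchemaMismatchException, sendToDlq, and dlqAttempts are hypothetical stand-ins, and the simulated DLQ send is hard-wired to fail. It only shows how returning false on a permanent schema error keeps the redelivery loop going, while completing with true ends it after a single attempt.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Toy model of the DLQ redelivery loop; none of these names are Pulsar APIs. */
final class DlqLoopSketch {

    /** Hypothetical stand-in for Pulsar's SchemaSerializationException. */
    static final class SchemaMismatchException extends RuntimeException {
        SchemaMismatchException(String message) {
            super(message);
        }
    }

    /** Simulated DLQ producer that always rejects the payload because of its schema. */
    static CompletableFuture<Void> sendToDlq(long payload) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        result.completeExceptionally(new SchemaMismatchException("cannot encode " + payload));
        return result;
    }

    /**
     * Completes with true when the message is settled (no redelivery needed)
     * and with false when the caller should request redelivery.
     */
    static CompletableFuture<Boolean> processPossibleToDlq(long payload,
                                                           boolean treatSchemaErrorAsProcessed) {
        return sendToDlq(payload).handle((ignored, error) -> {
            if (error == null) {
                return true; // the DLQ send succeeded
            }
            Throwable cause = error instanceof CompletionException ? error.getCause() : error;
            // A schema failure is permanent: resending the same bytes cannot succeed.
            return treatSchemaErrorAsProcessed && cause instanceof SchemaMismatchException;
        });
    }

    /** Counts DLQ attempts until the message is settled or the attempt cap is reached. */
    static int dlqAttempts(boolean treatSchemaErrorAsProcessed, int cap) {
        int attempts = 0;
        boolean settled = false;
        while (!settled && attempts < cap) {
            attempts++;
            settled = processPossibleToDlq(1234567890L, treatSchemaErrorAsProcessed).join();
        }
        return attempts;
    }

    public static void main(String[] args) {
        System.out.println("schema error returns false: " + dlqAttempts(false, 5) + " attempts");
        System.out.println("schema error returns true: " + dlqAttempts(true, 5) + " attempts");
    }
}
```

With a cap of 5, the first call uses all 5 attempts and the second stops after 1. The cap exists only so the sketch terminates; the real loop has no such bound.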

Verifying this change

  • Make sure that the change passes the CI checks.

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository: 3pacccccc#24

@3pacccccc 3pacccccc requested a review from lhotari August 25, 2025 16:14
@3pacccccc
Contributor Author

@lhotari Hi, lari. I've updated the test, PTAL

@3pacccccc 3pacccccc requested a review from lhotari August 26, 2025 06:55
@codecov-commenter

codecov-commenter commented Aug 26, 2025

Codecov Report

❌ Patch coverage is 81.81818% with 2 lines in your changes missing coverage. Please review.
✅ Project coverage is 74.23%. Comparing base (e5e7981) to head (7617179).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 80.00% 2 Missing ⚠️

@@             Coverage Diff              @@
##             master   #24663      +/-   ##
============================================
+ Coverage     74.01%   74.23%   +0.22%     
+ Complexity    33224    33209      -15     
============================================
  Files          1858     1885      +27     
  Lines        146500   146949     +449     
  Branches      16880    16927      +47     
============================================
+ Hits         108425   109092     +667     
+ Misses        29394    29158     -236     
- Partials       8681     8699      +18     
Flag Coverage Δ
inttests 26.84% <45.45%> (-0.07%) ⬇️
systests 22.65% <9.09%> (?)
unittests 73.74% <81.81%> (-0.02%) ⬇️

Files with missing lines Coverage Δ
...sar/client/impl/schema/AutoProduceBytesSchema.java 71.79% <100.00%> (ø)
...va/org/apache/pulsar/client/impl/ConsumerImpl.java 79.42% <80.00%> (-0.22%) ⬇️

... and 164 files with indirect coverage changes

@lhotari lhotari changed the title [fix][client] Fix infinite loop when sending mismatched schema messages to DLQ [fix][client] Skip schema validation when sending messages to DLQ to avoid infinite loop when schema validation fails on an incoming message Aug 26, 2025
@lhotari
Member

lhotari commented Aug 26, 2025

I ran some manual verification, and things seem to work as expected. The message gets produced to the DLQ topic with the original schema version. It seems to be a property of the AUTO_PRODUCE_BYTES schema or of message.getReaderSchema() that it refers to the wrong schema when it attempts to run the validation. The solution in this PR seems to be a reasonable workaround until we find a better solution.

One possible reason is that getReaderSchema() might only be accurate in AUTO_CONSUME mode:

    public SchemaInfo getSchemaInfo() {
        if (schema == null) {
            return null;
        }
        ensureSchemaIsLoaded();
        if (schema instanceof AutoConsumeSchema) {
            return ((AutoConsumeSchema) schema).getSchemaInfo(getSchemaVersion());
        }
        return schema.getSchemaInfo();
    }

@lhotari
Member

lhotari commented Aug 26, 2025

I think I found the reason for the wrong schema instance when using AUTO_PRODUCE_BYTES with message.getReaderSchema().get() as the schema.

    public Optional<Schema<?>> getReaderSchema() {
        ensureSchemaIsLoaded();
        if (schema == null) {
            return Optional.empty();
        }
        byte[] schemaVersion = getSchemaVersion();
        if (schemaVersion == null) {
            return Optional.of(schema);
        }
        if (schema instanceof AutoConsumeSchema) {
            return Optional.of(((AutoConsumeSchema) schema)
                    .atSchemaVersion(schemaVersion));
        } else if (schema instanceof AbstractSchema) {
            return Optional.of(((AbstractSchema<?>) schema)
                    .atSchemaVersion(schemaVersion));
        } else {
            return Optional.of(schema);
        }
    }

For example, for Avro messages, this finally maps to:

    return new AvroReader<>(parseAvroSchema(schemaInfo.getSchemaDefinition()),
            readerSchema, pojoClassLoader, jsr310ConversionEnabled);

It always uses the original Schema instance for reading and never refreshes the "readerSchema" from the schema registry. This is why the mandatory validation before serializing the message to the DLQ fails. Considering this, I think that AutoProduceBytesSchema.setRequireSchemaValidation(false) is a very reasonable way to work around this issue for the DLQ use case.
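
To make the trade-off concrete, here is a minimal sketch of skipping validation on a pass-through schema. It assumes a toy wire format in which the first byte tags the payload type; PassThroughSchema, Validator, and expectTag are hypothetical names and do not reflect the actual AutoProduceBytesSchema implementation. The validator is pinned at construction time, loosely mirroring a reader schema that is never refreshed.

```java
import java.util.Arrays;

/** Toy model of validate-before-forward; none of these names are Pulsar APIs. */
final class DlqValidationSketch {

    /** Hypothetical validator that throws when a payload does not match the pinned type. */
    interface Validator {
        void validate(byte[] payload);
    }

    /** Builds a validator that accepts only payloads whose first byte equals the given tag. */
    static Validator expectTag(char tag) {
        return payload -> {
            if (payload.length == 0 || payload[0] != (byte) tag) {
                throw new IllegalArgumentException("payload does not match pinned type " + tag);
            }
        };
    }

    /** Hypothetical schema that forwards bytes unchanged, optionally validating them first. */
    static final class PassThroughSchema {
        private final Validator pinnedValidator; // fixed at construction, never refreshed
        private boolean requireValidation = true;

        PassThroughSchema(Validator pinnedValidator) {
            this.pinnedValidator = pinnedValidator;
        }

        void setRequireValidation(boolean requireValidation) {
            this.requireValidation = requireValidation;
        }

        byte[] encode(byte[] payload) {
            if (requireValidation) {
                pinnedValidator.validate(payload); // may reject bytes written with another schema
            }
            return payload; // the bytes are forwarded as-is either way
        }
    }

    public static void main(String[] args) {
        byte[] longTagged = {'L', 1, 2}; // produced with a "Long" tag
        PassThroughSchema schema = new PassThroughSchema(expectTag('S')); // consumer expects "String"
        try {
            schema.encode(longTagged);
        } catch (IllegalArgumentException e) {
            System.out.println("validation on: " + e.getMessage());
        }
        schema.setRequireValidation(false);
        System.out.println("validation off: forwarded "
                + Arrays.equals(schema.encode(longTagged), longTagged));
    }
}
```

Because the payload is forwarded byte-for-byte, turning validation off does not change what ends up in the dead-letter topic in this model; it only removes a check made against a possibly stale view of the schema.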

@lhotari lhotari merged commit 829df71 into apache:master Aug 26, 2025
53 checks passed
lhotari added a commit that referenced this pull request Aug 26, 2025
…avoid infinite loop when schema validation fails on an incoming message (#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
(cherry picked from commit 829df71)
nodece pushed a commit to ascentstream/pulsar that referenced this pull request Aug 26, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
(cherry picked from commit 829df71)
@3pacccccc
Contributor Author

@lhotari Thank you so much for sharing this. It's very helpful, and it's clear to me now. BTW, this solution is more reasonable than my initial attempt.

@lhotari
Member

lhotari commented Aug 27, 2025

There's one remaining detail here. This solution only works if SchemaCompatibilityStrategy is configured as ALWAYS_COMPATIBLE for the DLQ topic, either in the topic policy or at the namespace level. While testing this in action, it appeared that changes to the SchemaCompatibilityStrategy at the namespace level don't take effect after a topic has been loaded. (Example: this commit lhotari/pulsar-dlq-schema-issue-example@5799194 "fixed" the test app that was shared by @tonisojandu.)

manas-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 28, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
(cherry picked from commit 829df71)
(cherry picked from commit d31657e)
ganesh-ctds pushed a commit to datastax/pulsar that referenced this pull request Aug 29, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
(cherry picked from commit 829df71)
(cherry picked from commit 19b9dad)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 3, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
(cherry picked from commit 829df71)
(cherry picked from commit 19b9dad)
Technoboy- pushed a commit to Technoboy-/pulsar that referenced this pull request Sep 10, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
(cherry picked from commit 829df71)
srinath-ctds pushed a commit to datastax/pulsar that referenced this pull request Sep 12, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
(cherry picked from commit 829df71)
(cherry picked from commit d31657e)
nborisov pushed a commit to nborisov/pulsar that referenced this pull request Sep 12, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
(cherry picked from commit 829df71)
KannarFr pushed a commit to CleverCloud/pulsar that referenced this pull request Sep 22, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
walkinggo pushed a commit to walkinggo/pulsar that referenced this pull request Oct 8, 2025
…avoid infinite loop when schema validation fails on an incoming message (apache#24663)

Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
@3pacccccc 3pacccccc deleted the fixSchemaException branch November 6, 2025 14:59
Development

Successfully merging this pull request may close these issues.

[Bug] Consumer listener gets stuck in an endless loop when sending DLQ message with mismatched schema

3 participants