-
Notifications
You must be signed in to change notification settings - Fork 3.7k
[fix][client] Skip schema validation when sending messages to DLQ to avoid infinite loop when schema validation fails on an incoming message #24663
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
Outdated
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
Show resolved
Hide resolved
pulsar-broker/src/test/java/org/apache/pulsar/client/api/DeadLetterTopicTest.java
Show resolved
Hide resolved
|
@lhotari Hi, lari. I've updated the test, PTAL |
Codecov Report❌ Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## master #24663 +/- ##
============================================
+ Coverage 74.01% 74.23% +0.22%
+ Complexity 33224 33209 -15
============================================
Files 1858 1885 +27
Lines 146500 146949 +449
Branches 16880 16927 +47
============================================
+ Hits 108425 109092 +667
+ Misses 29394 29158 -236
- Partials 8681 8699 +18
Flags with carried forward coverage won't be shown. Click here to find out more.
🚀 New features to boost your workflow:
|
|
I ran some manual verification and things seem to work in an expected way. The message will get produced to the DLQ topic with the original schema version. It seems to be a property of the One possible reason might be that getReaderSchema() might only be accurate in AUTO_CONSUME mode: pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java Lines 436 to 445 in 1220951
|
|
I think I found an answer to the wrong schema instance when using pulsar/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MessageImpl.java Lines 395 to 413 in 1220951
For example, for Avro messages, this maps finally here: Lines 53 to 54 in 3b7bef1
It will always use the original Schema instance for reading and never refresh the "readerSchema" from the schema registry. This is why the mandatory validation before serializing the message to DLQ fails. Considering this fact, I think that that |
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 829df71)
@lhotari thank you so much for sharing this. this is very helpful and it's clear to me now. BTW, this solution is more reasonable than my initial attempt. |
|
There's one remaining detail here. This solution will only work if SchemaCompatibilityStrategy is configured as |
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 829df71) (cherry picked from commit d31657e)
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 829df71) (cherry picked from commit 19b9dad)
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 829df71) (cherry picked from commit 19b9dad)
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 829df71)
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 829df71) (cherry picked from commit d31657e)
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com> (cherry picked from commit 829df71)
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
…avoid infinite loop when schema validation fails on an incoming message (apache#24663) Co-authored-by: Lari Hotari <lhotari@users.noreply.github.com>
Fixes #24541
Motivation
When a consumer with a dead-letter policy (DLQ) encounters a message with a schema mismatch (e.g., producer sends Long but consumer expects String), and the message exceeds the maximum redelivery count, the following infinite loop occurs:
here's the example how to reproduce this issue:
and here's what's going on behind this scenario:
maxRedeliverCountindeadLetterPolicy, send todeadLetterTopicdeadLetterTopicfailed due to mismatch schema1, and result will be false inConsumerImpl.processPossibleToDLQ2maxRedeliverCountindeadLetterPolicy, send todeadLetterTopicdeadLetterTopicfailed withSchemaSerializationExceptiondue to mismatch schema1, and result will be false inConsumerImpl.processPossibleToDLQ2maxRedeliverCountindeadLetterPolicy, send todeadLetterTopicdeadLetterTopicfailed due to mismatch schema1, and result will be false inConsumerImpl.processPossibleToDLQ2Modifications
Catch SchemaSerializationException in processPossibleToDLQ: When this exception occurs during DLQ send, mark the message as "processed" (i.e., complete with true) to prevent redelivery and break the loop.
Improve logging:
Fix ambiguous debug log format from {}/{} to {}:{} for ledger and entry ID.
Unify and clarify error logs in processPossibleToDLQ to better distinguish between ProducerQueueIsFullError and other exceptions.
Verifying this change
If the box was checked, please highlight the changes
Documentation
docdoc-requireddoc-not-neededdoc-completeMatching PR in forked repository
PR in forked repository: 3pacccccc#24