Search before asking
Version
c834feb
Minimal reproduce step
Run the test: testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers
What did you expect to see?
The test should be passed without any exceptions.
What did you see instead?
Even though the test is passed, It will throw unexpected execeptions:
2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] - Dead letter producer exception with topic: persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?]
at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846) ~[classes/:?]
at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?]
at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?]
at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?]
at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) ~[classes/:?]
at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192) ~[classes/:?]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
at java.lang.Thread.run(Thread.java:1583) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318) ~[classes/:?]
at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) ~[classes/:?]
... 26 more
Anything else?
The root cause is that a regression bug is introduced in #21589. The producer name will be conflicted when multiple consumers in the same topic and subscription send messages to DLQ concurrently.
Are you willing to submit a PR?
Search before asking
Version
c834feb
Minimal reproduce step
Run the test: testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers
What did you expect to see?
The test should be passed without any exceptions.
What did you see instead?
Even though the test is passed, It will throw unexpected execeptions:
Anything else?
The root cause is that a regression bug is introduced in #21589. The producer name will be conflicted when multiple consumers in the same topic and subscription send messages to DLQ concurrently.
Are you willing to submit a PR?