Skip to content

[Bug] DLQ producer name conflicts when consumer sends messages to DLQ #21888

@RobertIndie

Description

@RobertIndie

Search before asking

  • I searched in the issues and found nothing similar.

Version

c834feb

Minimal reproduce step

Run the test: testDeadLetterTopicWithInitialSubscriptionAndMultiConsumers

What did you expect to see?

The test should be passed without any exceptions.

What did you see instead?

Even though the test is passed, It will throw unexpected execeptions:

2024-01-12T09:55:17,713 - ERROR - [pulsar-client-io-35-5:ConsumerImpl@2155] - Dead letter producer exception with topic: persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ
java.util.concurrent.CompletionException: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
	at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:347) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniAccept.tryFire(CompletableFuture.java:708) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?]
	at org.apache.pulsar.client.impl.ProducerImpl.lambda$connectionOpened$16(ProducerImpl.java:1846) ~[classes/:?]
	at java.util.concurrent.CompletableFuture.uniExceptionally(CompletableFuture.java:990) ~[?:?]
	at java.util.concurrent.CompletableFuture$UniExceptionally.tryFire(CompletableFuture.java:974) ~[?:?]
	at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) ~[?:?]
	at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:2194) ~[?:?]
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:794) ~[classes/:?]
	at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:192) ~[classes/:?]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) ~[netty-codec-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[netty-handler-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) ~[netty-transport-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.104.Final.jar:4.1.104.Final]
	at java.lang.Thread.run(Thread.java:1583) ~[?:?]
Caused by: org.apache.pulsar.client.api.PulsarClientException$ProducerBusyException: {"errorMsg":"org.apache.pulsar.broker.service.BrokerServiceException$NamingException: Producer with name 'persistent://my-property/my-ns/dead-letter-topic-my-subscription-DLQ' is already connected to topic","reqId":2964198371560761575, "remote":"localhost/127.0.0.1:63813", "local":"/127.0.0.1:63828"}
	at org.apache.pulsar.client.impl.ClientCnx.getPulsarClientException(ClientCnx.java:1318) ~[classes/:?]
	at org.apache.pulsar.client.impl.ClientCnx.handleError(ClientCnx.java:795) ~[classes/:?]
	... 26 more

Anything else?

The root cause is that a regression bug is introduced in #21589. The producer name will be conflicted when multiple consumers in the same topic and subscription send messages to DLQ concurrently.

Are you willing to submit a PR?

  • I'm willing to submit a PR!

Metadata

Metadata

Assignees

Labels

release/blockerIndicate the PR or issue that should block the release until it gets resolvedtype/bugThe PR fixed a bug or issue reported a bug

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions