-
Notifications
You must be signed in to change notification settings - Fork 3.7k
recycle OpSendMsg after releasing payload #157
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Conversation
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
|
CLA is valid! |
merlimat
approved these changes
Jan 9, 2017
Contributor
merlimat
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
sijie
pushed a commit
to sijie/pulsar
that referenced
this pull request
Mar 4, 2018
hrsakai
pushed a commit
to hrsakai/pulsar
that referenced
this pull request
Dec 10, 2020
* [issue:144] Add retry logic when get connection error Signed-off-by: xiaolong.ran <rxl@apache.org> * replace fmt with log Signed-off-by: xiaolong.ran <rxl@apache.org> * code formate Signed-off-by: xiaolong.ran <rxl@apache.org> * add coverage.html to .gitignore Signed-off-by: xiaolong.ran <rxl@apache.org> * fix comments Signed-off-by: xiaolong.ran <rxl@apache.org> * code format Signed-off-by: xiaolong.ran <rxl@apache.org>
codelipenghui
added a commit
to codelipenghui/incubator-pulsar
that referenced
this pull request
Jun 27, 2022
### Motivation While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will affect the message publish and subsequent consumer creation. ``` "pulsar-io-15-24" apache#195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207) - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ``` "pulsar-io-15-8" apache#157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000] java.lang.Thread.State: RUNNABLE at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360) at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234) at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307) at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896) at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888) - locked <0x00001000158237d8> (a java.lang.Object) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159) - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287) - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ### Modification - Sort the consumer list only if the new consumer with high priority than the last element in the consumer list, this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).
merlimat
pushed a commit
that referenced
this pull request
Jun 28, 2022
…16243) ### Motivation While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will affect the message publish and subsequent consumer creation. ``` "pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207) - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ``` "pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000] java.lang.Thread.State: RUNNABLE at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360) at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234) at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307) at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896) at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888) - locked <0x00001000158237d8> (a java.lang.Object) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159) - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287) - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ### Modification - Sort the consumer list only if the new consumer with high priority than the last element in the consumer list, this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).
merlimat
pushed a commit
that referenced
this pull request
Jun 28, 2022
…16243) ### Motivation While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will affect the message publish and subsequent consumer creation. ``` "pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207) - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ``` "pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000] java.lang.Thread.State: RUNNABLE at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360) at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234) at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307) at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896) at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888) - locked <0x00001000158237d8> (a java.lang.Object) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159) - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287) - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ### Modification - Sort the consumer list only if the new consumer with high priority than the last element in the consumer list, this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent).
nicoloboschi
referenced
this pull request
in datastax/pulsar
Jun 28, 2022
…pache#16243) ### Motivation While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will affect the message publish and subsequent consumer creation. ``` "pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207) - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ``` "pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000] java.lang.Thread.State: RUNNABLE at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360) at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234) at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307) at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896) at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888) - locked <0x00001000158237d8> (a java.lang.Object) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159) - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287) - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ### Modification - Sort the consumer list only if the new consumer with high priority than the last element in the consumer list, this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent). (cherry picked from commit 4bdaa32)
congbobo184
pushed a commit
that referenced
this pull request
Nov 10, 2022
…16243) ### Motivation While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will affect the message publish and subsequent consumer creation. ``` "pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207) - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ``` "pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000] java.lang.Thread.State: RUNNABLE at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360) at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234) at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307) at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896) at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888) - locked <0x00001000158237d8> (a java.lang.Object) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159) - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287) - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ### Modification - Sort the consumer list only if the new consumer with high priority than the last element in the consumer list, this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent). (cherry picked from commit 291fedc)
congbobo184
pushed a commit
that referenced
this pull request
Nov 26, 2022
…16243) ### Motivation While create many consumers (> 10000), the IO thread run into BLOCK state for long time which will affect the message publish and subsequent consumer creation. ``` "pulsar-io-15-24" #195 prio=5 os_prio=31 cpu=15744.67ms elapsed=272.18s tid=0x00007faaa7183400 nid=0x19c03 waiting for monitor entry [0x0000700019642000] java.lang.Thread.State: BLOCKED (on object monitor) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:207) - waiting to lock <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ``` "pulsar-io-15-8" #157 prio=5 os_prio=31 cpu=10573.05ms elapsed=314.91s tid=0x00007faa9bf6e800 nid=0x17507 runnable [0x00007000171d5000] java.lang.Thread.State: RUNNABLE at java.util.TimSort.countRunAndMakeAscending(java.base@17.0.3/TimSort.java:360) at java.util.TimSort.sort(java.base@17.0.3/TimSort.java:234) at java.util.Arrays.sort(java.base@17.0.3/Arrays.java:1307) at java.util.concurrent.CopyOnWriteArrayList.sortRange(java.base@17.0.3/CopyOnWriteArrayList.java:896) at java.util.concurrent.CopyOnWriteArrayList.sort(java.base@17.0.3/CopyOnWriteArrayList.java:888) - locked <0x00001000158237d8> (a java.lang.Object) at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.addConsumer(PersistentDispatcherMultipleConsumers.java:159) - locked <0x0000100015830888> (a org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.lambda$addConsumer$2(PersistentSubscription.java:287) - locked <0x0000100015823488> (a org.apache.pulsar.broker.service.persistent.PersistentSubscription) at org.apache.pulsar.broker.service.persistent.PersistentSubscription$$Lambda$984/0x000000080136d898.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentSubscription.addConsumer(PersistentSubscription.java:206) at org.apache.pulsar.broker.service.AbstractTopic.addConsumerToSubscription(AbstractTopic.java:513) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$15(PersistentTopic.java:782) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$983/0x000000080136cd28.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.lambda$internalSubscribe$17(PersistentTopic.java:777) at org.apache.pulsar.broker.service.persistent.PersistentTopic$$Lambda$982/0x000000080136cae0.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.persistent.PersistentTopic.internalSubscribe(PersistentTopic.java:698) at org.apache.pulsar.broker.service.persistent.PersistentTopic.subscribe(PersistentTopic.java:674) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$12(ServerCnx.java:1078) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$869/0x0000000801316630.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniComposeStage(java.base@17.0.3/CompletableFuture.java:1187) at java.util.concurrent.CompletableFuture.thenCompose(java.base@17.0.3/CompletableFuture.java:2309) at org.apache.pulsar.broker.service.ServerCnx.lambda$handleSubscribe$15(ServerCnx.java:1042) at org.apache.pulsar.broker.service.ServerCnx$$Lambda$860/0x000000080130f970.apply(Unknown Source) at java.util.concurrent.CompletableFuture.uniApplyNow(java.base@17.0.3/CompletableFuture.java:684) at java.util.concurrent.CompletableFuture.uniApplyStage(java.base@17.0.3/CompletableFuture.java:662) at java.util.concurrent.CompletableFuture.thenApply(java.base@17.0.3/CompletableFuture.java:2168) at org.apache.pulsar.broker.service.ServerCnx.handleSubscribe(ServerCnx.java:984) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:229) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:200) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:162) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:327) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:314) at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:435) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:279) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:722) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:658) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:584) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:496) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:995) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.lang.Thread.run(java.base@17.0.3/Thread.java:833) ``` ### Modification - Sort the consumer list only if the new consumer with high priority than the last element in the consumer list, this can avoid the sort operation for all the consumers without priority level (the client-side always pass 0 if priority level absent). (cherry picked from commit 291fedc)
2 tasks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Motivation
Producershould recycleOpSendMsgofpendingMessagesqueue before clearing it up, so it can be reclaimed later on.Modifications
Recycle
OpSendMsgofpendingMessagesqueue before clearing it up.Result
No functional change.