Search before asking
Version
3.0.0
Minimal reproduce step
2023-07-28T09:13:34,703+0800 [pulsar-io-12-13] WARN org.apache.pulsar.broker.service.ServerCnx - [/10.65.17.39:65308] Got exception java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2 at java.base/java.util.concurrent.CopyOnWriteArrayList.elementAt(CopyOnWriteArrayList.java:385) at java.base/java.util.concurrent.CopyOnWriteArrayList.get(CopyOnWriteArrayList.java:398) at org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers.getNextConsumerFromSameOrLowerLevel(AbstractDispatcherMultipleConsumers.java:192) at org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers.getNextConsumer(AbstractDispatcherMultipleConsumers.java:137) at org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers.sendMessages(NonPersistentDispatcherMultipleConsumers.java:188) at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$2(NonPersistentTopic.java:202) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:554) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:277) at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:196) at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:281) at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:194) at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1733) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:833)
multi-thread invoke
org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers#getNextConsumer
Even if consumerList.size() is checked first, there are still thread safety issues between consumerList.size() and consumerList.get()
if (currentConsumerRoundRobinIndex >= consumerList.size()) { currentConsumerRoundRobinIndex = 0; } int currentRoundRobinConsumerPriority = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel();
What did you expect to see?
Like persistent methods, it is possible to access this method in a thread-safe manner or make the method itself safe
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) { if (needTrimAckedMessages()) { cursor.trimDeletedEntries(entries); }
What did you see instead?
@OverRide
public void sendMessages(List entries) {
Consumer consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? getNextConsumer() : null;
org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers#sendMessages
Anything else?
No response
Are you willing to submit a PR?
Search before asking
Version
3.0.0
Minimal reproduce step
2023-07-28T09:13:34,703+0800 [pulsar-io-12-13] WARN org.apache.pulsar.broker.service.ServerCnx - [/10.65.17.39:65308] Got exception java.lang.ArrayIndexOutOfBoundsException: Index 2 out of bounds for length 2 at java.base/java.util.concurrent.CopyOnWriteArrayList.elementAt(CopyOnWriteArrayList.java:385) at java.base/java.util.concurrent.CopyOnWriteArrayList.get(CopyOnWriteArrayList.java:398) at org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers.getNextConsumerFromSameOrLowerLevel(AbstractDispatcherMultipleConsumers.java:192) at org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers.getNextConsumer(AbstractDispatcherMultipleConsumers.java:137) at org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers.sendMessages(NonPersistentDispatcherMultipleConsumers.java:188) at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.lambda$publishMessage$2(NonPersistentTopic.java:202) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap$Section.forEach(ConcurrentOpenHashMap.java:554) at org.apache.pulsar.common.util.collections.ConcurrentOpenHashMap.forEach(ConcurrentOpenHashMap.java:277) at org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic.publishMessage(NonPersistentTopic.java:196) at org.apache.pulsar.broker.service.Producer.publishMessageToTopic(Producer.java:281) at org.apache.pulsar.broker.service.Producer.publishMessage(Producer.java:194) at org.apache.pulsar.broker.service.ServerCnx.handleSend(ServerCnx.java:1733) at org.apache.pulsar.common.protocol.PulsarDecoder.channelRead(PulsarDecoder.java:222) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.flow.FlowControlHandler.dequeue(FlowControlHandler.java:202) at io.netty.handler.flow.FlowControlHandler.channelRead(FlowControlHandler.java:164) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800) at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:499) at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:397) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:833)multi-thread invoke
org.apache.pulsar.broker.service.AbstractDispatcherMultipleConsumers#getNextConsumerEven if consumerList.size() is checked first, there are still thread safety issues between consumerList.size() and consumerList.get()
if (currentConsumerRoundRobinIndex >= consumerList.size()) { currentConsumerRoundRobinIndex = 0; } int currentRoundRobinConsumerPriority = consumerList.get(currentConsumerRoundRobinIndex).getPriorityLevel();What did you expect to see?
Like persistent methods, it is possible to access this method in a thread-safe manner or make the method itself safe
org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers#trySendMessagesToConsumers
protected synchronized boolean trySendMessagesToConsumers(ReadType readType, List<Entry> entries) { if (needTrimAckedMessages()) { cursor.trimDeletedEntries(entries); }What did you see instead?
@OverRide
public void sendMessages(List entries) {
Consumer consumer = TOTAL_AVAILABLE_PERMITS_UPDATER.get(this) > 0 ? getNextConsumer() : null;
org.apache.pulsar.broker.service.nonpersistent.NonPersistentDispatcherMultipleConsumers#sendMessages
Anything else?
No response
Are you willing to submit a PR?