Conversation
dimitarndimitrov
left a comment
There was a problem hiding this comment.
Disclaimer: I still don't understand the domain area, so this review is mostly a human-generated lint report.
Find some potentially relevant nit-picking below.
| private final long address; | ||
| private long nextId; | ||
| private long outstanding; | ||
| private long maxOutstanding; |
There was a problem hiding this comment.
nit: maxOutstanding can be made final.
| assert dst.position() == 0; | ||
| assert file.epollEventLoop.aioContext == this; | ||
|
|
||
| int length = (dst.limit() & 511) == 0 ? dst.limit() : ((dst.limit() + 511) & ~511); |
There was a problem hiding this comment.
nit: This line could be moved in a separate method, explaining its purpose (e.g. getSmallestAlignedBoundary(ByteBuffer, int)) and parametrizing the value of 511 or 512.
There was a problem hiding this comment.
There used to be a check on file.isDirect() that is now missing, is this intended? I also agree that the part of code that deals with alignment is probably better moved to a dedicated method to help with readability, not just the calculation, the 2 checks too.
There was a problem hiding this comment.
It does check if the buffer is direct, sure I can make the checks a sep method
| throw new RuntimeException("supplied buffer isn't long enough to handle read length alignment"); | ||
| // Avoid sending overflowing the aio context queue (EGAIN) | ||
| // instead buffer locally | ||
| if (outstanding + 1 <= maxOutstanding) { |
There was a problem hiding this comment.
nit: I think outstanding < maxOutstanding is shorter and clearer.
| long id = Native.submitAIORead(this, file.getEventFd(), file.getFd(), position, length, dst); | ||
|
|
||
| IORequest r = outstandingRequests.putIfAbsent(id, request); | ||
| ++outstanding; |
There was a problem hiding this comment.
nit: I may be misunderstanding the way the code here's supposed to work, but shouldn't the incrementing of outstanding depend on the result of the putIfAbsent and/or offer calls??
There was a problem hiding this comment.
I agree, outstanding should be incremented only if the item was added to the map, which is why I suggested above to just use the map size.
| return; | ||
| } | ||
|
|
||
| //Process the finished reads |
There was a problem hiding this comment.
nit: Here and below, some inconsistencies wrt spacing between a line comment's second forward slash and the text that follows it.
| private static final Integer aioMaxConcurrency = Integer.getInteger("netty.aio.maxConcurrency", 1024); | ||
| private static final Integer aioMaxConcurrency = Integer.getInteger("netty.aio.maxConcurrency", | ||
| 1024 / Runtime.getRuntime() | ||
| .availableProcessors()); |
There was a problem hiding this comment.
nit: You can add a comment explaining the meaning behind this calculation.
| import io.netty.channel.unix.FileDescriptor; | ||
| import io.netty.util.internal.logging.InternalLogger; | ||
| import io.netty.util.internal.logging.InternalLoggerFactory; | ||
| import org.jctools.queues.SpscGrowableArrayQueue; |
There was a problem hiding this comment.
nit: Is this import actually needed?
| length = (dst.limit() & 511) == 0 ? dst.limit() : ((dst.limit() + 511) & ~511); | ||
| if (dst.capacity() < length) { | ||
| throw new RuntimeException("supplied buffer isn't long enough to handle read length alignment"); | ||
| // Avoid sending overflowing the aio context queue (EGAIN) |
There was a problem hiding this comment.
Pushed some nits (4e3dad4).
See inline for remaining comments, some got hidden behind outdated code so make sure to check them all. Biggest concern is exception handling and a file.isDirect() check that got missing. Also, let me know if you prefer that I attempt to make the changes myself and you review them.
|
|
||
| private final long address; | ||
| private long nextId; | ||
| private long outstanding; |
There was a problem hiding this comment.
Do we need this counter, isn't the size of outstandingRequests sufficient, given that the code executes in the same thread?
There was a problem hiding this comment.
Just more efficient to check a counter vs map.size()
There was a problem hiding this comment.
True in the general case, but LongObjectHashMap keeps the size cached, so I don't think this would have a performance impact, although I agree this is a super-hot code path.
| private long outstanding; | ||
| private long maxOutstanding; | ||
| private final Map<Long, IORequest> outstandingRequests; | ||
| private final ArrayDeque<IORequest> pendingRequests; |
There was a problem hiding this comment.
We are definitely OK growing this indefinitely?
There was a problem hiding this comment.
I don't think so. From the doc: Array deques have no capacity restrictions; as far as I understand, the parameter in the constructor is just the initial value.
| assert dst.position() == 0; | ||
| assert file.epollEventLoop.aioContext == this; | ||
|
|
||
| int length = (dst.limit() & 511) == 0 ? dst.limit() : ((dst.limit() + 511) & ~511); |
There was a problem hiding this comment.
There used to be a check on file.isDirect() that is now missing, is this intended? I also agree that the part of code that deals with alignment is probably better moved to a dedicated method to help with readability, not just the calculation, the 2 checks too.
| long id = Native.submitAIORead(this, file.getEventFd(), file.getFd(), position, length, dst); | ||
|
|
||
| IORequest r = outstandingRequests.putIfAbsent(id, request); | ||
| ++outstanding; |
There was a problem hiding this comment.
I agree, outstanding should be incremented only if the item was added to the map, which is why I suggested above to just use the map size.
| //We do this after processing the finished reads incase any outstanding reads | ||
| //they share the same read buffer | ||
| for (int i = 0; i < numReady; i++) { | ||
| --outstanding; |
There was a problem hiding this comment.
Here I have a problem with the outstanding counter again because an exception will cause this counter not to be updated whereas the requests are no longer outstanding.
| long nextId = Native.submitAIORead(this, req.file.getEventFd(), req.file.getFd(), | ||
| req.position, req.length, req.buffer); | ||
| IORequest r = outstandingRequests.putIfAbsent(nextId, req); | ||
| assert r == null; |
There was a problem hiding this comment.
If we wanted to refactor the code a little bit, we should have a dedicated method to submit the AIO read, add the request to outstandingRequests and, if required, increment outstanding. This functionality is repeated twice.
| IORequest req = null; | ||
| // Push any pending requests if we have room | ||
| try { | ||
| if (outstanding + 1 <= maxOutstanding && (req = pendingRequests.poll()) != null) { |
There was a problem hiding this comment.
Any other exception not "Resource temporarily unavailable" and req is silently dropped. I think this method should be split in two methods, one to process the requests that have completed and one to send more pending requests, and each method should have its own exception handling.
|
Pushed changes based on review points and added a fix for APOLLO-1081 |
There was a problem hiding this comment.
We are almost there. I made some changes in a48e113:
-
Limited the size of the pending requests queue and added a test to check "Too many pending requests" is thrown as expected
-
Minor tweak to
addPendingReadsto avoid looping if there are no pending requests -
Removed the
eventFdinstance variable fromAIOEpollFileChannel(it usesnettyChannel.fd()directly) to avoid confusion as to why it is not closed. -
In order to test "Too many pending requests" I had to add another parameter to the
AIOContextconstructor, and since I got that far, I also added a system property to change the default max pending value.
If you are happy with my changes, I'm +1 to merge, if not let's have one more, hopefully final, round.
| return; | ||
| } | ||
|
|
||
| keysArray[(int) j] = (long) niocb->request_key; |
|
Committed manually as 0a263dd. |
Motivation:
ReferenceCountedOpenSslEngine is careful to lock access to `ssl`
almost everywhere (manually verified) *except* in the constructor.
Since `ssl` is non-final, it does not enjoy automatic thread safety
of the code that uses it. Specifically, that means netty tcnative
code is not thread safe.
Modifications:
Ensure that all ssl engine intialization and variables related to
it are properly synchronized by adding in the constructor.
Result:
Less noisy race detector.
Notes:
The specific racing threads are:
```
Read of size 8 at 0x7b5400019ff8 by thread T52 (mutexes: write M215300):
#0 ssl_do_info_callback .../src/ssl/ssl_lib.c:2602:24 (f077793ecd812aeebb37296c987f655c+0x23c6834)
#1 ssl_process_alert .../src/ssl/tls_record.c:473:3 (f077793ecd812aeebb37296c987f655c+0x23a5346)
#2 tls_open_record .../src/ssl/tls_record.c:338:12 (f077793ecd812aeebb37296c987f655c+0x23a5289)
#3 ssl3_get_record .../src/ssl/s3_pkt.c:146:7 (f077793ecd812aeebb37296c987f655c+0x23a3da0)
#4 ssl3_read_app_data .../src/ssl/s3_pkt.c:388:17 (f077793ecd812aeebb37296c987f655c+0x23a368f)
#5 ssl_read_impl .../src/ssl/ssl_lib.c:722:15 (f077793ecd812aeebb37296c987f655c+0x23c0895)
#6 SSL_read .../src/ssl/ssl_lib.c:743:10 (f077793ecd812aeebb37296c987f655c+0x23c075b)
#7 netty_internal_tcnative_SSL_readFromSSL .../netty_tcnative/openssl-dynamic/src/main/c/ssl.c:946:12 (f077793ecd812aeebb37296c987f655c+0x23827f7)
#8 <null> <null> (0x7fc0760193be)
#9 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.readPlaintextData(Ljava/nio/ByteBuffer;)I (ReferenceCountedOpenSslEngine.java:449)
#10 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap([Ljava/nio/ByteBuffer;II[Ljava/nio/ByteBuffer;II)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:882)
#11 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap([Ljava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:985)
#12 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:1028)
#13 io.netty.handler.ssl.SslHandler$SslEngineType$1.unwrap(Lio/netty/handler/ssl/SslHandler;Lio/netty/buffer/ByteBuf;IILio/netty/buffer/ByteBuf;)Ljavax/net/ssl/SSLEngineResult; (SslHandler.java:206)
#14 io.netty.handler.ssl.SslHandler.unwrap(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;II)Z (SslHandler.java:1162)
#15 io.netty.handler.ssl.SslHandler.decode(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (SslHandler.java:1084)
#16 io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (ByteToMessageDecoder.java:489)
#17 io.netty.handler.codec.ByteToMessageDecoder.callDecode(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (ByteToMessageDecoder.java:428)
#18 io.netty.handler.codec.ByteToMessageDecoder.channelRead(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V (ByteToMessageDecoder.java:265)
#19 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:362)
#20 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:348)
#21 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Ljava/lang/Object;)Lio/netty/channel/ChannelHandlerContext; (AbstractChannelHandlerContext.java:340)
#22 io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V (DefaultChannelPipeline.java:1334)
#23 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:362)
#24 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:348)
#25 io.netty.channel.DefaultChannelPipeline.fireChannelRead(Ljava/lang/Object;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:926)
#26 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read()V (AbstractNioByteChannel.java:134)
#27 io.netty.channel.nio.NioEventLoop.processSelectedKey(Ljava/nio/channels/SelectionKey;Lio/netty/channel/nio/AbstractNioChannel;)V (NioEventLoop.java:644)
#28 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized()V (NioEventLoop.java:579)
#29 io.netty.channel.nio.NioEventLoop.processSelectedKeys()V (NioEventLoop.java:496)
#30 io.netty.channel.nio.NioEventLoop.run()V (NioEventLoop.java:458)
#31 io.netty.util.concurrent.SingleThreadEventExecutor$5.run()V (SingleThreadEventExecutor.java:858)
#32 io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run()V (DefaultThreadFactory.java:138)
#33 java.lang.Thread.run()V (Thread.java:745)
#34 (Generated Stub)
Previous write of size 8 at 0x7b5400019ff8 by thread T97:
#0 SSL_CTX_set_info_callback .../ssl/ssl_session.c:1136:22 (f077793ecd812aeebb37296c987f655c+0x23bd621)
#1 netty_internal_tcnative_SSL_newSSL .../netty_tcnative/openssl-dynamic/src/main/c/ssl.c:830:5 (f077793ecd812aeebb37296c987f655c+0x2382306)
#2 <null> <null> (0x7fc0760193be)
#3 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.<init>(Lio/netty/handler/ssl/ReferenceCountedOpenSslContext;Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;IZ)V (ReferenceCountedOpenSslEngine.java:237)
#4 io.netty.handler.ssl.OpenSslEngine.<init>(Lio/netty/handler/ssl/OpenSslContext;Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)V (OpenSslEngine.java:31)
#5 io.netty.handler.ssl.OpenSslContext.newEngine0(Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)Ljavax/net/ssl/SSLEngine; (OpenSslContext.java:49)
#6 io.netty.handler.ssl.ReferenceCountedOpenSslContext.newEngine(Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)Ljavax/net/ssl/SSLEngine; (ReferenceCountedOpenSslContext.java:409)
#7 io.netty.handler.ssl.ReferenceCountedOpenSslContext.newEngine(Lio/netty/buffer/ByteBufAllocator;)Ljavax/net/ssl/SSLEngine; (ReferenceCountedOpenSslContext.java:423)
#8 io.grpc.netty.ProtocolNegotiators$ServerTlsHandler.handlerAdded(Lio/netty/channel/ChannelHandlerContext;)V (ProtocolNegotiators.java:133)
#9 io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:597)
#10 io.netty.channel.DefaultChannelPipeline.addLast(Lio/netty/util/concurrent/EventExecutorGroup;Ljava/lang/String;Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:226)
#11 io.netty.channel.DefaultChannelPipeline.addLast(Lio/netty/util/concurrent/EventExecutorGroup;[Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:392)
#12 io.netty.channel.DefaultChannelPipeline.addLast([Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:379)
#13 io.grpc.netty.NettyServerTransport.start(Lio/grpc/internal/ServerTransportListener;)V (NettyServerTransport.java:99)
#14 io.grpc.netty.NettyServer$1.initChannel(Lio/netty/channel/Channel;)V (NettyServer.java:164)
#15 io.netty.channel.ChannelInitializer.initChannel(Lio/netty/channel/ChannelHandlerContext;)Z (ChannelInitializer.java:113)
#16 io.netty.channel.ChannelInitializer.handlerAdded(Lio/netty/channel/ChannelHandlerContext;)V (ChannelInitializer.java:105)
#17 io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:597)
#18 io.netty.channel.DefaultChannelPipeline.access$000(Lio/netty/channel/DefaultChannelPipeline;Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:44)
#19 io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute()V (DefaultChannelPipeline.java:1387)
#20 io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers()V (DefaultChannelPipeline.java:1122)
#21 io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded()V (DefaultChannelPipeline.java:647)
#22 io.netty.channel.AbstractChannel$AbstractUnsafe.register0(Lio/netty/channel/ChannelPromise;)V (AbstractChannel.java:506)
#23 io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(Lio/netty/channel/AbstractChannel$AbstractUnsafe;Lio/netty/channel/ChannelPromise;)V (AbstractChannel.java:419)
#24 io.netty.channel.AbstractChannel$AbstractUnsafe$1.run()V (AbstractChannel.java:478)
#25 io.netty.util.concurrent.AbstractEventExecutor.safeExecute(Ljava/lang/Runnable;)V (AbstractEventExecutor.java:163)
#26 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(J)Z (SingleThreadEventExecutor.java:403)
#27 io.netty.channel.nio.NioEventLoop.run()V (NioEventLoop.java:462)
#28 io.netty.util.concurrent.SingleThreadEventExecutor$5.run()V (SingleThreadEventExecutor.java:858)
#29 io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run()V (DefaultThreadFactory.java:138)
#30 java.lang.Thread.run()V (Thread.java:745)
#31 (Generated Stub)
```
Motivation:
ReferenceCountedOpenSslEngine is careful to lock access to `ssl`
almost everywhere (manually verified) *except* in the constructor.
Since `ssl` is non-final, it does not enjoy automatic thread safety
of the code that uses it. Specifically, that means netty tcnative
code is not thread safe.
Modifications:
Ensure that all ssl engine intialization and variables related to
it are properly synchronized by adding in the constructor.
Result:
Less noisy race detector.
Notes:
The specific racing threads are:
```
Read of size 8 at 0x7b5400019ff8 by thread T52 (mutexes: write M215300):
#0 ssl_do_info_callback .../src/ssl/ssl_lib.c:2602:24 (f077793ecd812aeebb37296c987f655c+0x23c6834)
riptano#1 ssl_process_alert .../src/ssl/tls_record.c:473:3 (f077793ecd812aeebb37296c987f655c+0x23a5346)
riptano#2 tls_open_record .../src/ssl/tls_record.c:338:12 (f077793ecd812aeebb37296c987f655c+0x23a5289)
riptano#3 ssl3_get_record .../src/ssl/s3_pkt.c:146:7 (f077793ecd812aeebb37296c987f655c+0x23a3da0)
riptano#4 ssl3_read_app_data .../src/ssl/s3_pkt.c:388:17 (f077793ecd812aeebb37296c987f655c+0x23a368f)
riptano#5 ssl_read_impl .../src/ssl/ssl_lib.c:722:15 (f077793ecd812aeebb37296c987f655c+0x23c0895)
riptano#6 SSL_read .../src/ssl/ssl_lib.c:743:10 (f077793ecd812aeebb37296c987f655c+0x23c075b)
riptano#7 netty_internal_tcnative_SSL_readFromSSL .../netty_tcnative/openssl-dynamic/src/main/c/ssl.c:946:12 (f077793ecd812aeebb37296c987f655c+0x23827f7)
riptano#8 <null> <null> (0x7fc0760193be)
riptano#9 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.readPlaintextData(Ljava/nio/ByteBuffer;)I (ReferenceCountedOpenSslEngine.java:449)
riptano#10 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap([Ljava/nio/ByteBuffer;II[Ljava/nio/ByteBuffer;II)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:882)
riptano#11 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap([Ljava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:985)
riptano#12 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:1028)
riptano#13 io.netty.handler.ssl.SslHandler$SslEngineType$1.unwrap(Lio/netty/handler/ssl/SslHandler;Lio/netty/buffer/ByteBuf;IILio/netty/buffer/ByteBuf;)Ljavax/net/ssl/SSLEngineResult; (SslHandler.java:206)
riptano#14 io.netty.handler.ssl.SslHandler.unwrap(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;II)Z (SslHandler.java:1162)
riptano#15 io.netty.handler.ssl.SslHandler.decode(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (SslHandler.java:1084)
riptano#16 io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (ByteToMessageDecoder.java:489)
riptano#17 io.netty.handler.codec.ByteToMessageDecoder.callDecode(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (ByteToMessageDecoder.java:428)
riptano#18 io.netty.handler.codec.ByteToMessageDecoder.channelRead(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V (ByteToMessageDecoder.java:265)
riptano#19 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:362)
riptano#20 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:348)
riptano#21 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Ljava/lang/Object;)Lio/netty/channel/ChannelHandlerContext; (AbstractChannelHandlerContext.java:340)
riptano#22 io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V (DefaultChannelPipeline.java:1334)
riptano#23 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:362)
riptano#24 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:348)
riptano#25 io.netty.channel.DefaultChannelPipeline.fireChannelRead(Ljava/lang/Object;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:926)
riptano#26 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read()V (AbstractNioByteChannel.java:134)
riptano#27 io.netty.channel.nio.NioEventLoop.processSelectedKey(Ljava/nio/channels/SelectionKey;Lio/netty/channel/nio/AbstractNioChannel;)V (NioEventLoop.java:644)
riptano#28 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized()V (NioEventLoop.java:579)
riptano#29 io.netty.channel.nio.NioEventLoop.processSelectedKeys()V (NioEventLoop.java:496)
riptano#30 io.netty.channel.nio.NioEventLoop.run()V (NioEventLoop.java:458)
riptano#31 io.netty.util.concurrent.SingleThreadEventExecutor$5.run()V (SingleThreadEventExecutor.java:858)
riptano#32 io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run()V (DefaultThreadFactory.java:138)
riptano#33 java.lang.Thread.run()V (Thread.java:745)
riptano#34 (Generated Stub)
Previous write of size 8 at 0x7b5400019ff8 by thread T97:
#0 SSL_CTX_set_info_callback .../ssl/ssl_session.c:1136:22 (f077793ecd812aeebb37296c987f655c+0x23bd621)
riptano#1 netty_internal_tcnative_SSL_newSSL .../netty_tcnative/openssl-dynamic/src/main/c/ssl.c:830:5 (f077793ecd812aeebb37296c987f655c+0x2382306)
riptano#2 <null> <null> (0x7fc0760193be)
riptano#3 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.<init>(Lio/netty/handler/ssl/ReferenceCountedOpenSslContext;Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;IZ)V (ReferenceCountedOpenSslEngine.java:237)
riptano#4 io.netty.handler.ssl.OpenSslEngine.<init>(Lio/netty/handler/ssl/OpenSslContext;Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)V (OpenSslEngine.java:31)
riptano#5 io.netty.handler.ssl.OpenSslContext.newEngine0(Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)Ljavax/net/ssl/SSLEngine; (OpenSslContext.java:49)
riptano#6 io.netty.handler.ssl.ReferenceCountedOpenSslContext.newEngine(Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)Ljavax/net/ssl/SSLEngine; (ReferenceCountedOpenSslContext.java:409)
riptano#7 io.netty.handler.ssl.ReferenceCountedOpenSslContext.newEngine(Lio/netty/buffer/ByteBufAllocator;)Ljavax/net/ssl/SSLEngine; (ReferenceCountedOpenSslContext.java:423)
riptano#8 io.grpc.netty.ProtocolNegotiators$ServerTlsHandler.handlerAdded(Lio/netty/channel/ChannelHandlerContext;)V (ProtocolNegotiators.java:133)
riptano#9 io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:597)
riptano#10 io.netty.channel.DefaultChannelPipeline.addLast(Lio/netty/util/concurrent/EventExecutorGroup;Ljava/lang/String;Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:226)
riptano#11 io.netty.channel.DefaultChannelPipeline.addLast(Lio/netty/util/concurrent/EventExecutorGroup;[Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:392)
riptano#12 io.netty.channel.DefaultChannelPipeline.addLast([Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:379)
riptano#13 io.grpc.netty.NettyServerTransport.start(Lio/grpc/internal/ServerTransportListener;)V (NettyServerTransport.java:99)
riptano#14 io.grpc.netty.NettyServer$1.initChannel(Lio/netty/channel/Channel;)V (NettyServer.java:164)
riptano#15 io.netty.channel.ChannelInitializer.initChannel(Lio/netty/channel/ChannelHandlerContext;)Z (ChannelInitializer.java:113)
riptano#16 io.netty.channel.ChannelInitializer.handlerAdded(Lio/netty/channel/ChannelHandlerContext;)V (ChannelInitializer.java:105)
riptano#17 io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:597)
riptano#18 io.netty.channel.DefaultChannelPipeline.access$000(Lio/netty/channel/DefaultChannelPipeline;Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:44)
riptano#19 io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute()V (DefaultChannelPipeline.java:1387)
riptano#20 io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers()V (DefaultChannelPipeline.java:1122)
riptano#21 io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded()V (DefaultChannelPipeline.java:647)
riptano#22 io.netty.channel.AbstractChannel$AbstractUnsafe.register0(Lio/netty/channel/ChannelPromise;)V (AbstractChannel.java:506)
riptano#23 io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(Lio/netty/channel/AbstractChannel$AbstractUnsafe;Lio/netty/channel/ChannelPromise;)V (AbstractChannel.java:419)
riptano#24 io.netty.channel.AbstractChannel$AbstractUnsafe$1.run()V (AbstractChannel.java:478)
riptano#25 io.netty.util.concurrent.AbstractEventExecutor.safeExecute(Ljava/lang/Runnable;)V (AbstractEventExecutor.java:163)
riptano#26 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(J)Z (SingleThreadEventExecutor.java:403)
riptano#27 io.netty.channel.nio.NioEventLoop.run()V (NioEventLoop.java:462)
riptano#28 io.netty.util.concurrent.SingleThreadEventExecutor$5.run()V (SingleThreadEventExecutor.java:858)
riptano#29 io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run()V (DefaultThreadFactory.java:138)
riptano#30 java.lang.Thread.run()V (Thread.java:745)
riptano#31 (Generated Stub)
```
…tor (netty#10890) Motivation: A race detector discovered a data race in GlobalEventExecutor present in netty 4.1.51.Final: ``` Write of size 4 at 0x0000cea08774 by thread T103: #0 io.netty.util.internal.DefaultPriorityQueue.poll()Lio/netty/util/internal/PriorityQueueNode; DefaultPriorityQueue.java:113 #1 io.netty.util.internal.DefaultPriorityQueue.poll()Ljava/lang/Object; DefaultPriorityQueue.java:31 #2 java.util.AbstractQueue.remove()Ljava/lang/Object; AbstractQueue.java:113 #3 io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(J)Ljava/lang/Runnable; AbstractScheduledEventExecutor.java:133 #4 io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue()V GlobalEventExecutor.java:119 #5 io.netty.util.concurrent.GlobalEventExecutor.takeTask()Ljava/lang/Runnable; GlobalEventExecutor.java:106 #6 io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run()V GlobalEventExecutor.java:240 #7 io.netty.util.internal.ThreadExecutorMap$2.run()V ThreadExecutorMap.java:74 #8 io.netty.util.concurrent.FastThreadLocalRunnable.run()V FastThreadLocalRunnable.java:30 #9 java.lang.Thread.run()V Thread.java:835 #10 (Generated Stub) <null> Previous read of size 4 at 0x0000cea08774 by thread T110: #0 io.netty.util.internal.DefaultPriorityQueue.size()I DefaultPriorityQueue.java:46 #1 io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run()V GlobalEventExecutor.java:263 #2 io.netty.util.internal.ThreadExecutorMap$2.run()V ThreadExecutorMap.java:74 #3 io.netty.util.concurrent.FastThreadLocalRunnable.run()V FastThreadLocalRunnable.java:30 #4 java.lang.Thread.run()V Thread.java:835 #5 (Generated Stub) <null> ``` The race is legit, but benign. To trigger it requires a TaskRunner to begin exiting and set 'started' to false, more work to be scheduled which starts a new TaskRunner, that work then needs to schedule additional work which modifies 'scheduledTaskQueue', and then the original TaskRunner checks 'scheduledTaskQueue'. But there is no danger to this race as it can only produce a false negative in the condition which causes the code to CAS 'started' which is thread-safe. Modifications: Delete problematic references to scheduledTaskQueue. The only way scheduledTaskQueue could be modified since the last check is if another TaskRunner is running, in which case the current TaskRunner doesn't care. Result: Data-race free code, and a bit less code to boot.
Motivation: netty#10995 when `io.netty.channel.unix.Socket` is ipv6 and join a multicast group with ipv4 address will cause `io.netty.channel.ChannelException: setsockopt() failed: Invalid argument` (at least in `Linux centos.dev 4.18.0-240.10.1.el8_3.x86_64 #1 SMP Mon Jan 18 17:05:51 UTC 2021 x86_64 x86_64 x86_64 GNU/Linux`) Modification: check if target group address is ipv6 before call `io.netty.channel.epoll.LinuxSocket#joinGroup(java.net.InetAddress, java.net.NetworkInterface, java.net.InetAddress)` I'm not sure if this modification is currect, but i checked source code of java NIO ``` Java_sun_nio_ch_Net_canJoin6WithIPv4Group0(JNIEnv* env, jclass cl) { #if defined(__APPLE__) /* IPV6_ADD_MEMBERSHIP can be used to join IPv4 multicast groups */ return JNI_TRUE; #else /* IPV6_ADD_MEMBERSHIP cannot be used to join IPv4 multicast groups */ return JNI_FALSE; #endif } ``` seems ipv6 address can't join ipv4 group except osx Result: test on `Linux 3.10.0-514.el7.x86_64 #1 SMP Tue Nov 22 16:42:41 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux` exception ` setsockopt() failed: Invalid argument` has fixed Fixes netty#10995
netty#13383) …3367)" Motivation: Something seems to be wrong when trying to deploy on aarch64 with this change. Modifications: This reverts commit 3535a2f. Result: Snapshot deploys work again
netty#15293) …5279) Motivation: In Kqueue it sometimes happened that we did see a WARN that the registration for a given id could not be found. The reason for this was that it is possible that the registration was already be cancelled by the user and so not present anymore. In this case the logs are harmless but still confusing. We should better ensure we handle it better. Modifications: When the user cancels a registration just mark it as cancelled and remove it from the map one all events are processed. Result: Less confusing logging and more correct handling of cancelled registrations
netty#15395) …5281) Motivation: In our testsuite we often just wrapped a byte[] and passed it to the write method. While this works it also might end up not testing often with direct buffers. We should make this a bit more random and so cover direct and heap buffers. The idea popped up when reviewing netty#15231 Modifications: - Add new static helper method that will either return a wrapped buffer or a direct buffers in a random fashion - Use this method to allocate buffers Result: More random testing of heap / direct buffers
No description provided.