Skip to content

DB-1585 Enable extending NioEventLoop#7

Merged
dimitarndimitrov merged 2 commits intodse-netty-4.1.25.Finalfrom
db1585-enable-extending-nioeventloop
May 25, 2018
Merged

DB-1585 Enable extending NioEventLoop#7
dimitarndimitrov merged 2 commits intodse-netty-4.1.25.Finalfrom
db1585-enable-extending-nioeventloop

Conversation

@dimitarndimitrov
Copy link
Copy Markdown

Make NioEventLoop extendable in order for our single-core event loops in the NIO use-case to both beTPCEventLoops, and directly subclass NioEventLoop instead of delegating to it.

Also includes a small unrelated fix that seems to fail the transport-netty-epoll build.

Copy link
Copy Markdown

@stef1927 stef1927 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@dimitarndimitrov
Copy link
Copy Markdown
Author

Thanks, folks, I'll wait for the Apollo ticket to get ready to go in, and I'll submit the Netty and the Apollo changes together.

@stef1927
Copy link
Copy Markdown

For info: I've just committed ab19cfe to revert a previous change from DB-1720 that was causing problems. I did not change the Netty version since this pull request will change it when it gets merged.

stef1927 pushed a commit that referenced this pull request May 15, 2018
Motivation:
ReferenceCountedOpenSslEngine is careful to lock access to `ssl`
almost everywhere (manually verified) *except* in the constructor.
Since `ssl` is non-final, it does not enjoy automatic thread safety
of the code that uses it.  Specifically, that means netty tcnative
code is not thread safe.

Modifications:

Ensure that all ssl engine intialization and variables related to
it are properly synchronized  by adding in the constructor.

Result:
Less noisy race detector.

Notes:
The specific racing threads are:
```
  Read of size 8 at 0x7b5400019ff8 by thread T52 (mutexes: write M215300):
    #0 ssl_do_info_callback .../src/ssl/ssl_lib.c:2602:24 (f077793ecd812aeebb37296c987f655c+0x23c6834)
    #1 ssl_process_alert .../src/ssl/tls_record.c:473:3 (f077793ecd812aeebb37296c987f655c+0x23a5346)
    #2 tls_open_record .../src/ssl/tls_record.c:338:12 (f077793ecd812aeebb37296c987f655c+0x23a5289)
    #3 ssl3_get_record .../src/ssl/s3_pkt.c:146:7 (f077793ecd812aeebb37296c987f655c+0x23a3da0)
    #4 ssl3_read_app_data .../src/ssl/s3_pkt.c:388:17 (f077793ecd812aeebb37296c987f655c+0x23a368f)
    #5 ssl_read_impl .../src/ssl/ssl_lib.c:722:15 (f077793ecd812aeebb37296c987f655c+0x23c0895)
    #6 SSL_read .../src/ssl/ssl_lib.c:743:10 (f077793ecd812aeebb37296c987f655c+0x23c075b)
    #7 netty_internal_tcnative_SSL_readFromSSL .../netty_tcnative/openssl-dynamic/src/main/c/ssl.c:946:12 (f077793ecd812aeebb37296c987f655c+0x23827f7)
    #8 <null> <null> (0x7fc0760193be)
    #9 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.readPlaintextData(Ljava/nio/ByteBuffer;)I (ReferenceCountedOpenSslEngine.java:449)
    #10 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap([Ljava/nio/ByteBuffer;II[Ljava/nio/ByteBuffer;II)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:882)
    #11 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap([Ljava/nio/ByteBuffer;[Ljava/nio/ByteBuffer;)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:985)
    #12 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.unwrap(Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;)Ljavax/net/ssl/SSLEngineResult; (ReferenceCountedOpenSslEngine.java:1028)
    #13 io.netty.handler.ssl.SslHandler$SslEngineType$1.unwrap(Lio/netty/handler/ssl/SslHandler;Lio/netty/buffer/ByteBuf;IILio/netty/buffer/ByteBuf;)Ljavax/net/ssl/SSLEngineResult; (SslHandler.java:206)
    #14 io.netty.handler.ssl.SslHandler.unwrap(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;II)Z (SslHandler.java:1162)
    #15 io.netty.handler.ssl.SslHandler.decode(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (SslHandler.java:1084)
    #16 io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (ByteToMessageDecoder.java:489)
    #17 io.netty.handler.codec.ByteToMessageDecoder.callDecode(Lio/netty/channel/ChannelHandlerContext;Lio/netty/buffer/ByteBuf;Ljava/util/List;)V (ByteToMessageDecoder.java:428)
    #18 io.netty.handler.codec.ByteToMessageDecoder.channelRead(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V (ByteToMessageDecoder.java:265)
    #19 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:362)
    #20 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:348)
    #21 io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(Ljava/lang/Object;)Lio/netty/channel/ChannelHandlerContext; (AbstractChannelHandlerContext.java:340)
    #22 io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(Lio/netty/channel/ChannelHandlerContext;Ljava/lang/Object;)V (DefaultChannelPipeline.java:1334)
    #23 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:362)
    #24 io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(Lio/netty/channel/AbstractChannelHandlerContext;Ljava/lang/Object;)V (AbstractChannelHandlerContext.java:348)
    #25 io.netty.channel.DefaultChannelPipeline.fireChannelRead(Ljava/lang/Object;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:926)
    #26 io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read()V (AbstractNioByteChannel.java:134)
    #27 io.netty.channel.nio.NioEventLoop.processSelectedKey(Ljava/nio/channels/SelectionKey;Lio/netty/channel/nio/AbstractNioChannel;)V (NioEventLoop.java:644)
    #28 io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized()V (NioEventLoop.java:579)
    #29 io.netty.channel.nio.NioEventLoop.processSelectedKeys()V (NioEventLoop.java:496)
    #30 io.netty.channel.nio.NioEventLoop.run()V (NioEventLoop.java:458)
    #31 io.netty.util.concurrent.SingleThreadEventExecutor$5.run()V (SingleThreadEventExecutor.java:858)
    #32 io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run()V (DefaultThreadFactory.java:138)
    #33 java.lang.Thread.run()V (Thread.java:745)
    #34 (Generated Stub)

  Previous write of size 8 at 0x7b5400019ff8 by thread T97:
    #0 SSL_CTX_set_info_callback .../ssl/ssl_session.c:1136:22 (f077793ecd812aeebb37296c987f655c+0x23bd621)
    #1 netty_internal_tcnative_SSL_newSSL .../netty_tcnative/openssl-dynamic/src/main/c/ssl.c:830:5 (f077793ecd812aeebb37296c987f655c+0x2382306)
    #2 <null> <null> (0x7fc0760193be)
    #3 io.netty.handler.ssl.ReferenceCountedOpenSslEngine.<init>(Lio/netty/handler/ssl/ReferenceCountedOpenSslContext;Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;IZ)V (ReferenceCountedOpenSslEngine.java:237)
    #4 io.netty.handler.ssl.OpenSslEngine.<init>(Lio/netty/handler/ssl/OpenSslContext;Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)V (OpenSslEngine.java:31)
    #5 io.netty.handler.ssl.OpenSslContext.newEngine0(Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)Ljavax/net/ssl/SSLEngine; (OpenSslContext.java:49)
    #6 io.netty.handler.ssl.ReferenceCountedOpenSslContext.newEngine(Lio/netty/buffer/ByteBufAllocator;Ljava/lang/String;I)Ljavax/net/ssl/SSLEngine; (ReferenceCountedOpenSslContext.java:409)
    #7 io.netty.handler.ssl.ReferenceCountedOpenSslContext.newEngine(Lio/netty/buffer/ByteBufAllocator;)Ljavax/net/ssl/SSLEngine; (ReferenceCountedOpenSslContext.java:423)
    #8 io.grpc.netty.ProtocolNegotiators$ServerTlsHandler.handlerAdded(Lio/netty/channel/ChannelHandlerContext;)V (ProtocolNegotiators.java:133)
    #9 io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:597)
    #10 io.netty.channel.DefaultChannelPipeline.addLast(Lio/netty/util/concurrent/EventExecutorGroup;Ljava/lang/String;Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:226)
    #11 io.netty.channel.DefaultChannelPipeline.addLast(Lio/netty/util/concurrent/EventExecutorGroup;[Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:392)
    #12 io.netty.channel.DefaultChannelPipeline.addLast([Lio/netty/channel/ChannelHandler;)Lio/netty/channel/ChannelPipeline; (DefaultChannelPipeline.java:379)
    #13 io.grpc.netty.NettyServerTransport.start(Lio/grpc/internal/ServerTransportListener;)V (NettyServerTransport.java:99)
    #14 io.grpc.netty.NettyServer$1.initChannel(Lio/netty/channel/Channel;)V (NettyServer.java:164)
    #15 io.netty.channel.ChannelInitializer.initChannel(Lio/netty/channel/ChannelHandlerContext;)Z (ChannelInitializer.java:113)
    #16 io.netty.channel.ChannelInitializer.handlerAdded(Lio/netty/channel/ChannelHandlerContext;)V (ChannelInitializer.java:105)
    #17 io.netty.channel.DefaultChannelPipeline.callHandlerAdded0(Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:597)
    #18 io.netty.channel.DefaultChannelPipeline.access$000(Lio/netty/channel/DefaultChannelPipeline;Lio/netty/channel/AbstractChannelHandlerContext;)V (DefaultChannelPipeline.java:44)
    #19 io.netty.channel.DefaultChannelPipeline$PendingHandlerAddedTask.execute()V (DefaultChannelPipeline.java:1387)
    #20 io.netty.channel.DefaultChannelPipeline.callHandlerAddedForAllHandlers()V (DefaultChannelPipeline.java:1122)
    #21 io.netty.channel.DefaultChannelPipeline.invokeHandlerAddedIfNeeded()V (DefaultChannelPipeline.java:647)
    #22 io.netty.channel.AbstractChannel$AbstractUnsafe.register0(Lio/netty/channel/ChannelPromise;)V (AbstractChannel.java:506)
    #23 io.netty.channel.AbstractChannel$AbstractUnsafe.access$200(Lio/netty/channel/AbstractChannel$AbstractUnsafe;Lio/netty/channel/ChannelPromise;)V (AbstractChannel.java:419)
    #24 io.netty.channel.AbstractChannel$AbstractUnsafe$1.run()V (AbstractChannel.java:478)
    #25 io.netty.util.concurrent.AbstractEventExecutor.safeExecute(Ljava/lang/Runnable;)V (AbstractEventExecutor.java:163)
    #26 io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(J)Z (SingleThreadEventExecutor.java:403)
    #27 io.netty.channel.nio.NioEventLoop.run()V (NioEventLoop.java:462)
    #28 io.netty.util.concurrent.SingleThreadEventExecutor$5.run()V (SingleThreadEventExecutor.java:858)
    #29 io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run()V (DefaultThreadFactory.java:138)
    #30 java.lang.Thread.run()V (Thread.java:745)
    #31 (Generated Stub)

```
@dimitarndimitrov dimitarndimitrov changed the base branch from dse-netty-4.1.13.Final to dse-netty-4.1.25.Final May 23, 2018 12:45
@dimitarndimitrov dimitarndimitrov force-pushed the db1585-enable-extending-nioeventloop branch from 3877007 to d75f4e5 Compare May 23, 2018 12:45
@dimitarndimitrov
Copy link
Copy Markdown
Author

Rebased to dse-netty-4.1.25.Final and changed the PR base to that branch.

@dimitarndimitrov dimitarndimitrov merged commit 6d95b68 into dse-netty-4.1.25.Final May 25, 2018
@dimitarndimitrov dimitarndimitrov deleted the db1585-enable-extending-nioeventloop branch May 25, 2018 07:40
jtgrabowski pushed a commit that referenced this pull request Oct 1, 2019
…etty#8896)

Motivation:

RFC 6455 defines that, generally, a WebSocket client should not close a TCP
connection as far as a server is the one who's responsible for doing that.
In practice tho', it's not always possible to control the server. Server's
misbehavior may lead to connections being leaked (if the server does not
comply with the RFC).

RFC 6455 #7.1.1 says

> In abnormal cases (such as not having received a TCP Close from the server
after a reasonable amount of time) a client MAY initiate the TCP Close.

Modifications:

* WebSocket client handshaker additional param `forceCloseAfterMillis`

* Use 10 seconds as default

Result:

WebSocket client handshaker to comply with RFC. Fixes netty#8883.
alfred-yeung added a commit that referenced this pull request May 28, 2022
…etty#8896)

Motivation:
RFC 6455 defines that, generally, a WebSocket client should not close a TCP
connection as far as a server is the one who's responsible for doing that.
In practice tho', it's not always possible to control the server. Server's
misbehavior may lead to connections being leaked (if the server does not
comply with the RFC).

RFC 6455 #7.1.1 says

In abnormal cases (such as not having received a TCP Close from the server
after a reasonable amount of time) a client MAY initiate the TCP Close.

Modifications:
WebSocket client handshaker additional param forceCloseAfterMillis

Switched off by default to keep backward compatibility

Result:
WebSocket client handshaker to comply with RFC. Fixes (netty#8883).
thobe pushed a commit that referenced this pull request Jun 29, 2022
…tor (netty#10890)

Motivation:

A race detector discovered a data race in GlobalEventExecutor present in
netty 4.1.51.Final:

```
  Write of size 4 at 0x0000cea08774 by thread T103:
    #0 io.netty.util.internal.DefaultPriorityQueue.poll()Lio/netty/util/internal/PriorityQueueNode; DefaultPriorityQueue.java:113
    #1 io.netty.util.internal.DefaultPriorityQueue.poll()Ljava/lang/Object; DefaultPriorityQueue.java:31
    #2 java.util.AbstractQueue.remove()Ljava/lang/Object; AbstractQueue.java:113
    #3 io.netty.util.concurrent.AbstractScheduledEventExecutor.pollScheduledTask(J)Ljava/lang/Runnable; AbstractScheduledEventExecutor.java:133
    #4 io.netty.util.concurrent.GlobalEventExecutor.fetchFromScheduledTaskQueue()V GlobalEventExecutor.java:119
    #5 io.netty.util.concurrent.GlobalEventExecutor.takeTask()Ljava/lang/Runnable; GlobalEventExecutor.java:106
    #6 io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run()V GlobalEventExecutor.java:240
    #7 io.netty.util.internal.ThreadExecutorMap$2.run()V ThreadExecutorMap.java:74
    #8 io.netty.util.concurrent.FastThreadLocalRunnable.run()V FastThreadLocalRunnable.java:30
    #9 java.lang.Thread.run()V Thread.java:835
    #10 (Generated Stub) <null>

  Previous read of size 4 at 0x0000cea08774 by thread T110:
    #0 io.netty.util.internal.DefaultPriorityQueue.size()I DefaultPriorityQueue.java:46
    #1 io.netty.util.concurrent.GlobalEventExecutor$TaskRunner.run()V GlobalEventExecutor.java:263
    #2 io.netty.util.internal.ThreadExecutorMap$2.run()V ThreadExecutorMap.java:74
    #3 io.netty.util.concurrent.FastThreadLocalRunnable.run()V FastThreadLocalRunnable.java:30
    #4 java.lang.Thread.run()V Thread.java:835
    #5 (Generated Stub) <null>
```

The race is legit, but benign. To trigger it requires a TaskRunner to
begin exiting and set 'started' to false, more work to be scheduled
which starts a new TaskRunner, that work then needs to schedule
additional work which modifies 'scheduledTaskQueue', and then the
original TaskRunner checks 'scheduledTaskQueue'. But there is no danger
to this race as it can only produce a false negative in the condition
which causes the code to CAS 'started' which is thread-safe.

Modifications:

Delete problematic references to scheduledTaskQueue. The only way
scheduledTaskQueue could be modified since the last check is if another
TaskRunner is running, in which case the current TaskRunner doesn't
care.

Result:

Data-race free code, and a bit less code to boot.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants