From e8d109ca87558887ea96a54508b5d4ebdc654475 Mon Sep 17 00:00:00 2001 From: buchgr Date: Thu, 30 Jun 2016 16:10:16 +0200 Subject: [PATCH 1/6] netty: add handler to combine small writes into larger buffer --- .../io/grpc/netty/NettyClientHandler.java | 6 ++ .../io/grpc/netty/NettyServerHandler.java | 1 + .../io/grpc/netty/WriteCombiningHandler.java | 82 +++++++++++++++++++ 3 files changed, 89 insertions(+) create mode 100644 netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index fdda262ab8d..83d4fdf7cfd 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -192,6 +192,12 @@ public void onStreamRemoved(Http2Stream stream) { }); } + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, new WriteCombiningHandler()); + super.handlerAdded(ctx); + } + /** * Handler for commands sent from the stream. */ diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index d519fc1e85e..3d29462dafd 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -167,6 +167,7 @@ Throwable connectionError() { @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { serverWriteQueue = new WriteQueue(ctx.channel()); + ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, new WriteCombiningHandler()); super.handlerAdded(ctx); } diff --git a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java new file mode 100644 index 00000000000..38e56ebca80 --- /dev/null +++ b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java @@ -0,0 +1,82 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package io.grpc.netty; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufAllocator; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; +import io.netty.util.ReferenceCountUtil; + +public class WriteCombiningHandler extends ChannelOutboundHandlerAdapter { + + private static final int BUFFER_SIZE = 4096; + + private ByteBuf buffer; + private ByteBufAllocator allocator; + + @Override + public void handlerAdded(ChannelHandlerContext ctx) throws Exception { + allocator = ctx.channel().alloc(); + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { + ByteBuf data = (ByteBuf) msg; + if (buffer == null) { + buffer = allocator.directBuffer(BUFFER_SIZE, BUFFER_SIZE); + } + + if (buffer.writableBytes() >= data.readableBytes()) { + buffer.writeBytes(data); + promise.setSuccess(); + ReferenceCountUtil.safeRelease(data); + } else { + ctx.write(buffer); + buffer = null; + if (data.readableBytes() < BUFFER_SIZE) { + write(ctx, msg, promise); + } else { + ctx.write(msg, promise); + } + } + } + + @Override + public void flush(ChannelHandlerContext ctx) throws Exception { + if (buffer != null && buffer.readableBytes() > 0) { + ctx.write(buffer, ctx.voidPromise()); + buffer = null; + } + ctx.flush(); + } +} From 08698831f713b12415450191ced66322769ed0bc Mon Sep 17 00:00:00 2001 From: buchgr Date: Thu, 30 Jun 2016 17:02:04 +0200 Subject: [PATCH 2/6] address norman's comment --- .../main/java/io/grpc/netty/WriteCombiningHandler.java | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java index 38e56ebca80..123163f3786 100644 --- a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java +++ b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java @@ -49,6 +49,14 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { allocator = ctx.channel().alloc(); } + @Override + public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + if (buffer != null) { + ReferenceCountUtil.safeRelease(buffer); + buffer = null; + } + } + @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { ByteBuf data = (ByteBuf) msg; @@ -79,4 +87,6 @@ public void flush(ChannelHandlerContext ctx) throws Exception { } ctx.flush(); } + + } From b09bde56bb3a217e79f25fa17f4acc2bd22ec9ee Mon Sep 17 00:00:00 2001 From: buchgr Date: Thu, 28 Jul 2016 18:32:44 +0200 Subject: [PATCH 3/6] wip --- benchmarks/build.gradle | 5 +- .../grpc/benchmarks/netty/WriteBenchmark.java | 297 ++++++++++++++++++ .../io/grpc/netty/WriteCombiningHandler.java | 117 +++++-- .../grpc/netty/WriteCombiningHandlerTest.java | 77 +++++ 4 files changed, 465 insertions(+), 31 deletions(-) create mode 100644 benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java create mode 100644 netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle index 02d035f3f09..1513a5088f7 100644 --- a/benchmarks/build.gradle +++ b/benchmarks/build.gradle @@ -24,10 +24,11 @@ run.enabled = false jmh { jmhVersion = '1.12' - warmupIterations = 10 - iterations = 10 + warmupIterations = 5 + iterations = 1 fork = 1 jvmArgs = "-server -Xms2g -Xmx2g" + include = "io.grpc.benchmarks.netty.WriteBenchmark" } dependencies { diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java new file mode 100644 index 00000000000..1723c4772e3 --- /dev/null +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java @@ -0,0 +1,297 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.benchmarks.netty; + +import static io.grpc.benchmarks.netty.WriteBenchmark.ClientHandler.NO_HANDLER; + +import io.grpc.netty.WriteCombiningHandler; +import io.netty.bootstrap.Bootstrap; +import io.netty.bootstrap.ServerBootstrap; +import io.netty.buffer.ByteBuf; +import io.netty.channel.Channel; +import io.netty.channel.ChannelDuplexHandler; +import io.netty.channel.ChannelHandler; +import io.netty.channel.ChannelHandlerAdapter; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOption; +import io.netty.channel.ChannelPromise; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.ReferenceCountUtil; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; +import org.HdrHistogram.Histogram; +import org.HdrHistogram.HistogramIterationValue; +import org.openjdk.jmh.annotations.AuxCounters; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.TearDown; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.concurrent.TimeUnit; + +/** + * foo bar. + */ +@State(Scope.Thread) +@Warmup(iterations = 1) +@Measurement(iterations = 1) +@Fork(1) +public class WriteBenchmark { + + private EventLoopGroup eventLoop; + private DiscardReadsHandler serverHandler; + private Channel serverChannel; + private Channel channel; + + private WriteTask writeTask; + private int numIterations = 100 * 1000; + + @Param + public ClientHandler clientHandler; + + @Param({"1", "2", "4", "8", "16", "32", "64", "128"}) + public int numWrites; + + @Param({"128", "256", "512", "1024", "2048", "4096", "8192"}) + public int bufferSize; + + @Setup + public void setup() throws Exception { + ChannelHandler handler = NO_HANDLER.equals(clientHandler) + ? new ChannelHandlerAdapter() { } + : new WriteCombiningHandler(); + setupChannelAndEventLoop(new WriteCombiningHandler()); + + int[] bufferSizes = new int[numWrites]; + for (int i = 0; i < bufferSizes.length; i++) { + bufferSizes[i] = bufferSize / numWrites; + } + writeTask = new ManyWritesAndFlush(channel, bufferSizes); + } + + private void setupChannelAndEventLoop(ChannelHandler channelHandler) throws Exception { + eventLoop = new NioEventLoopGroup(1); + //((NioEventLoopGroup) eventLoop).setIoRatio(100); + + serverHandler = new DiscardReadsHandler(); + final ServerBootstrap sb = new ServerBootstrap(); + sb.group(eventLoop) + .option(ChannelOption.SO_RCVBUF, 10 * 1024 * 1024) + .channel(NioServerSocketChannel.class) + .childHandler(serverHandler); + serverChannel = sb.bind(0).sync().channel(); + + Bootstrap cb = new Bootstrap(); + cb.group(eventLoop) + .option(ChannelOption.SO_SNDBUF, 10 * 1024 * 1024) + .channel(NioSocketChannel.class) + .handler(channelHandler); + + channel = cb.connect(serverChannel.localAddress()).sync().channel(); + } + + @TearDown + public void tearDown() throws InterruptedException { + channel.close().sync().await(1, TimeUnit.SECONDS); + serverChannel.close().sync().await(1, TimeUnit.SECONDS); + eventLoop.shutdownGracefully().await(1, TimeUnit.SECONDS); + } + + /** + * Foo bar. + */ + @Benchmark + public void writeAndFlush(LatencyCounters counters) throws InterruptedException { + Histogram hist = new Histogram(60000000L, 3); + writeTask.histogram(hist); + counters.histogram(hist); + for (int i = 0; i < numIterations; i++) { + eventLoop.execute(writeTask); + } + + int i = 0; + while (i < 20 && numIterations != hist.getTotalCount()) { + Thread.sleep(500); + i++; + } + + assert serverHandler.bytesDiscarded > 0; + } + + @AuxCounters + @State(Scope.Thread) + public static class LatencyCounters { + + private Histogram hist; + + void histogram(Histogram hist) { + this.hist = hist; + } + + public long pctl10_nanos() { + return hist.getValueAtPercentile(10); + } + + public long pctl30_nanos() { + return hist.getValueAtPercentile(30); + } + + public long pctl50_nanos() { + return hist.getValueAtPercentile(50); + } + + public long pctl70_nanos() { + return hist.getValueAtPercentile(70); + } + + public long pctl90_nanos() { + return hist.getValueAtPercentile(90); + } + + public long trimmedMean_nanos() { + return trimmedMean(hist); + } + } + + private static long trimmedMean(Histogram hist) { + long sum = 0; + int count = 0; + for (HistogramIterationValue value : hist.recordedValues()) { + if (value.getPercentile() > 10 && value.getPercentile() < 90) { + sum += hist.medianEquivalentValue(value.getValueIteratedTo()) + * value.getCountAtValueIteratedTo(); + count += value.getCountAtValueIteratedTo(); + } + } + double mean = sum * 1.0 / count; + return (long) mean; + } + + + private abstract static class WriteTask implements Runnable { + + final Channel channel; + Histogram hist; + + WriteTask(Channel channel) { + this.channel = channel; + } + + /** + * Needs to be the first method called. + */ + void histogram(Histogram hist) { + this.hist = hist; + } + + ChannelPromise newWriteDurationRecordingPromise(final long startNanos) { + return channel.newPromise().addListener(new GenericFutureListener>() { + @Override + public void operationComplete(Future future) throws Exception { + long durationNanos = System.nanoTime() - startNanos; + hist.recordValue(durationNanos); + } + }); + } + } + + private static final class OneWriteAndFlush extends WriteTask { + + final ByteBuf buffer; + + OneWriteAndFlush(Channel channel, int numBytes) { + super(channel); + buffer = channel.alloc().directBuffer(numBytes, numBytes).writeZero(numBytes); + } + + @Override + public void run() { + final ByteBuf b1 = buffer.retainedDuplicate(); + final long start = System.nanoTime(); + channel.write(b1, newWriteDurationRecordingPromise(start)); + channel.flush(); + } + } + + private static final class ManyWritesAndFlush extends WriteTask { + + final ByteBuf[] buffers; + + ManyWritesAndFlush(Channel channel, int[] bufferSizes) { + super(channel); + + buffers = new ByteBuf[bufferSizes.length]; + for (int i = 0; i < bufferSizes.length; i++) { + int bytes = bufferSizes[i]; + buffers[i] = channel.alloc().directBuffer(bytes, bytes).writeZero(bytes); + } + } + + @Override + public void run() { + final ByteBuf[] buffers0 = new ByteBuf[buffers.length]; + for (int i = 0; i < buffers0.length; i++) { + buffers0[i] = buffers[i].retainedDuplicate(); + } + // Start measuring + final long start = System.nanoTime(); + for (int i = 0; i < buffers0.length - 1; i++) { + channel.write(buffers0[i]); + } + channel.write(buffers0[buffers0.length - 1], newWriteDurationRecordingPromise(start)); + channel.flush(); + } + } + + static final class DiscardReadsHandler extends ChannelDuplexHandler { + + volatile long bytesDiscarded; + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + bytesDiscarded += ((ByteBuf) msg).readableBytes(); + ReferenceCountUtil.safeRelease(msg); + } + } + + public enum ClientHandler { + NO_HANDLER, WRITE_COMBINING + } +} diff --git a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java index 123163f3786..335b3f900f5 100644 --- a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java +++ b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java @@ -30,63 +30,122 @@ */ package io.grpc.netty; +import static com.google.common.base.Preconditions.checkNotNull; +import static io.netty.util.ReferenceCountUtil.safeRelease; + import io.netty.buffer.ByteBuf; -import io.netty.buffer.ByteBufAllocator; +import io.netty.buffer.EmptyByteBuf; +import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; -import io.netty.util.ReferenceCountUtil; +import io.netty.channel.DefaultChannelPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.FutureListener; + +import java.util.ArrayList; +import java.util.List; public class WriteCombiningHandler extends ChannelOutboundHandlerAdapter { - private static final int BUFFER_SIZE = 4096; + private static final int INITIAL_BUFFER_SIZE = 4096; + private static final int MAX_COPY_BUFFER_SIZE = 256; + + private static final ByteBuf EMPTY_BUFFER = new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT); + + private ByteBuf buffer = EMPTY_BUFFER; + private CollectivePromise bufferPromise; - private ByteBuf buffer; - private ByteBufAllocator allocator; + private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - allocator = ctx.channel().alloc(); + assert buffer == EMPTY_BUFFER; + this.ctx = ctx; } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { - if (buffer != null) { - ReferenceCountUtil.safeRelease(buffer); - buffer = null; - } + safeRelease(buffer); + buffer = EMPTY_BUFFER; + this.ctx = null; } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { - ByteBuf data = (ByteBuf) msg; - if (buffer == null) { - buffer = allocator.directBuffer(BUFFER_SIZE, BUFFER_SIZE); + if (!(msg instanceof ByteBuf)) { + writeBuffer(); + ctx.write(msg, promise); + return; } - - if (buffer.writableBytes() >= data.readableBytes()) { - buffer.writeBytes(data); - promise.setSuccess(); - ReferenceCountUtil.safeRelease(data); + final ByteBuf data = (ByteBuf) msg; + if (data.readableBytes() > MAX_COPY_BUFFER_SIZE) { + writeBuffer(); + ctx.write(data, promise); + } else if (data.readableBytes() > buffer.writableBytes()) { + writeBuffer(); + newBufferAndPromise(); + copyBytes(data, promise); } else { - ctx.write(buffer); - buffer = null; - if (data.readableBytes() < BUFFER_SIZE) { - write(ctx, msg, promise); - } else { - ctx.write(msg, promise); - } + copyBytes(data, promise); } } @Override public void flush(ChannelHandlerContext ctx) throws Exception { - if (buffer != null && buffer.readableBytes() > 0) { - ctx.write(buffer, ctx.voidPromise()); - buffer = null; - } + writeBuffer(); + newBufferAndPromise(); ctx.flush(); } + private void writeBuffer() { + if (!buffer.isReadable()) { + return; + } + ctx.write(buffer.retainedSlice(), bufferPromise); + } + private void newBufferAndPromise() { + if (buffer.writableBytes() > MAX_COPY_BUFFER_SIZE * 2) { + buffer = buffer.slice(buffer.writerIndex(), buffer.writableBytes()); + } else { + safeRelease(buffer); + buffer = ctx.alloc().directBuffer(INITIAL_BUFFER_SIZE, INITIAL_BUFFER_SIZE); + } + bufferPromise = new CollectivePromise(ctx); + } + + private void copyBytes(ByteBuf buf, ChannelPromise promise) { + buffer.writeBytes(buf); + bufferPromise.add(promise); + safeRelease(buf); + } + + private static final class CollectivePromise extends DefaultChannelPromise + implements FutureListener { + + private final List promises = new ArrayList(); + + CollectivePromise(ChannelHandlerContext ctx) { + super(ctx.channel(), ctx.executor()); + } + + void add(ChannelPromise promise) { + checkNotNull(promise); + promises.add(promise); + } + + @Override + public void operationComplete(Future future) throws Exception { + final boolean isSuccess = future.isSuccess(); + for (int i = 0; i < promises.size(); i++) { + ChannelPromise promise = promises.get(i); + if (isSuccess) { + promise.trySuccess(); + } else { + promise.tryFailure(future.cause()); + } + } + } + } } diff --git a/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java b/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java new file mode 100644 index 00000000000..e6adf4be642 --- /dev/null +++ b/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java @@ -0,0 +1,77 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ +package io.grpc.netty; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelPromise; +import io.netty.channel.embedded.EmbeddedChannel; +import org.junit.Before; +import org.junit.Test; + +import java.net.InetSocketAddress; + +/** + * Tests for {@link WriteCombiningHandler} + */ +public class WriteCombiningHandlerTest { + + private EmbeddedChannel channel; + + @Before + public void setup() { + channel = new EmbeddedChannel(); + channel.connect(new InetSocketAddress(0)); + channel.pipeline().addLast(new WriteCombiningHandler()); + } + + @Test + public void basicFunctioning() { + ChannelPromise p1 = channel.newPromise(); + ChannelPromise p2 = channel.newPromise(); + channel.write(buf(10), p1); + channel.write(buf(10), p2); + channel.flush(); + + ByteBuf combined = channel.readInbound(); + assertEquals(20, combined.readableBytes()); + assertTrue(p1.isSuccess()); + assertTrue(p2.isSuccess()); + } + + ByteBuf buf(int size) { + return channel.alloc().directBuffer(size, size).writeZero(size); + } + + +} From e3b87a9b0315d2d13f3be130adddfc1aff7013eb Mon Sep 17 00:00:00 2001 From: buchgr Date: Fri, 29 Jul 2016 22:57:46 +0200 Subject: [PATCH 4/6] some fixes --- .../io/grpc/netty/WriteCombiningHandler.java | 16 ++-- .../grpc/netty/WriteCombiningHandlerTest.java | 74 ++++++++++++++++--- 2 files changed, 72 insertions(+), 18 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java index 335b3f900f5..b473f2d5d2a 100644 --- a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java +++ b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java @@ -51,23 +51,23 @@ public class WriteCombiningHandler extends ChannelOutboundHandlerAdapter { private static final int INITIAL_BUFFER_SIZE = 4096; private static final int MAX_COPY_BUFFER_SIZE = 256; - private static final ByteBuf EMPTY_BUFFER = new EmptyByteBuf(UnpooledByteBufAllocator.DEFAULT); - - private ByteBuf buffer = EMPTY_BUFFER; + private ByteBuf buffer; private CollectivePromise bufferPromise; private ChannelHandlerContext ctx; @Override public void handlerAdded(ChannelHandlerContext ctx) throws Exception { - assert buffer == EMPTY_BUFFER; + buffer = ctx.alloc().directBuffer(INITIAL_BUFFER_SIZE, INITIAL_BUFFER_SIZE); + bufferPromise = new CollectivePromise(ctx); this.ctx = ctx; } @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { safeRelease(buffer); - buffer = EMPTY_BUFFER; + buffer = null; + bufferPromise = null; this.ctx = null; } @@ -84,7 +84,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) ctx.write(data, promise); } else if (data.readableBytes() > buffer.writableBytes()) { writeBuffer(); - newBufferAndPromise(); copyBytes(data, promise); } else { copyBytes(data, promise); @@ -94,7 +93,6 @@ public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) @Override public void flush(ChannelHandlerContext ctx) throws Exception { writeBuffer(); - newBufferAndPromise(); ctx.flush(); } @@ -103,13 +101,16 @@ private void writeBuffer() { return; } ctx.write(buffer.retainedSlice(), bufferPromise); + newBufferAndPromise(); } private void newBufferAndPromise() { if (buffer.writableBytes() > MAX_COPY_BUFFER_SIZE * 2) { buffer = buffer.slice(buffer.writerIndex(), buffer.writableBytes()); + buffer.writerIndex(0); } else { safeRelease(buffer); + assert buffer.refCnt() == 0; buffer = ctx.alloc().directBuffer(INITIAL_BUFFER_SIZE, INITIAL_BUFFER_SIZE); } bufferPromise = new CollectivePromise(ctx); @@ -128,6 +129,7 @@ private static final class CollectivePromise extends DefaultChannelPromise CollectivePromise(ChannelHandlerContext ctx) { super(ctx.channel(), ctx.executor()); + addListener(this); } void add(ChannelPromise promise) { diff --git a/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java b/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java index e6adf4be642..94cd889fc60 100644 --- a/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java @@ -30,9 +30,6 @@ */ package io.grpc.netty; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelPromise; import io.netty.channel.embedded.EmbeddedChannel; @@ -41,6 +38,8 @@ import java.net.InetSocketAddress; +import static org.junit.Assert.*; + /** * Tests for {@link WriteCombiningHandler} */ @@ -49,29 +48,82 @@ public class WriteCombiningHandlerTest { private EmbeddedChannel channel; @Before - public void setup() { + public void setup() throws InterruptedException { channel = new EmbeddedChannel(); - channel.connect(new InetSocketAddress(0)); + channel.connect(new InetSocketAddress(0)).sync(); channel.pipeline().addLast(new WriteCombiningHandler()); } @Test - public void basicFunctioning() { + public void basicWriteCombining() { + ChannelPromise p1 = channel.newPromise(); + ChannelPromise p2 = channel.newPromise(); + ChannelPromise p3 = channel.newPromise(); + ChannelPromise p4 = channel.newPromise(); + + channel.write(buf(50), p1); + channel.write(buf(5), p2); + channel.write(buf(95), p3); + assertNull(channel.readOutbound()); + channel.flush(); + channel.write(buf(10), p4); + + ByteBuf combined = channel.readOutbound(); + assertNull(channel.readOutbound()); + assertEquals(150, combined.readableBytes()); + assertTrue(p1.isSuccess()); + assertTrue(p2.isSuccess()); + assertTrue(p3.isSuccess()); + assertFalse(p4.isSuccess()); + + channel.flush(); + combined = channel.readOutbound(); + assertEquals(10, combined.readableBytes()); + assertTrue(p4.isSuccess()); + } + + @Test + public void largeWritesShouldNotBeCombined() { ChannelPromise p1 = channel.newPromise(); ChannelPromise p2 = channel.newPromise(); + channel.write(buf(1024), p1); + channel.write(buf(2048), p2); + channel.flush(); + ByteBuf b = channel.readOutbound(); + assertEquals(1024, b.readableBytes()); + assertTrue(p1.isSuccess()); + b = channel.readOutbound(); + assertEquals(2048, b.readableBytes()); + assertTrue(p2.isSuccess()); + } + + @Test + public void mixingLargeAndCombinedWritesShouldWork() { + ChannelPromise p1 = channel.newPromise(); + ChannelPromise p2 = channel.newPromise(); + ChannelPromise p3 = channel.newPromise(); + ChannelPromise p4 = channel.newPromise(); + channel.write(buf(10), p1); - channel.write(buf(10), p2); + channel.write(buf(100), p2); + channel.write(buf(1024), p3); + channel.write(buf(10), p4); channel.flush(); - ByteBuf combined = channel.readInbound(); - assertEquals(20, combined.readableBytes()); + ByteBuf b = channel.readOutbound(); + assertEquals(110, b.readableBytes()); + b = channel.readOutbound(); + assertEquals(1024, b.readableBytes()); + b = channel.readOutbound(); + assertEquals(10, b.readableBytes()); + assertTrue(p1.isSuccess()); assertTrue(p2.isSuccess()); + assertTrue(p3.isSuccess()); + assertTrue(p4.isSuccess()); } ByteBuf buf(int size) { return channel.alloc().directBuffer(size, size).writeZero(size); } - - } From 2d0b374ab5731edcd4ae768466834fa557b11710 Mon Sep 17 00:00:00 2001 From: buchgr Date: Sat, 30 Jul 2016 19:26:08 +0200 Subject: [PATCH 5/6] fixes --- .../grpc/benchmarks/netty/WriteBenchmark.java | 33 +++++++++++-------- 1 file changed, 19 insertions(+), 14 deletions(-) diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java index 1723c4772e3..41adf64fdf6 100644 --- a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java +++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java @@ -55,8 +55,11 @@ import org.HdrHistogram.HistogramIterationValue; import org.openjdk.jmh.annotations.AuxCounters; import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; import org.openjdk.jmh.annotations.Setup; @@ -71,7 +74,8 @@ */ @State(Scope.Thread) @Warmup(iterations = 1) -@Measurement(iterations = 1) +@Measurement(iterations = 1, time = 1) +@BenchmarkMode(Mode.Throughput) @Fork(1) public class WriteBenchmark { @@ -81,7 +85,7 @@ public class WriteBenchmark { private Channel channel; private WriteTask writeTask; - private int numIterations = 100 * 1000; + private final int numIterations = 10 * 1000; @Param public ClientHandler clientHandler; @@ -97,7 +101,7 @@ public void setup() throws Exception { ChannelHandler handler = NO_HANDLER.equals(clientHandler) ? new ChannelHandlerAdapter() { } : new WriteCombiningHandler(); - setupChannelAndEventLoop(new WriteCombiningHandler()); + setupChannelAndEventLoop(handler); int[] bufferSizes = new int[numWrites]; for (int i = 0; i < bufferSizes.length; i++) { @@ -139,17 +143,13 @@ public void tearDown() throws InterruptedException { */ @Benchmark public void writeAndFlush(LatencyCounters counters) throws InterruptedException { - Histogram hist = new Histogram(60000000L, 3); - writeTask.histogram(hist); - counters.histogram(hist); + writeTask.histogram(counters.histogram()); for (int i = 0; i < numIterations; i++) { eventLoop.execute(writeTask); } - int i = 0; - while (i < 20 && numIterations != hist.getTotalCount()) { - Thread.sleep(500); - i++; + while (numIterations != counters.histogram().getTotalCount()) { + Thread.sleep(1000); } assert serverHandler.bytesDiscarded > 0; @@ -159,10 +159,15 @@ public void writeAndFlush(LatencyCounters counters) throws InterruptedException @State(Scope.Thread) public static class LatencyCounters { - private Histogram hist; + private Histogram hist = new Histogram(60000000L, 3); - void histogram(Histogram hist) { - this.hist = hist; + Histogram histogram() { + return hist; + } + + @Setup(Level.Iteration) + public void clean() { + hist.reset(); } public long pctl10_nanos() { @@ -226,7 +231,7 @@ ChannelPromise newWriteDurationRecordingPromise(final long startNanos) { @Override public void operationComplete(Future future) throws Exception { long durationNanos = System.nanoTime() - startNanos; - hist.recordValue(durationNanos); + hist.recordValue(100); } }); } From 08c67f80d6ff33e350e02891a5fc9f067cf192bd Mon Sep 17 00:00:00 2001 From: buchgr Date: Tue, 2 Aug 2016 10:35:05 +0300 Subject: [PATCH 6/6] some fixes --- .../java/io/grpc/netty/WriteCombiningHandler.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) diff --git a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java index b473f2d5d2a..e188a164e8f 100644 --- a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java +++ b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java @@ -34,8 +34,6 @@ import static io.netty.util.ReferenceCountUtil.safeRelease; import io.netty.buffer.ByteBuf; -import io.netty.buffer.EmptyByteBuf; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelOutboundHandlerAdapter; import io.netty.channel.ChannelPromise; @@ -46,6 +44,9 @@ import java.util.ArrayList; import java.util.List; +/** + * A handler that combines small writes into a larger {@link ByteBuf} + */ public class WriteCombiningHandler extends ChannelOutboundHandlerAdapter { private static final int INITIAL_BUFFER_SIZE = 4096; @@ -65,12 +66,19 @@ public void handlerAdded(ChannelHandlerContext ctx) throws Exception { @Override public void handlerRemoved(ChannelHandlerContext ctx) throws Exception { + writeBuffer(); safeRelease(buffer); buffer = null; bufferPromise = null; this.ctx = null; } + @Override + public void close(ChannelHandlerContext ctx, ChannelPromise promise) { + writeBuffer(); + ctx.close(promise); + } + @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) { if (!(msg instanceof ByteBuf)) { @@ -97,7 +105,7 @@ public void flush(ChannelHandlerContext ctx) throws Exception { } private void writeBuffer() { - if (!buffer.isReadable()) { + if (buffer == null || !buffer.isReadable()) { return; } ctx.write(buffer.retainedSlice(), bufferPromise);