diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle
index 02d035f3f09..1513a5088f7 100644
--- a/benchmarks/build.gradle
+++ b/benchmarks/build.gradle
@@ -24,10 +24,11 @@ run.enabled = false
 
 jmh {
     jmhVersion = '1.12'
-    warmupIterations = 10
-    iterations = 10
+    warmupIterations = 5
+    iterations = 1
     fork = 1
     jvmArgs = "-server -Xms2g -Xmx2g"
+    include = "io.grpc.benchmarks.netty.WriteBenchmark"
 }
 
 dependencies {
diff --git a/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java
new file mode 100644
index 00000000000..41adf64fdf6
--- /dev/null
+++ b/benchmarks/src/jmh/java/io/grpc/benchmarks/netty/WriteBenchmark.java
@@ -0,0 +1,316 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.benchmarks.netty;
+
+import static io.grpc.benchmarks.netty.WriteBenchmark.ClientHandler.NO_HANDLER;
+
+import io.grpc.netty.WriteCombiningHandler;
+import io.netty.bootstrap.Bootstrap;
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelHandlerAdapter;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.ReferenceCountUtil;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramIterationValue;
+import org.openjdk.jmh.annotations.AuxCounters;
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Fork;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Measurement;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Scope;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
+import org.openjdk.jmh.annotations.Warmup;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Benchmarks the latency of writing and flushing buffers on a Netty channel, with and without a
+ * {@link WriteCombiningHandler} installed in the client pipeline.
+ */
+@State(Scope.Thread)
+@Warmup(iterations = 1)
+@Measurement(iterations = 1, time = 1)
+@BenchmarkMode(Mode.Throughput)
+@Fork(1)
+public class WriteBenchmark {
+
+  private EventLoopGroup eventLoop;
+  private DiscardReadsHandler serverHandler;
+  private Channel serverChannel;
+  private Channel channel;
+
+  private WriteTask writeTask;
+  private final int numIterations = 10 * 1000;
+
+  @Param
+  public ClientHandler clientHandler;
+
+  @Param({"1", "2", "4", "8", "16", "32", "64", "128"})
+  public int numWrites;
+
+  @Param({"128", "256", "512", "1024", "2048", "4096", "8192"})
+  public int bufferSize;
+
+  @Setup
+  public void setup() throws Exception {
+    ChannelHandler handler = NO_HANDLER.equals(clientHandler)
+        ? new ChannelHandlerAdapter() { }
+        : new WriteCombiningHandler();
+    setupChannelAndEventLoop(handler);
+
+    int[] bufferSizes = new int[numWrites];
+    for (int i = 0; i < bufferSizes.length; i++) {
+      bufferSizes[i] = bufferSize / numWrites;
+    }
+    writeTask = new ManyWritesAndFlush(channel, bufferSizes);
+  }
+
+  private void setupChannelAndEventLoop(ChannelHandler channelHandler) throws Exception {
+    eventLoop = new NioEventLoopGroup(1);
+    //((NioEventLoopGroup) eventLoop).setIoRatio(100);
+
+    serverHandler = new DiscardReadsHandler();
+    final ServerBootstrap sb = new ServerBootstrap();
+    sb.group(eventLoop)
+        .option(ChannelOption.SO_RCVBUF, 10 * 1024 * 1024)
+        .channel(NioServerSocketChannel.class)
+        .childHandler(serverHandler);
+    serverChannel = sb.bind(0).sync().channel();
+
+    Bootstrap cb = new Bootstrap();
+    cb.group(eventLoop)
+        .option(ChannelOption.SO_SNDBUF, 10 * 1024 * 1024)
+        .channel(NioSocketChannel.class)
+        .handler(channelHandler);
+
+    channel = cb.connect(serverChannel.localAddress()).sync().channel();
+  }
+
+  @TearDown
+  public void tearDown() throws InterruptedException {
+    // Bound every shutdown step: a preceding sync() would block without any timeout.
+    channel.close().await(1, TimeUnit.SECONDS);
+    serverChannel.close().await(1, TimeUnit.SECONDS);
+    eventLoop.shutdownGracefully().await(1, TimeUnit.SECONDS);
+  }
+
+  /**
+   * Schedules {@code numIterations} write tasks on the event loop and waits until every one of
+   * them has recorded its write duration in the per-iteration histogram.
+   */
+  @Benchmark
+  public void writeAndFlush(LatencyCounters counters) throws InterruptedException {
+    final Histogram hist = counters.histogram();
+    // The histogram is only reset once per iteration, so wait for a delta rather than an
+    // absolute count; otherwise a second invocation in the same iteration never terminates.
+    final long expectedCount = hist.getTotalCount() + numIterations;
+    writeTask.histogram(hist);
+    for (int i = 0; i < numIterations; i++) {
+      eventLoop.execute(writeTask);
+    }
+
+    while (hist.getTotalCount() < expectedCount) {
+      Thread.sleep(1000);
+    }
+
+    assert serverHandler.bytesDiscarded > 0;
+  }
+
+  @AuxCounters
+  @State(Scope.Thread)
+  public static class LatencyCounters {
+
+    private Histogram hist = new Histogram(60000000L, 3);
+
+    Histogram histogram() {
+      return hist;
+    }
+
+    @Setup(Level.Iteration)
+    public void clean() {
+      hist.reset();
+    }
+
+    public long pctl10_nanos() {
+      return hist.getValueAtPercentile(10);
+    }
+
+    public long pctl30_nanos() {
+      return hist.getValueAtPercentile(30);
+    }
+
+    public long pctl50_nanos() {
+      return hist.getValueAtPercentile(50);
+    }
+
+    public long pctl70_nanos() {
+      return hist.getValueAtPercentile(70);
+    }
+
+    public long pctl90_nanos() {
+      return hist.getValueAtPercentile(90);
+    }
+
+    public long trimmedMean_nanos() {
+      return trimmedMean(hist);
+    }
+  }
+
+  /**
+   * Returns the mean of the recorded values strictly between the 10th and 90th percentile.
+   */
+  private static long trimmedMean(Histogram hist) {
+    long sum = 0;
+    long count = 0;
+    for (HistogramIterationValue value : hist.recordedValues()) {
+      if (value.getPercentile() > 10 && value.getPercentile() < 90) {
+        sum += hist.medianEquivalentValue(value.getValueIteratedTo())
+            * value.getCountAtValueIteratedTo();
+        count += value.getCountAtValueIteratedTo();
+      }
+    }
+    double mean = sum * 1.0 / count;
+    return (long) mean;
+  }
+
+
+  private abstract static class WriteTask implements Runnable {
+
+    final Channel channel;
+    Histogram hist;
+
+    WriteTask(Channel channel) {
+      this.channel = channel;
+    }
+
+    /**
+     * Needs to be the first method called.
+     */
+    void histogram(Histogram hist) {
+      this.hist = hist;
+    }
+
+    /**
+     * Creates a promise that records the time elapsed since {@code startNanos} on completion.
+     */
+    ChannelPromise newWriteDurationRecordingPromise(final long startNanos) {
+      return channel.newPromise().addListener(new GenericFutureListener<Future<Void>>() {
+        @Override
+        public void operationComplete(Future<Void> future) throws Exception {
+          long durationNanos = System.nanoTime() - startNanos;
+          // Clamp so that an outlier cannot throw and leave the benchmark waiting forever.
+          hist.recordValue(Math.min(durationNanos, hist.getHighestTrackableValue()));
+        }
+      });
+    }
+  }
+
+  private static final class OneWriteAndFlush extends WriteTask {
+
+    final ByteBuf buffer;
+
+    OneWriteAndFlush(Channel channel, int numBytes) {
+      super(channel);
+      buffer = channel.alloc().directBuffer(numBytes, numBytes).writeZero(numBytes);
+    }
+
+    @Override
+    public void run() {
+      final ByteBuf b1 = buffer.retainedDuplicate();
+      final long start = System.nanoTime();
+      channel.write(b1, newWriteDurationRecordingPromise(start));
+      channel.flush();
+    }
+  }
+
+  private static final class ManyWritesAndFlush extends WriteTask {
+
+    final ByteBuf[] buffers;
+
+    ManyWritesAndFlush(Channel channel, int[] bufferSizes) {
+      super(channel);
+
+      buffers = new ByteBuf[bufferSizes.length];
+      for (int i = 0; i < bufferSizes.length; i++) {
+        int bytes = bufferSizes[i];
+        buffers[i] = channel.alloc().directBuffer(bytes, bytes).writeZero(bytes);
+      }
+    }
+
+    @Override
+    public void run() {
+      final ByteBuf[] buffers0 = new ByteBuf[buffers.length];
+      for (int i = 0; i < buffers0.length; i++) {
+        buffers0[i] = buffers[i].retainedDuplicate();
+      }
+      // Start measuring
+      final long start = System.nanoTime();
+      for (int i = 0; i < buffers0.length - 1; i++) {
+        channel.write(buffers0[i]);
+      }
+      channel.write(buffers0[buffers0.length - 1], newWriteDurationRecordingPromise(start));
+      channel.flush();
+    }
+  }
+
+  static final class DiscardReadsHandler extends ChannelDuplexHandler {
+
+    volatile long bytesDiscarded;
+
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+      bytesDiscarded += ((ByteBuf) msg).readableBytes();
+      ReferenceCountUtil.safeRelease(msg);
+    }
+  }
+
+  public enum ClientHandler {
+    NO_HANDLER, WRITE_COMBINING
+  }
+}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index fdda262ab8d..83d4fdf7cfd 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -192,6 +192,12 @@ public void onStreamRemoved(Http2Stream stream) {
     });
   }
 
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, new WriteCombiningHandler());
+    super.handlerAdded(ctx);
+  }
+
   /**
    * Handler for commands sent from the stream.
    */
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index d519fc1e85e..3d29462dafd 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -167,6 +167,7 @@ Throwable connectionError() {
   @Override
   public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
     serverWriteQueue = new WriteQueue(ctx.channel());
+    ctx.pipeline().addBefore(ctx.executor(), ctx.name(), null, new WriteCombiningHandler());
     super.handlerAdded(ctx);
   }
 
diff --git a/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java
new file mode 100644
index 00000000000..e188a164e8f
--- /dev/null
+++ b/netty/src/main/java/io/grpc/netty/WriteCombiningHandler.java
@@ -0,0 +1,171 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package io.grpc.netty;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+import static io.netty.util.ReferenceCountUtil.safeRelease;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelOutboundHandlerAdapter;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.DefaultChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.FutureListener;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A handler that combines small writes into a larger {@link ByteBuf}.
+ *
+ * <p>Buffers of at most {@code MAX_COPY_BUFFER_SIZE} readable bytes are copied into a shared
+ * buffer that is passed downstream on flush, close, removal, or when it cannot fit the next
+ * write. All other messages are forwarded unchanged, after any pending combined data.
+ */
+public class WriteCombiningHandler extends ChannelOutboundHandlerAdapter {
+
+  private static final int INITIAL_BUFFER_SIZE = 4096;
+  private static final int MAX_COPY_BUFFER_SIZE = 256;
+
+  private ByteBuf buffer;
+  private CollectivePromise bufferPromise;
+
+  private ChannelHandlerContext ctx;
+
+  @Override
+  public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
+    buffer = ctx.alloc().directBuffer(INITIAL_BUFFER_SIZE, INITIAL_BUFFER_SIZE);
+    bufferPromise = new CollectivePromise(ctx);
+    this.ctx = ctx;
+  }
+
+  @Override
+  public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
+    writeBuffer();
+    safeRelease(buffer);
+    buffer = null;
+    bufferPromise = null;
+    this.ctx = null;
+  }
+
+  @Override
+  public void close(ChannelHandlerContext ctx, ChannelPromise promise) {
+    writeBuffer();
+    ctx.close(promise);
+  }
+
+  @Override
+  public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
+    // Empty buffers are forwarded as-is: copying them would leave their promise attached to an
+    // unreadable combining buffer that writeBuffer() never sends, so it would never complete.
+    if (!(msg instanceof ByteBuf) || !((ByteBuf) msg).isReadable()) {
+      writeBuffer();
+      ctx.write(msg, promise);
+      return;
+    }
+    final ByteBuf data = (ByteBuf) msg;
+    if (data.readableBytes() > MAX_COPY_BUFFER_SIZE) {
+      writeBuffer();
+      ctx.write(data, promise);
+    } else if (data.readableBytes() > buffer.writableBytes()) {
+      writeBuffer();
+      copyBytes(data, promise);
+    } else {
+      copyBytes(data, promise);
+    }
+  }
+
+  @Override
+  public void flush(ChannelHandlerContext ctx) throws Exception {
+    writeBuffer();
+    ctx.flush();
+  }
+
+  /** Passes the combined bytes downstream, if any, and starts a fresh buffer and promise. */
+  private void writeBuffer() {
+    if (buffer == null || !buffer.isReadable()) {
+      return;
+    }
+    ctx.write(buffer.retainedSlice(), bufferPromise);
+    newBufferAndPromise();
+  }
+
+  /** Reuses the unwritten tail of the buffer if it is large enough, else allocates anew. */
+  private void newBufferAndPromise() {
+    if (buffer.writableBytes() > MAX_COPY_BUFFER_SIZE * 2) {
+      buffer = buffer.slice(buffer.writerIndex(), buffer.writableBytes());
+      buffer.writerIndex(0);
+    } else {
+      safeRelease(buffer);
+      // The slices handed downstream may still hold references, so refCnt() can be non-zero here.
+      buffer = ctx.alloc().directBuffer(INITIAL_BUFFER_SIZE, INITIAL_BUFFER_SIZE);
+    }
+    bufferPromise = new CollectivePromise(ctx);
+  }
+
+  /** Appends {@code buf} to the combining buffer, takes over its promise and releases it. */
+  private void copyBytes(ByteBuf buf, ChannelPromise promise) {
+    buffer.writeBytes(buf);
+    bufferPromise.add(promise);
+    safeRelease(buf);
+  }
+
+  /** A promise that propagates its own outcome to every promise added to it. */
+  private static final class CollectivePromise extends DefaultChannelPromise
+      implements FutureListener<Void> {
+
+    private final List<ChannelPromise> promises = new ArrayList<ChannelPromise>();
+
+    CollectivePromise(ChannelHandlerContext ctx) {
+      super(ctx.channel(), ctx.executor());
+      addListener(this);
+    }
+
+    void add(ChannelPromise promise) {
+      checkNotNull(promise);
+      promises.add(promise);
+    }
+
+    @Override
+    public void operationComplete(Future<Void> future) throws Exception {
+      final boolean isSuccess = future.isSuccess();
+      for (int i = 0; i < promises.size(); i++) {
+        ChannelPromise promise = promises.get(i);
+        if (isSuccess) {
+          promise.trySuccess();
+        } else {
+          promise.tryFailure(future.cause());
+        }
+      }
+    }
+  }
+}
diff --git a/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java b/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java
new file mode 100644
index 00000000000..94cd889fc60
--- /dev/null
+++ b/netty/src/test/java/io/grpc/netty/WriteCombiningHandlerTest.java
@@ -0,0 +1,140 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ *    * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ *    * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ *    * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+package io.grpc.netty;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelPromise;
+import io.netty.channel.embedded.EmbeddedChannel;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.net.InetSocketAddress;
+
+/**
+ * Tests for {@link WriteCombiningHandler}.
+ */
+public class WriteCombiningHandlerTest {
+
+  private EmbeddedChannel channel;
+
+  @Before
+  public void setup() throws InterruptedException {
+    channel = new EmbeddedChannel();
+    channel.connect(new InetSocketAddress(0)).sync();
+    channel.pipeline().addLast(new WriteCombiningHandler());
+  }
+
+  @Test
+  public void basicWriteCombining() {
+    ChannelPromise p1 = channel.newPromise();
+    ChannelPromise p2 = channel.newPromise();
+    ChannelPromise p3 = channel.newPromise();
+    ChannelPromise p4 = channel.newPromise();
+
+    channel.write(buf(50), p1);
+    channel.write(buf(5), p2);
+    channel.write(buf(95), p3);
+    assertNull(channel.readOutbound());
+    channel.flush();
+    channel.write(buf(10), p4);
+
+    ByteBuf combined = channel.readOutbound();
+    assertNull(channel.readOutbound());
+    assertEquals(150, combined.readableBytes());
+    assertTrue(p1.isSuccess());
+    assertTrue(p2.isSuccess());
+    assertTrue(p3.isSuccess());
+    assertFalse(p4.isSuccess());
+
+    channel.flush();
+    combined = channel.readOutbound();
+    assertEquals(10, combined.readableBytes());
+    assertTrue(p4.isSuccess());
+  }
+
+  @Test
+  public void largeWritesShouldNotBeCombined() {
+    ChannelPromise p1 = channel.newPromise();
+    ChannelPromise p2 = channel.newPromise();
+    channel.write(buf(1024), p1);
+    channel.write(buf(2048), p2);
+    channel.flush();
+    ByteBuf b = channel.readOutbound();
+    assertEquals(1024, b.readableBytes());
+    assertTrue(p1.isSuccess());
+    b = channel.readOutbound();
+    assertEquals(2048, b.readableBytes());
+    assertTrue(p2.isSuccess());
+  }
+
+  @Test
+  public void mixingLargeAndCombinedWritesShouldWork() {
+    ChannelPromise p1 = channel.newPromise();
+    ChannelPromise p2 = channel.newPromise();
+    ChannelPromise p3 = channel.newPromise();
+    ChannelPromise p4 = channel.newPromise();
+
+    channel.write(buf(10), p1);
+    channel.write(buf(100), p2);
+    channel.write(buf(1024), p3);
+    channel.write(buf(10), p4);
+    channel.flush();
+
+    ByteBuf b = channel.readOutbound();
+    assertEquals(110, b.readableBytes());
+    b = channel.readOutbound();
+    assertEquals(1024, b.readableBytes());
+    b = channel.readOutbound();
+    assertEquals(10, b.readableBytes());
+
+    assertTrue(p1.isSuccess());
+    assertTrue(p2.isSuccess());
+    assertTrue(p3.isSuccess());
+    assertTrue(p4.isSuccess());
+  }
+
+  @Test
+  public void emptyWriteShouldCompleteOnFlush() {
+    ChannelPromise p1 = channel.newPromise();
+    channel.write(buf(0), p1);
+    channel.flush();
+    assertTrue(p1.isSuccess());
+  }
+
+  ByteBuf buf(int size) {
+    return channel.alloc().directBuffer(size, size).writeZero(size);
+  }
+}