core,netty: retain buffer in MessageFramer to reuse for subsequent messages #2092
carl-mastrangelo wants to merge 3 commits into grpc:master from
Conversation
When a buffer is allocated in the MessageFramer, the capacity is at least 4K in size, which leaves a lot of space wasted on small messages. This change alters the WritableBuffers to attempt to split themselves into a "remaining" buffer that reuses space. This should only have a serious impact on streaming, and only when messages are small. Benchmarks for FlowControlled messages per second are promising, showing at least a 10% improvement.

Results (all rows are `i.g.benchmarks.netty.FlowControlledMessagesPerSecondBenchmark.stream:messagesPerSecond`, responseSize SMALL, thrpt mode, Cnt 10; scores in ops/s):

| channelCount | clientExecutor | maxConcurrentStreams | Before | After | Speedup |
|---|---|---|---|---|---|
| 1 | DEFAULT | 1 | 755282.751 ±29529.715 | 905364.204 ±29097.292 | 1.198708964 |
| 1 | DEFAULT | 2 | 791301.174 ±20232.22 | 994319.597 ±40626.872 | 1.256562772 |
| 1 | DEFAULT | 10 | 658210.954 ±21180.062 | 857288.67 ±34114.984 | 1.302452754 |
| 1 | DEFAULT | 100 | 657083.367 ±19745.56 | 944762.512 ±19204.798 | 1.437812246 |
| 1 | DIRECT | 1 | 887901.239 ±19285.937 | 1343311.784 ±31011.844 | 1.512906757 |
| 1 | DIRECT | 2 | 970308.851 ±27653.139 | 1307188.816 ±35358.558 | 1.347188387 |
| 1 | DIRECT | 10 | 735228.889 ±24770.534 | 1199052.762 ±33300.275 | 1.630856431 |
| 1 | DIRECT | 100 | 716183.74 ±12900.773 | 1127230.832 ±33133.574 | 1.573940833 |
| 2 | DEFAULT | 1 | 1291716.222 ±31902.31 | 1573493.799 ±39530.656 | 1.218142013 |
| 2 | DEFAULT | 2 | 1268950.933 ±42048.729 | 1536156.083 ±34455.726 | 1.210571696 |
| 2 | DEFAULT | 10 | 1128048.116 ±26606.668 | 1356561.752 ±23734.552 | 1.20257437 |
| 2 | DEFAULT | 100 | 988027.512 ±23084.526 | 1347382.882 ±30614.784 | 1.363709882 |
| 2 | DIRECT | 1 | 1478108.147 ±65561.972 | 2163858.787 ±79516.791 | 1.463938069 |
| 2 | DIRECT | 2 | 1524911.028 ±61438.415 | 2176216.821 ±73718.403 | 1.427110685 |
| 2 | DIRECT | 10 | 1313905.422 ±36649.59 | 2055314.846 ±73080.768 | 1.564279142 |
| 2 | DIRECT | 100 | 1117786.572 ±35215.016 | 1865375.865 ±49877.609 | 1.668812197 |
| 4 | DEFAULT | 1 | 1878378.751 ±26112.523 | 1928202.574 ±43622.205 | 1.026524908 |
| 4 | DEFAULT | 2 | 1982666.038 ±24621.442 | 1941969.538 ±40390.353 | 0.9794738503 |
| 4 | DEFAULT | 10 | 1707623.426 ±20835.524 | 1725687.175 ±18794.404 | 1.010578298 |
| 4 | DEFAULT | 100 | ≈0 | 1490847.182 ±43668.394 | n/a |
| 4 | DIRECT | 1 | 2375070.989 ±53326.398 | 3600611.491 ±68078.043 | 1.51600163 |
| 4 | DIRECT | 2 | 2462611.986 ±49230.177 | 3370385.331 ±73550.54 | 1.368622158 |
| 4 | DIRECT | 10 | 2108888.769 ±37617.889 | 3179585.616 ±75017.316 | 1.507706648 |
| 4 | DIRECT | 100 | ≈0 | 2292144.755 ±69949.443 | n/a |
@carl-mastrangelo, we can't merge this right now, right? Because otherwise we could leak buffers?
I'm pretty sure we already leak buffers if close/flush is never called.
It is worth noting that reducing the minimum size of the buffer in NettyWritableBufferAllocator to 32 produces a similar effect. It looks like it may be slightly faster for the Flow Control benchmark.
Which is not currently possible, except for unary requests that lack a halfClose. It's hard to imagine a unary request lacking a halfClose that isn't a blatant bug in the application, and one that wouldn't be noticed when the unary request is never sent. That leakage is impossible when using our stubs. With this PR, the only way we don't leak is if halfClose/close is called (note that cancel() isn't enough). That impacts every outbound streaming RPC. That concerns me, and doesn't seem like something that can be ignored.
That sounds like a much easier solution. If it gets us performance in the same ballpark, I'd be much more eager to take it. If we're going to retain direct buffers in
From skimming over the code, it seems like here we are also allocating a 4K buffer when we know we only need 5 bytes: https://github.com/grpc/grpc-java/blob/master/core/src/main/java/io/grpc/internal/MessageFramer.java#L204
I agree with Eric that this sounds much simpler. Why 32? Is that specific to the benchmark payload size? Do we know how often piggy-backed writes in the message framer actually happen?
32 = 5 bytes of frame plus enough space to hold small messages. The next size down is 16, which my gut says is too small. Not changing the smallest size was based on a conversation with Louis; I was under the impression that changing the size was not feasible.

Reusing the buffer also has some benefits, assuming that chunks are returned in power-of-two sizes. A stream of large messages that straddle a power-of-two boundary would still cause a lot of waste (possibly up to half the buffer). If the allocator instead returns chunks in size classes spaced 2^-3 apart (so there is never more than 12.5% waste), then it doesn't make much sense to reuse the buffer. That is a decision for the WritableBufferAllocator and the WritableBuffer: they can decide whether splitting the buffer makes sense based on information the MessageFramer doesn't have, which is why it can return null.

As for your question about piggy-backed writes: they happen in streaming RPCs, which is why the benchmarks no longer OOM with this change. Another common case is streaming large chunks of data from a database, as Cloud Spanner does. Having the stream close out the buffer safely is possible.
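The split-and-reuse idea can be sketched with plain `java.nio.ByteBuffer` (a hypothetical illustration with made-up names and thresholds; the real change lives in the `WritableBuffer` implementations and uses the transport's own buffer types):

```java
import java.nio.ByteBuffer;

/**
 * Hypothetical sketch: after framing a small message into a large chunk,
 * hand back the unused tail so the next message doesn't need a fresh 4K
 * allocation. Not the actual gRPC code.
 */
public class BufferSplitSketch {
  static final int MIN_CHUNK = 4096; // the 4K minimum discussed above

  /** Returns the unused remainder of {@code buf}, or null if reuse is not worthwhile. */
  static ByteBuffer remainder(ByteBuffer buf) {
    // The buffer/allocator pair may decide splitting is not worth it
    // (e.g. too little space left); null means "allocate fresh next time".
    if (buf.remaining() < 32) {
      return null;
    }
    return buf.slice(); // shares the backing array; no new allocation
  }

  public static void main(String[] args) {
    ByteBuffer chunk = ByteBuffer.allocate(MIN_CHUNK);
    chunk.put(new byte[5 + 32]); // 5-byte frame header plus a small message
    ByteBuffer reuse = remainder(chunk);
    System.out.println(reuse.capacity()); // 4096 - 37 = 4059 bytes left over
  }
}
```

With a real pooled allocator the slice must also carry the reference count of the parent chunk, which is exactly where the leak concern above comes from.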
I thought on server streaming we call
@buchgr You are correct, but I don't think that is relevant. The next write after the flush is going to want a buffer.
```java
buffer = null;
// If we have a buffer still and we are not trying to do the last flush on a close() call,
// try and reuse the remainder of the buffer.
if (buf != null && !isClosed()) {
```
One alternative way of doing this is to hold a reference to the buffer but not eagerly increment its reference count. When we attempt to use the buffer the next time, we can try to increment its refcount; if we can't, it has been recycled already and we will need to get a new buffer. This should be thread safe, and it doesn't introduce a memory-leak risk of any significance.
Don't think that works. Note that not only the backing memory is pooled/reused, but also the ByteBuf object itself. How do you distinguish whether a ByteBuf object with refCnt() > 0 is still the same object with the same backing memory, or whether it has meanwhile been claimed by some other call site? Even if, in addition to the ref count, we compared ByteBuf.memoryAddress, it wouldn't be safe because 1) we can't compare memoryAddress + refCnt and increment refCnt atomically, 2) by chance another call site might have gotten the same ByteBuf object with the same backing memory and we can't distinguish that case, and 3) there is some time between getting the buffer from the recycler and assigning it backing memory, so you could end up with a ByteBuf that is not backed by any (valid) memory.
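The "optimistic retain" idea and its pitfall can be modeled with a bare `AtomicInteger` refcount (a hypothetical sketch; Netty's real `ReferenceCounted` pooling and recycling are more involved):

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of a pooled object's reference count. */
public class OptimisticRetain {
  /** CAS loop: only bump the count if it is still nonzero. */
  static boolean tryRetain(AtomicInteger refCnt) {
    for (;;) {
      int current = refCnt.get();
      if (current == 0) {
        return false; // already released back to the pool; allocate fresh
      }
      if (refCnt.compareAndSet(current, current + 1)) {
        return true; // we now hold an extra reference
      }
    }
  }

  public static void main(String[] args) {
    System.out.println(tryRetain(new AtomicInteger(1))); // true: still live
    System.out.println(tryRetain(new AtomicInteger(0))); // false: recycled
    // The objection above: if the pool hands the *same* object to another
    // caller, refCnt is nonzero again and tryRetain succeeds even though
    // the memory now belongs to someone else (an ABA-style problem).
  }
}
```

The CAS itself is safe; what it cannot tell you is whether a nonzero count belongs to your old use of the buffer or to a new owner after recycling, which is the crux of the objection.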
Just so I don't confuse things, here is my understanding. Right now (current master), we don't piggy-back writes for server streaming. We only piggy-back writes for client streaming, as we don't flush until 4K have been written or half close (whichever comes first). So in a sense that 4K minimum size right now is mostly useless in terms of doing fewer allocations, and wastes lots of space for small writes. Correct?

Now in this PR you suggest to slice the buffer after flush and re-use it. That sounds like a good idea, but with the downside that we might leak buffers if halfClose/close never gets called. Did you measure the allocation latency? It might be so fast that slicing vs. allocating doesn't really matter. Note that small allocations are served from thread caches without synchronization, so doing several small allocations vs. one 4K allocation might be faster. I am not exactly sure how Netty's allocator thread caches work. Real jemalloc does cache allocations up to 32K by default, but with fewer slots for larger buffers. So it might cache e.g. up to 1000 64-byte buffers, but only 10 4K buffers or so. Allocation/deallocation in a jemalloc thread cache is just pointer bumping.

Your other suggestion is to do one allocation per write, with that allocation having a minimum size of 32 bytes. Now my question: why have the minimum size if we effectively never piggy-back? On the fast path (known payload length), we know how much space we need anyway.
Current master does no piggy-backing to my knowledge. I also think we would apply any solution to both client streaming and server streaming.
I think the minimum was originally put in to avoid fragmentation in the allocator, although it was also written with flush delaying in mind (which we don't do today). Yeah, we may not really need the minimum size; we could just tell Netty what we need and let Netty's allocator do what it thinks is best. The only advantage we have vs. the allocator is that the allocator API has stricter semantics than we need: when we allocate a size, the size is only a hint. We can handle smaller sizes and sometimes make use of larger ones. Netty's allocator may allocate something larger, but I haven't seen an API to learn how much extra space there is past the capacity we requested. Right now the min and max in NettyWritableBufferAllocator are semi-arbitrary. I don't believe we've done any real investigation on whether the numbers are appropriate. It was introduced in f7010f2 via #313 and hasn't changed since.
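The min/max behavior being discussed can be sketched as a simple clamp of the requested capacity (hypothetical constants and method name; the actual values and code in NettyWritableBufferAllocator may differ):

```java
/**
 * Hedged sketch of clamping a requested capacity between a floor and a
 * ceiling, in the spirit of NettyWritableBufferAllocator. The constants
 * here are assumptions based on the thread, not the library's source.
 */
public class CapacityClamp {
  static final int MIN_BUFFER = 4096;        // the 4K minimum discussed above
  static final int MAX_BUFFER = 1024 * 1024; // assumed 1 MB ceiling

  static int clamp(int capacityHint) {
    // The hint is only a hint: small requests are rounded up to MIN_BUFFER,
    // huge requests are capped so one message never demands a giant
    // contiguous chunk in a single allocation.
    return Math.min(MAX_BUFFER, Math.max(MIN_BUFFER, capacityHint));
  }

  public static void main(String[] args) {
    System.out.println(clamp(5));         // 4096: a 5-byte frame still costs 4K
    System.out.println(clamp(64 * 1024)); // 65536: within range, used as-is
    System.out.println(clamp(1 << 30));   // 1048576: capped at MAX_BUFFER
  }
}
```

The first case is exactly the waste this PR targets: the floor turns a 5-byte request into a 4K allocation.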
You are right. So if we never piggy-back, then there is no point in having a minimum size really?
I think we should have confidence in the allocator and let the allocator deal with fragmentation.
It will fit an allocation to a specific size class. Those size classes were chosen to keep overhead to a minimum, so I think that's fine. I would argue that the allocator would certainly do a better job of keeping that extra capacity to a minimum than us choosing some fixed size class.
So we get rid of the minimum size in
While working on #2139, I noticed that a large minimum size does make sense for another reason, that's not immediately obvious. In the That's more a problem of the current code structure though and could be fixed, as we call |
But just before writeRaw we will generally allocate an appropriately-sized buffer. (Maybe that allocation in writeRaw should go away? It is probably left over from an earlier version of the code.)
yep, but the buffer allocator is allowed to return a buffer less than |
But if the message is 1 GB, we don't want a contiguous buffer. 1024 1 MB buffers would be fine. We could make writeRaw a bit smarter and inform it how much we expect is remaining? So then if we get a smaller buffer initially we will continue asking for large buffers. |
In case you are thinking about peak memory usage - we wouldn't be flushing until that 1GB has been framed. So for peak memory usage it should make no difference. Also I am not saying that a contiguous 1GB buffer should be returned. In fact, the allocator might not be able to do that. It's specific to the transport. However, it could always return a
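The idea of backing one large logical write with many fixed-size chunks, instead of one contiguous allocation, can be sketched with plain byte arrays (a hypothetical helper; a real transport would use its allocator's composite-buffer support rather than copying):

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch: split a large message across fixed-size chunks so
 * no single allocation has to be contiguous and huge.
 */
public class ChunkedBuffer {
  static List<byte[]> writeChunked(byte[] message, int chunkSize) {
    List<byte[]> chunks = new ArrayList<>();
    for (int off = 0; off < message.length; off += chunkSize) {
      int len = Math.min(chunkSize, message.length - off);
      byte[] chunk = new byte[len]; // each allocation stays small
      System.arraycopy(message, off, chunk, 0, len);
      chunks.add(chunk);
    }
    return chunks;
  }

  public static void main(String[] args) {
    byte[] message = new byte[10 * 1024 * 1024]; // 10 MB payload
    List<byte[]> chunks = writeChunked(message, 1024 * 1024); // 1 MB chunks
    System.out.println(chunks.size()); // 10 chunks, none larger than 1 MB
  }
}
```

A chunked layout also lets earlier pieces be released as they are written to the wire, which a single contiguous buffer cannot do.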
@buchgr Having to allocate any memory seems like a problem in the near future. I expect that proto will be able to self-describe as ByteBuffers (or equivalent) and thus not need to be copied by the message framer. In that world, does it make sense to move serialization into the transport?
I was not thinking of peak memory usage. I was thinking more about fragmentation and the effects of large contiguous allocations. For example, it would be virtually impossible to reliably allocate a 1 GB buffer on a 32-bit VM, even assuming you have the memory available. The flushing could be changed fairly trivially, but I don't expect much benefit from going down that road. Pretty soon we will want protobuf to stop implementing KnownLength, because that will allow writing backward with only a single pass over the protobuf, instead of the two passes we do now: one to get lengths, one to write. Right now protobuf itself requires two passes, so it is "free" for us to get the length. But Nathan's work to write backward will be available eventually. Also, you can't release any of the 1 GB buffer until the entire thing is written. Splitting the buffer into smaller pieces is a feature, not a downfall. At some point combining buffers stops being a performance gain, and that makes sense, because otherwise writev/readv wouldn't exist. Only small buffers cause issues.
I don't see what advantage that provides. Whether it is 1
What self-describing feature are you talking about? I've not heard anything about that. (Or rather, I've not heard of anything along those lines that isn't a completely different wire format.) Even if we had that, though, it seems like it wouldn't matter where we serialize, since serializing wouldn't actually need to serialize anything, so it is close to free. I do know there is work to serialize directly to
My thinking was that the code in |
Oh, okay. That's fair. That may be the case. Although I think My instinct tells me that we don't save much though, in that by trying to simplify we make things more complicated, more opaque, and harder to optimize. Especially if we ever stop flushing after every message.
The following code, allocating 1024 blocks of 1 MB each, would take about 15 seconds, which could be a bit long, taking into account an insert for a

```java
import java.util.LinkedList;

public class GBConsecutive {
  public static void main(String[] args) {
    long tick = System.currentTimeMillis();
    LinkedList<byte[]> ll = new LinkedList<byte[]>();
    int ONE_MB = 1048576;
    for (int i = 0; i < 1024; i++) {
      byte[] ONE_MB_BYTE = new byte[ONE_MB];
      ll.add(ONE_MB_BYTE);
    }
    long elapsed = System.currentTimeMillis() - tick;
    System.out.println("Elapsed milliseconds: " + elapsed);
  }
}
```
Now the above code runs in 46 seconds, and when I perform it all at once like this, it runs in 28 seconds:

```java
import java.util.LinkedList;

public class GBConsecutiveAll {
  public static void main(String[] args) {
    long tick = System.currentTimeMillis();
    LinkedList<byte[]> ll = new LinkedList<byte[]>();
    int ONE_GB = 1073741824;
    // Single allocation of one contiguous 1 GB array.
    byte[] ONE_GB_BYTE = new byte[ONE_GB];
    ll.add(ONE_GB_BYTE);
    long elapsed = System.currentTimeMillis() - tick;
    System.out.println("Elapsed milliseconds: " + elapsed);
  }
}
```
cc: @louiscryan