We (the profiling team) noticed that we could OOM-crash apm-server by running a heavy but typical workload. After digging into the crash, we discovered that go-elasticsearch, through `BulkIndexer`, was the trigger: it was using more than 1GB of memory, exceeding the container limit.
I tested the following two suggestions locally, and they reduce memory consumption for my workloads by 40-50%. I feel that more drastic improvements are possible, but they'd require either an API change or an API extension (feel free to ping me for more information).
- Here we see that the buffer size is checked against the configured limit (`FlushBytes`) *after* `writeMeta` and `writeBody` have been called. If these calls write data to the buffer that exceeds the configured limit, the underlying array of `w.buf` will be resized 2x. Since `flush` only resets the length of the buffer, the underlying storage is retained, which means there is a constant memory hit that will not go away for the life of the process.

  Example: For profiling, we configure `FlushBytes` to be 16M. Our workload will always trigger more data written to the buffer than its configured size, and therefore each per-worker buffer will be resized 2x. This means that we pay a memory cost of `numWorkers*16M` which will be constant for the life of the process. Note that this extra memory will mostly be unused.
  Proposed fix: Move the buffer size check that triggers flushing before `writeMeta` / `writeBody`. Example:
  ```go
  if w.buf.Len()+item.BodyLength+item.MetaLength >= w.bi.config.FlushBytes {
  	if err := w.flush(ctx); err != nil { /* ... */ }
  }
  if err := w.writeMeta(item); err != nil { /* ... */ }
  if err := w.writeBody(&item); err != nil { /* ... */ }
  w.items = append(w.items, item)
  ```
  where `item.BodyLength` and `item.MetaLength` are the number of bytes written to the buffer by `writeBody` and `writeMeta` respectively (there are multiple ways to implement this; for my quick tests I just added a `Length` field to `BulkIndexerItem` and worst-cased `MetaLength` to a constant 1024 bytes).
  This proposed fix assumes that no single body payload + meta is greater than the configured limit. Alternatively, the buffer can be allowed to grow beyond `FlushBytes` just for this case (the code from the previous proposed fix still applies, but there is an extra check):
  ```go
  // per-item payloadLen includes the meta overhead
  flushLater := payloadLen >= w.bi.config.FlushBytes
  if !flushLater && w.buf.Len()+payloadLen >= w.bi.config.FlushBytes {
  	if err := w.flush(ctx); err != nil { /* ... */ }
  }
  // SNIP: writeMeta / writeBody
  // After writeBody:
  if flushLater {
  	if err := w.flush(ctx); err != nil { /* ... */ }
  }
  ```
  In `flush`, instead of `w.buf.Reset`, one would have:
  ```go
  if w.buf.Cap() > w.bi.config.FlushBytes {
  	w.buf = bytes.NewBuffer(make([]byte, 0, w.bi.config.FlushBytes))
  } else {
  	w.buf.Reset()
  }
  ```
- Here `w.items` is re-sliced. This retains the underlying storage, which in this case is an array of `BulkIndexerItem`s that keep references to allocated byte buffers (which cannot be GCed). Unless elements of this array are overwritten by future access patterns, there will be a memory hit.

  Proposed fix: Set the slice to `nil`.
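  For illustration, a minimal sketch of the difference between re-slicing and nilling; the `item` type here is a hypothetical stand-in for `BulkIndexerItem` holding a reference to a body buffer:

  ```go
  package main

  import "fmt"

  // item stands in for BulkIndexerItem: it references a body buffer.
  type item struct {
  	body []byte
  }

  func main() {
  	items := make([]item, 0, 4)
  	for i := 0; i < 4; i++ {
  		items = append(items, item{body: make([]byte, 1<<20)})
  	}

  	// Re-slicing to zero length keeps the backing array, and with it
  	// all four 1 MiB body buffers, reachable by the GC.
  	items = items[:0]
  	fmt.Println("cap after re-slice:", cap(items))

  	// Setting the slice to nil drops the backing array, so the GC can
  	// reclaim the items and the buffers they reference.
  	items = nil
  	fmt.Println("cap after nil:", cap(items))
  }
  ```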
EDIT: I did not open a PR with the suggestions described in this issue as there are multiple ways to implement equivalent functionality and I lack a lot of context regarding usage scenarios for BulkIndexer. I'm happy to start the discussion and help with crafting a solution that satisfies everyone.
CC: @Anaethelion