
BulkIndexer: Excessive memory use #557

@christos68k

Description

We (the profiling team) noticed that we could OOM-crash apm-server by running a heavy but typical workload. After digging into the crash, we discovered that go-elasticsearch's BulkIndexer was the trigger: it was using more than 1GB of memory, exceeding the container limit.

I tested the following two suggestions locally, and they reduce memory consumption for my workloads by 40-50%. I feel that more drastic improvements are possible, but they'd require either an API change or an API extension (feel free to ping me for more information).

  1. Here we see that the buffer size is checked against the configured limit (FlushBytes) after writeMeta and writeBody have been called. If these calls result in data written to the buffer that exceeds the configured limit, the underlying array of w.buf will be resized 2x. Since flush only resets the length of the buffer, underlying storage is retained, which means that there is a constant memory hit that will not go away for the life of the process.

Example: For profiling, we configure FlushBytes to be 16M. Our workload always writes more data to the buffer than its configured size, so each per-worker buffer is resized 2x. This means we pay a memory cost of numWorkers*16M that persists for the life of the process. Note that this extra memory will be mostly unused.

Proposed fix: Move the buffer size check that triggers flushing before writeMeta / writeBody. Example:

if w.buf.Len() + item.BodyLength + item.MetaLength >= w.bi.config.FlushBytes {
	if err := w.flush(ctx); err != nil {..}
}

if err := w.writeMeta(item); err != nil {..}
if err := w.writeBody(&item); err != nil {..}
w.items = append(w.items, item)

where item.BodyLength and item.MetaLength are the number of bytes written to the buffer by writeBody and writeMeta respectively. (There are multiple ways to implement this; for my quick tests I just added a Length field to BulkIndexerItem and worst-cased MetaLength to a constant 1024 bytes.)

This proposed fix assumes that there will not be a single body payload+meta greater than the configured limit. Alternatively, the buffer can be allowed to grow beyond FlushBytes just for this case (the code in the previous proposed fix still applies, but there is an extra check):

// per-item payloadLen includes the meta overhead
flushLater := payloadLen >= w.bi.config.FlushBytes
if !flushLater && w.buf.Len()+payloadLen >= w.bi.config.FlushBytes {
        if err := w.flush(ctx); err != nil {..}
}

// SNIP: writeMeta / writeBody
// After writeBody:
if flushLater {
        if err := w.flush(ctx); err != nil {..}
}

In flush, instead of w.buf.Reset, one would have:

if w.buf.Cap() > w.bi.config.FlushBytes {
        w.buf = bytes.NewBuffer(make([]byte, 0, w.bi.config.FlushBytes))
} else {
        w.buf.Reset()
}
  2. Here w.items is re-sliced. This retains underlying storage, which in this case is an array of BulkIndexerItems that keep references to allocated byte buffers (which cannot be GCed). Unless elements of this array are overwritten by future access patterns, there will be a memory hit.

Proposed fix: The slice can be set to nil.

w.items = nil

EDIT: I did not open a PR with the suggestions described in this issue as there are multiple ways to implement equivalent functionality and I lack a lot of context regarding usage scenarios for BulkIndexer. I'm happy to start the discussion and help with crafting a solution that satisfies everyone.

CC: @Anaethelion
