Skip to content

Commit 451e7f2

Browse files
committed
modelindexer: Run active in dedicated goroutine
Remove the `i.activeMu` from the modelindexer and shared sync primitives from the indexer structure, instead, run the active indexer in its own goroutine, minimizing the amount of shared locking that needs to happen when `ProcessBatch` is called. To allow (potentially) multiple active indexers from running in parallel, a new internal `bulkItems` channel is introduced, with a variable buffer depending on how powerful the machine where APM Server is run. The bigger the machine, the bigger the channel buffer, and vice versa. Additionally, the default `FlushBytes` reduced from `5MB` to `2MB`, and to keep the same total "bufferable" size, the default `MaxRequests` is increased to 25 (25 * 2mb = 50mb). The motivation behind this change is is twofold; Allowing the modelindexer to have more background flushes if enough events are being processed by the APM Server, and, creating slightly smaller bulk requests to Elasticsearch, since, with the default settings (5MB FlushBytes), the number of bulk documents is ~`18-24K`. Initial micro-benchmarks indicate that the performance improvement is ~42% with default settings, and macro-benchmarks ~27%: ``` benchstat old.txt single-lockless.txt name old time/op new time/op delta NoCompression-8 1.11µs ± 1% 1.15µs ± 4% +4.41% (p=0.008 n=5+5) BestSpeed-8 2.03µs ± 3% 1.88µs ± 1% -7.14% (p=0.008 n=5+5) DefaultCompression-8 4.10µs ± 2% 2.36µs ± 2% -42.32% (p=0.008 n=5+5) BestCompression-8 6.95µs ± 1% 4.21µs ± 1% -39.48% (p=0.008 n=5+5) name old alloc/op new alloc/op delta NoCompression-8 2.07kB ± 0% 2.14kB ± 0% +3.44% (p=0.016 n=4+5) BestSpeed-8 2.57kB ± 0% 2.56kB ± 0% -0.31% (p=0.008 n=5+5) DefaultCompression-8 2.55kB ± 0% 2.52kB ± 0% -0.87% (p=0.008 n=5+5) BestCompression-8 2.58kB ± 0% 2.57kB ± 0% -0.52% (p=0.008 n=5+5) ``` ``` name old events/sec new events/sec delta AgentAll-512 26.8k ± 0% 34.1k ±11% +27.11% (p=0.100 n=3+3) ``` Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>
1 parent 3384f58 commit 451e7f2

4 files changed

Lines changed: 135 additions & 93 deletions

File tree

internal/beater/beater.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -886,6 +886,8 @@ func (s *serverRunner) newFinalBatchProcessor(
886886
stats := indexer.Stats()
887887
v.OnKey("available")
888888
v.OnInt(stats.AvailableBulkRequests)
889+
v.OnKey("active")
890+
v.OnInt(stats.ActiveBulkRequests)
889891
v.OnKey("completed")
890892
v.OnInt(stats.BulkRequests)
891893
})

internal/beater/server_test.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -975,8 +975,11 @@ func TestServerElasticsearchOutput(t *testing.T) {
975975
assert.Equal(t, map[string]interface{}{
976976
"elasticsearch": map[string]interface{}{
977977
"bulk_requests": map[string]interface{}{
978-
"available": int64(9),
979-
"completed": int64(0),
978+
"active": int64(1),
979+
"available": int64(9),
980+
"completed": int64(0),
981+
"upscales": int64(0),
982+
"downscales": int64(0),
980983
},
981984
},
982985
}, snapshot)

internal/model/modelindexer/indexer.go

Lines changed: 102 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ import (
2424
"fmt"
2525
"io"
2626
"net/http"
27+
"runtime"
2728
"strings"
2829
"sync"
2930
"sync/atomic"
@@ -76,20 +77,17 @@ type Indexer struct {
7677
tooManyRequests int64
7778
bytesTotal int64
7879
availableBulkRequests int64
80+
activeBulkRequests int64
7981

8082
config Config
8183
logger *logp.Logger
8284
available chan *bulkIndexer
85+
bulkItems chan elasticsearch.BulkIndexerItem
8386
g errgroup.Group
8487

85-
mu sync.RWMutex
86-
closing bool
87-
closed chan struct{}
88-
activeMu sync.Mutex
89-
activeCond *sync.Cond
90-
active *bulkIndexer
91-
timer *time.Timer
92-
timerStopped chan struct{}
88+
mu sync.RWMutex
89+
closing bool
90+
closed chan struct{}
9391
}
9492

9593
// Config holds configuration for Indexer.
@@ -106,7 +104,8 @@ type Config struct {
106104
// If MaxRequests is less than or equal to zero, the default of 25 will be used.
107105
MaxRequests int
108106

109-
// FlushBytes holds the flush threshold in bytes.
107+
// FlushBytes holds the flush threshold in bytes. If Compression is enabled,
108+
// the number of events that can be buffered will be greater.
110109
//
111110
// If FlushBytes is zero, the default of 2MB will be used.
112111
FlushBytes int
@@ -133,10 +132,10 @@ func New(client elasticsearch.Client, cfg Config) (*Indexer, error) {
133132
)
134133
}
135134
if cfg.MaxRequests <= 0 {
136-
cfg.MaxRequests = 10
135+
cfg.MaxRequests = 25
137136
}
138137
if cfg.FlushBytes <= 0 {
139-
cfg.FlushBytes = 5 * 1024 * 1024
138+
cfg.FlushBytes = 2 * 1024 * 1024
140139
}
141140
if cfg.FlushInterval <= 0 {
142141
cfg.FlushInterval = 30 * time.Second
@@ -151,9 +150,16 @@ func New(client elasticsearch.Client, cfg Config) (*Indexer, error) {
151150
logger: logger,
152151
available: available,
153152
closed: make(chan struct{}),
154-
timerStopped: make(chan struct{}),
153+
// NOTE(marclop) This channel size is arbitrary. It makes sense to set
154+
// the buffer channel to a bigger size when more CPUs are available, to
155+
// allow slightly more buffering to happen. It will speed up the blocking
156+
// ProcessBatch calls, when the input request rate > compression rate.
157+
bulkItems: make(
158+
chan elasticsearch.BulkIndexerItem,
159+
cfg.MaxRequests*runtime.GOMAXPROCS(0),
160+
),
155161
}
156-
indexer.activeCond = sync.NewCond(&indexer.activeMu)
162+
indexer.runActiveIndexer()
157163
return indexer, nil
158164
}
159165

@@ -167,7 +173,7 @@ func (i *Indexer) Close(ctx context.Context) error {
167173
defer i.mu.Unlock()
168174
if !i.closing {
169175
i.closing = true
170-
176+
close(i.bulkItems)
171177
// Close i.closed when ctx is cancelled,
172178
// unblocking any ongoing flush attempts.
173179
done := make(chan struct{})
@@ -179,12 +185,6 @@ func (i *Indexer) Close(ctx context.Context) error {
179185
case <-ctx.Done():
180186
}
181187
}()
182-
183-
i.activeMu.Lock()
184-
if i.active != nil && i.timer.Stop() {
185-
i.timerStopped <- struct{}{}
186-
}
187-
i.activeMu.Unlock()
188188
}
189189
return i.g.Wait()
190190
}
@@ -200,6 +200,7 @@ func (i *Indexer) Stats() Stats {
200200
TooManyRequests: atomic.LoadInt64(&i.tooManyRequests),
201201
BytesTotal: atomic.LoadInt64(&i.bytesTotal),
202202
AvailableBulkRequests: atomic.LoadInt64(&i.availableBulkRequests),
203+
ActiveBulkRequests: atomic.LoadInt64(&i.activeBulkRequests),
203204
}
204205
}
205206

@@ -234,54 +235,21 @@ func (i *Indexer) processEvent(ctx context.Context, event *model.APMEvent) error
234235
r.indexBuilder.WriteString(event.DataStream.Dataset)
235236
r.indexBuilder.WriteByte('-')
236237
r.indexBuilder.WriteString(event.DataStream.Namespace)
237-
index := r.indexBuilder.String()
238-
239-
i.activeMu.Lock()
240-
defer i.activeMu.Unlock()
241-
for i.active != nil && i.active.Len() >= i.config.FlushBytes {
242-
// The active bulk indexer is full: wait for it to be
243-
// switched out by the background flushActive goroutine.
244-
i.activeCond.Wait()
245-
}
246-
if i.active == nil {
247-
select {
248-
case <-ctx.Done():
249-
return ctx.Err()
250-
case i.active = <-i.available:
251-
atomic.AddInt64(&i.availableBulkRequests, -1)
252-
}
253-
if i.timer == nil {
254-
i.timer = time.NewTimer(i.config.FlushInterval)
255-
} else {
256-
i.timer.Reset(i.config.FlushInterval)
257-
}
258-
i.g.Go(func() error {
259-
// The timer may be stopped by i.Close or when
260-
// i.config.FlushBytes is exceeded, in which case
261-
// i.timerStopped will be signalled.
262-
select {
263-
case <-i.timerStopped:
264-
case <-i.timer.C:
265-
}
266-
return i.flushActive(context.Background())
267-
})
268-
}
269238

270-
if err := i.active.Add(elasticsearch.BulkIndexerItem{
271-
Index: index,
239+
select {
240+
// Send the BulkIndexerItem to the internal channel, allowing individual
241+
// events to be processed by an active bulk indexer in a dedicated goroutine,
242+
// which in turn speeds up event processing.
243+
case i.bulkItems <- elasticsearch.BulkIndexerItem{
244+
Index: r.indexBuilder.String(),
272245
Action: "create",
273246
Body: r,
274-
}); err != nil {
275-
return err
247+
}:
248+
case <-ctx.Done():
249+
return ctx.Err()
276250
}
277251
atomic.AddInt64(&i.eventsAdded, 1)
278252
atomic.AddInt64(&i.eventsActive, 1)
279-
280-
if i.active.Len() >= i.config.FlushBytes {
281-
if i.timer.Stop() {
282-
i.timerStopped <- struct{}{}
283-
}
284-
}
285253
return nil
286254
}
287255

@@ -332,7 +300,7 @@ func encodeMap(v map[string]interface{}, out *fastjson.Writer) error {
332300
return nil
333301
}
334302

335-
func (i *Indexer) flushActive(ctx context.Context) error {
303+
func (i *Indexer) flushIndexer(ctx context.Context, bulkIndexer *bulkIndexer) error {
336304
// Create a child context which is cancelled when the context passed to i.Close is cancelled.
337305
flushed := make(chan struct{})
338306
defer close(flushed)
@@ -345,11 +313,6 @@ func (i *Indexer) flushActive(ctx context.Context) error {
345313
}
346314
}()
347315

348-
i.activeMu.Lock()
349-
bulkIndexer := i.active
350-
i.active = nil
351-
i.activeCond.Broadcast()
352-
i.activeMu.Unlock()
353316
err := i.flush(ctx, bulkIndexer)
354317
bulkIndexer.Reset()
355318
i.available <- bulkIndexer
@@ -434,6 +397,72 @@ func (i *Indexer) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
434397
return nil
435398
}
436399

400+
// runActiveIndexer starts a new active indexer which pulls items from the
401+
// bulkItems channel. The more active indexers there are, the faster events
402+
// will be pulled out of the queue, but also the more likely it is that the
403+
// outgoing Elasticsearch bulk requests are flushed due to the idle timer,
404+
// rather than due to being full.
405+
func (i *Indexer) runActiveIndexer() {
406+
i.g.Go(func() error {
407+
atomic.AddInt64(&i.activeBulkRequests, 1)
408+
var mu sync.Mutex
409+
// `active` and `timer` must be accessed with `mu` locked.
410+
var active *bulkIndexer
411+
var timer *time.Timer
412+
flush := make(chan *bulkIndexer)
413+
defer func() {
414+
atomic.AddInt64(&i.activeBulkRequests, -1)
415+
close(flush)
416+
}()
417+
for item := range i.bulkItems {
418+
mu.Lock()
419+
if active == nil {
420+
active = <-i.available
421+
atomic.AddInt64(&i.availableBulkRequests, -1)
422+
if timer == nil {
423+
timer = time.AfterFunc(i.config.FlushInterval, func() {
424+
mu.Lock()
425+
defer mu.Unlock()
426+
flush <- active
427+
active = nil
428+
})
429+
} else {
430+
timer.Reset(i.config.FlushInterval)
431+
}
432+
i.g.Go(func() error {
433+
if indexer := <-flush; indexer != nil {
434+
return i.flushIndexer(context.Background(), indexer)
435+
}
436+
return nil
437+
})
438+
}
439+
if err := active.Add(item); err != nil {
440+
i.logger.Errorf("failed adding event to bulk indexer: %v", err)
441+
}
442+
// Flush the active bulk indexer when it's at or exceeds the
443+
// configured FlushBytes threshold.
444+
if active.Len() >= i.config.FlushBytes {
445+
// Stop the timer and only request a flush if the stop
446+
// operation succeeded, otherwise the active indexer has
447+
// already been flushed by the idle timer.
448+
if timer.Stop() {
449+
flush <- active
450+
active = nil
451+
}
452+
}
453+
mu.Unlock()
454+
}
455+
// Flush the active indexer when it hasn't yet been flushed.
456+
mu.Lock()
457+
if active != nil && timer.Stop() {
458+
flush <- active
459+
active = nil
460+
}
461+
mu.Unlock()
462+
return nil
463+
})
464+
}
465+
437466
var pool sync.Pool
438467

439468
type pooledReader struct {
@@ -495,7 +524,11 @@ type Stats struct {
495524
// which counts bytes at the transport level.
496525
BytesTotal int64
497526

498-
// AvailableBulkIndexers represents the number of bulk indexers
527+
// AvailableBulkRequests represents the number of bulk indexers
499528
// available for making bulk index requests.
500529
AvailableBulkRequests int64
530+
531+
// ActiveBulkRequests represents the number of active bulk indexers that are
532+
// concurrently processing batches.
533+
ActiveBulkRequests int64
501534
}

0 commit comments

Comments
 (0)