Skip to content

Commit ef72d77

Browse files
committed
Remove mutex from activeIndexer
Signed-off-by: Marc Lopez Rubio <marc5.12@outlook.com>
1 parent 451e7f2 commit ef72d77

1 file changed

Lines changed: 21 additions & 22 deletions

File tree

internal/model/modelindexer/indexer.go

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -405,60 +405,59 @@ func (i *Indexer) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
405405
func (i *Indexer) runActiveIndexer() {
406406
i.g.Go(func() error {
407407
atomic.AddInt64(&i.activeBulkRequests, 1)
408-
var mu sync.Mutex
409-
// `active` and `timer` must be accessed with `mu` locked.
410-
var active *bulkIndexer
411408
var timer *time.Timer
412409
flush := make(chan *bulkIndexer)
410+
active := make(chan *bulkIndexer, 1)
413411
defer func() {
414412
atomic.AddInt64(&i.activeBulkRequests, -1)
415413
close(flush)
414+
close(active)
416415
}()
417416
for item := range i.bulkItems {
418-
mu.Lock()
419-
if active == nil {
420-
active = <-i.available
417+
var indexer *bulkIndexer
418+
select {
419+
case indexer = <-active:
420+
default:
421+
indexer = <-i.available
421422
atomic.AddInt64(&i.availableBulkRequests, -1)
422423
if timer == nil {
423424
timer = time.AfterFunc(i.config.FlushInterval, func() {
424-
mu.Lock()
425-
defer mu.Unlock()
426-
flush <- active
427-
active = nil
425+
flush <- <-active
428426
})
429427
} else {
430428
timer.Reset(i.config.FlushInterval)
431429
}
432430
i.g.Go(func() error {
433-
if indexer := <-flush; indexer != nil {
434-
return i.flushIndexer(context.Background(), indexer)
431+
if idx := <-flush; idx != nil {
432+
return i.flushIndexer(context.Background(), idx)
435433
}
436434
return nil
437435
})
438436
}
439-
if err := active.Add(item); err != nil {
437+
if err := indexer.Add(item); err != nil {
440438
i.logger.Errorf("failed adding event to bulk indexer: %v", err)
441439
}
442440
// Flush the active bulk indexer when it's at or exceeds the
443441
// configured FlushBytes threshold.
444-
if active.Len() >= i.config.FlushBytes {
442+
if indexer.Len() >= i.config.FlushBytes {
445443
// Stop the timer and only request a flush if the stop
446444
// operation succeeded, otherwise the active indexer has
447445
// already been flushed by the idle timer.
448446
if timer.Stop() {
449-
flush <- active
450-
active = nil
447+
flush <- indexer
451448
}
449+
} else {
450+
active <- indexer
452451
}
453-
mu.Unlock()
454452
}
455453
// Flush the active indexer when it hasn't yet been flushed.
456-
mu.Lock()
457-
if active != nil && timer.Stop() {
458-
flush <- active
459-
active = nil
454+
select {
455+
case indexer := <-active:
456+
if timer.Stop() {
457+
flush <- indexer
458+
}
459+
default:
460460
}
461-
mu.Unlock()
462461
return nil
463462
})
464463
}

0 commit comments

Comments
 (0)