@@ -405,60 +405,59 @@ func (i *Indexer) flush(ctx context.Context, bulkIndexer *bulkIndexer) error {
405405func (i * Indexer ) runActiveIndexer () {
406406 i .g .Go (func () error {
407407 atomic .AddInt64 (& i .activeBulkRequests , 1 )
408- var mu sync.Mutex
409- // `active` and `timer` must be accessed with `mu` locked.
410- var active * bulkIndexer
411408 var timer * time.Timer
412409 flush := make (chan * bulkIndexer )
410+ active := make (chan * bulkIndexer , 1 )
413411 defer func () {
414412 atomic .AddInt64 (& i .activeBulkRequests , - 1 )
415413 close (flush )
414+ close (active )
416415 }()
417416 for item := range i .bulkItems {
418- mu .Lock ()
419- if active == nil {
420- active = <- i .available
417+ var indexer * bulkIndexer
418+ select {
419+ case indexer = <- active :
420+ default :
421+ indexer = <- i .available
421422 atomic .AddInt64 (& i .availableBulkRequests , - 1 )
422423 if timer == nil {
423424 timer = time .AfterFunc (i .config .FlushInterval , func () {
424- mu .Lock ()
425- defer mu .Unlock ()
426- flush <- active
427- active = nil
425+ flush <- <- active
428426 })
429427 } else {
430428 timer .Reset (i .config .FlushInterval )
431429 }
432430 i .g .Go (func () error {
433- if indexer := <- flush ; indexer != nil {
434- return i .flushIndexer (context .Background (), indexer )
431+ if idx := <- flush ; idx != nil {
432+ return i .flushIndexer (context .Background (), idx )
435433 }
436434 return nil
437435 })
438436 }
439- if err := active .Add (item ); err != nil {
437+ if err := indexer .Add (item ); err != nil {
440438 i .logger .Errorf ("failed adding event to bulk indexer: %v" , err )
441439 }
442440 // Flush the active bulk indexer when it's at or exceeds the
443441 // configured FlushBytes threshold.
444- if active .Len () >= i .config .FlushBytes {
442+ if indexer .Len () >= i .config .FlushBytes {
445443 // Stop the timer and only request a flush if the stop
446444 // operation succeeded, otherwise the active indexer has
447445 // already been flushed by the idle timer.
448446 if timer .Stop () {
449- flush <- active
450- active = nil
447+ flush <- indexer
451448 }
449+ } else {
450+ active <- indexer
452451 }
453- mu .Unlock ()
454452 }
455453 // Flush the active indexer when it hasn't yet been flushed.
456- mu .Lock ()
457- if active != nil && timer .Stop () {
458- flush <- active
459- active = nil
454+ select {
455+ case indexer := <- active :
456+ if timer .Stop () {
457+ flush <- indexer
458+ }
459+ default :
460460 }
461- mu .Unlock ()
462461 return nil
463462 })
464463}
0 commit comments