@@ -10,6 +10,7 @@ import (
1010 "strconv"
1111 "strings"
1212 "sync"
13+ "sync/atomic"
1314 "time"
1415 "unsafe"
1516
@@ -62,7 +63,7 @@ type Partition struct {
6263 // Fieldset shared with engine.
6364 fieldset * tsdb.MeasurementFieldSet
6465
65- currentCompactionN int // counter of in-progress compactions
66+ currentCompactionN atomic. Int32 // counter of in-progress compactions
6667
6768 // Directory of the Partition's index files.
6869 path string
@@ -348,22 +349,38 @@ func (p *Partition) buildSeriesSet() error {
348349}
349350
350351// CurrentCompactionN returns the number of compactions currently running.
351- func (p * Partition ) CurrentCompactionN () int {
352- p .mu .RLock ()
353- defer p .mu .RUnlock ()
354- return p .currentCompactionN
352+ func (p * Partition ) CurrentCompactionN () int32 {
353+ return p .currentCompactionN .Load ()
355354}
356355
357356// Wait will block until all compactions are finished.
358357// Must only be called while they are disabled.
359358func (p * Partition ) Wait () {
360359 ticker := time .NewTicker (10 * time .Millisecond )
361360 defer ticker .Stop ()
361+
362+ // Debug level timeout
363+ timeoutDuration := 24 * time .Hour
364+ startTime := time .Now ()
365+
362366 for {
363367 if p .CurrentCompactionN () == 0 {
364368 return
365369 }
366- <- ticker .C
370+ select {
371+ case <- ticker .C :
372+ elapsed := time .Since (startTime )
373+ if elapsed >= timeoutDuration {
374+ files := make ([]string , 0 )
375+ for _ , v := range p .fileSet .Files () {
376+ files = append (files , v .Path ())
377+ }
378+ p .logger .Warn ("Partition.Wait() timed out waiting for compactions to complete" ,
379+ zap .Int32 ("stuck_compactions" , p .CurrentCompactionN ()), zap .Duration ("timeout" , timeoutDuration ),
380+ zap .Strings ("files" , files ))
381+ startTime = time .Now ()
382+ }
383+ }
367384 }
368385}
369386
@@ -1040,14 +1057,17 @@ func (p *Partition) compact() {
10401057 }
10411058 // Mark the level as compacting.
10421059 p .levelCompacting [0 ] = true
1043- p .currentCompactionN ++
1060+ p .currentCompactionN . Add ( 1 )
10441061 go func () {
1062+ defer func () {
1063+ p .mu .Lock ()
1064+ p .currentCompactionN .Add (- 1 )
1065+ p .levelCompacting [0 ] = false
1066+ p .mu .Unlock ()
1067+ p .Compact ()
1068+ }()
1069+
10451070 p .compactLogFile (logFile )
1046- p .mu .Lock ()
1047- p .currentCompactionN --
1048- p .levelCompacting [0 ] = false
1049- p .mu .Unlock ()
1050- p .Compact ()
10511071 }()
10521072 }
10531073 }
@@ -1079,20 +1099,21 @@ func (p *Partition) compact() {
10791099 // Execute in closure to save reference to the group within the loop.
10801100 func (files []* IndexFile , level int ) {
10811101 // Start compacting in a separate goroutine.
1082- p .currentCompactionN ++
1102+ p .currentCompactionN . Add ( 1 )
10831103 go func () {
1104+ defer func () {
1105+ // Ensure compaction lock for the level is released.
1106+ p .mu .Lock ()
1107+ p .levelCompacting [level ] = false
1108+ p .currentCompactionN .Add (- 1 )
1109+ p .mu .Unlock ()
1110+
1111+ // Check for new compactions
1112+ p .Compact ()
1113+ }()
10841114
10851115 // Compact to a new level.
10861116 p .compactToLevel (files , level + 1 , interrupt )
1087-
1088- // Ensure compaction lock for the level is released.
1089- p .mu .Lock ()
1090- p .levelCompacting [level ] = false
1091- p .currentCompactionN --
1092- p .mu .Unlock ()
1093-
1094- // Check for new compactions
1095- p .Compact ()
10961117 }()
10971118 }(files , level )
10981119 }
0 commit comments