Skip to content

Commit f748c5c

Browse files
authored
feat: Defer cleanup for log/index compactions, add debug log (#26511) (#26871)
1 parent ba2c2b6 commit f748c5c

File tree

1 file changed

+43
-22
lines changed


tsdb/index/tsi1/partition.go

Lines changed: 43 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"strconv"
1111
"strings"
1212
"sync"
13+
"sync/atomic"
1314
"time"
1415
"unsafe"
1516

@@ -62,7 +63,7 @@ type Partition struct {
6263
// Fieldset shared with engine.
6364
fieldset *tsdb.MeasurementFieldSet
6465

65-
currentCompactionN int // counter of in-progress compactions
66+
currentCompactionN atomic.Int32 // counter of in-progress compactions
6667

6768
// Directory of the Partition's index files.
6869
path string
@@ -348,22 +349,38 @@ func (p *Partition) buildSeriesSet() error {
348349
}
349350

350351
// CurrentCompactionN returns the number of compactions currently running.
351-
func (p *Partition) CurrentCompactionN() int {
352-
p.mu.RLock()
353-
defer p.mu.RUnlock()
354-
return p.currentCompactionN
352+
func (p *Partition) CurrentCompactionN() int32 {
353+
return p.currentCompactionN.Load()
355354
}
356355

357356
// Wait will block until all compactions are finished.
358357
// Must only be called while they are disabled.
359358
func (p *Partition) Wait() {
360359
ticker := time.NewTicker(10 * time.Millisecond)
361360
defer ticker.Stop()
361+
362+
// Debug level timeout
363+
timeoutDuration := 24 * time.Hour
364+
startTime := time.Now()
365+
362366
for {
363367
if p.CurrentCompactionN() == 0 {
364368
return
365369
}
366-
<-ticker.C
370+
select {
371+
case <-ticker.C:
372+
elapsed := time.Since(startTime)
373+
if elapsed >= timeoutDuration {
374+
files := make([]string, 0)
375+
for _, v := range p.fileSet.Files() {
376+
files = append(files, v.Path())
377+
}
378+
p.logger.Warn("Partition.Wait() timed out waiting for compactions to complete",
379+
zap.Int32("stuck_compactions", p.CurrentCompactionN()), zap.Duration("timeout", timeoutDuration),
380+
zap.Strings("files", files))
381+
startTime = time.Now()
382+
}
383+
}
367384
}
368385
}
369386

@@ -1040,14 +1057,17 @@ func (p *Partition) compact() {
10401057
}
10411058
// Mark the level as compacting.
10421059
p.levelCompacting[0] = true
1043-
p.currentCompactionN++
1060+
p.currentCompactionN.Add(1)
10441061
go func() {
1062+
defer func() {
1063+
p.mu.Lock()
1064+
p.currentCompactionN.Add(-1)
1065+
p.levelCompacting[0] = false
1066+
p.mu.Unlock()
1067+
p.Compact()
1068+
}()
1069+
10451070
p.compactLogFile(logFile)
1046-
p.mu.Lock()
1047-
p.currentCompactionN--
1048-
p.levelCompacting[0] = false
1049-
p.mu.Unlock()
1050-
p.Compact()
10511071
}()
10521072
}
10531073
}
@@ -1079,20 +1099,21 @@ func (p *Partition) compact() {
10791099
// Execute in closure to save reference to the group within the loop.
10801100
func(files []*IndexFile, level int) {
10811101
// Start compacting in a separate goroutine.
1082-
p.currentCompactionN++
1102+
p.currentCompactionN.Add(1)
10831103
go func() {
1104+
defer func() {
1105+
// Ensure compaction lock for the level is released.
1106+
p.mu.Lock()
1107+
p.levelCompacting[level] = false
1108+
p.currentCompactionN.Add(-1)
1109+
p.mu.Unlock()
1110+
1111+
// Check for new compactions
1112+
p.Compact()
1113+
}()
10841114

10851115
// Compact to a new level.
10861116
p.compactToLevel(files, level+1, interrupt)
1087-
1088-
// Ensure compaction lock for the level is released.
1089-
p.mu.Lock()
1090-
p.levelCompacting[level] = false
1091-
p.currentCompactionN--
1092-
p.mu.Unlock()
1093-
1094-
// Check for new compactions
1095-
p.Compact()
10961117
}()
10971118
}(files, level)
10981119
}

0 commit comments

Comments (0)