Skip to content

Commit d5a48e0

Browse files
fix: reduce lock contention and races in purger (#27146) (#27177)
Use a sync.Map and minimize mutex sections to avoid blocking in calls to the purger Fixes #26110 (cherry picked from commit 304e1ae) Fixes #27175
1 parent 1420aaf commit d5a48e0

File tree

2 files changed

+436
-34
lines changed

2 files changed

+436
-34
lines changed

tsdb/engine/tsm1/file_store.go

Lines changed: 54 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020

2121
"github.com/influxdata/influxdb/logger"
2222
"github.com/influxdata/influxdb/models"
23+
"github.com/influxdata/influxdb/pkg/data/gensyncmap"
2324
"github.com/influxdata/influxdb/pkg/file"
2425
"github.com/influxdata/influxdb/pkg/limiter"
2526
"github.com/influxdata/influxdb/pkg/metrics"
@@ -305,15 +306,13 @@ func NewFileStore(dir string, options ...TsmReaderOption) *FileStore {
305306
openLimiter: limiter.NewFixed(runtime.GOMAXPROCS(0)),
306307
stats: &FileStoreStatistics{},
307308
purger: &purger{
308-
files: map[string]TSMFile{},
309309
logger: logger,
310310
},
311311
obs: noFileStoreObserver{},
312312
parseFileName: DefaultParseFileName,
313313
copyFiles: runtime.GOOS == "windows",
314314
readerOptions: append([]TsmReaderOption{WithParseFileNameFunc(DefaultParseFileName)}, options...),
315315
}
316-
fs.purger.fileStore = fs
317316
return fs
318317
}
319318

@@ -1595,88 +1594,109 @@ func (c *KeyCursor) nextDescending() {
15951594
}
15961595
}
15971596

1597+
// purger manages asynchronous deletion of TSM files that have been
1598+
// replaced by compaction, but are temporarily held open by queries
15981599
type purger struct {
1599-
mu sync.RWMutex
1600-
fileStore *FileStore
1601-
files map[string]TSMFile
1602-
running bool
1600+
files gensyncmap.Map[string, TSMFile]
1601+
mu sync.Mutex
1602+
running bool
16031603

16041604
logger *zap.Logger
16051605
}
16061606

16071607
func (p *purger) add(files []TSMFile) {
1608-
var fileNames []string
1609-
16101608
if len(files) == 0 {
16111609
return
16121610
}
1613-
p.mu.Lock()
1611+
1612+
var fileNames []string
16141613
for _, f := range files {
16151614
fileName := f.Path()
16161615
fileNames = append(fileNames, fileName)
1617-
p.files[fileName] = f
1616+
p.files.Store(fileName, f)
16181617
}
1619-
p.mu.Unlock()
1618+
16201619
p.purge(fileNames)
16211620
}
16221621

1622+
// purge starts a goroutine to purge files from disk if one isn't already running.
16231623
func (p *purger) purge(fileNames []string) {
16241624
logger, logEndOp := logger.NewOperation(p.logger, "Purge held files", "filestore_purger")
16251625

16261626
logger.Info("added", zap.Int("count", len(fileNames)))
16271627
logger.Debug("purging", zap.Strings("files", fileNames))
1628+
16281629
p.mu.Lock()
1630+
defer p.mu.Unlock()
16291631
if p.running {
1630-
p.mu.Unlock()
16311632
logger.Info("already running, files added to previous operation")
16321633
logEndOp()
16331634
return
16341635
}
16351636
p.running = true
1636-
p.mu.Unlock()
16371637

16381638
go func() {
16391639
var purgeCount int
1640+
var failCount int
16401641
defer func() {
16411642
logger.Info("removed", zap.Int("files", purgeCount))
1643+
if failCount > 0 {
1644+
logger.Warn("failed to remove", zap.Int("files", failCount))
1645+
}
16421646
logEndOp()
16431647
}()
1644-
for {
1645-
p.mu.Lock()
1646-
for k, v := range p.files {
1648+
1649+
// hasFiles() acquires the lock to check files.Len() and, if empty,
1650+
// sets running = false before returning. This ensures no race between
1651+
// add() checking running and the goroutine exiting.
1652+
for p.hasFiles() {
1653+
p.files.Range(func(k string, v TSMFile) bool {
16471654
// In order to ensure that there are no races with this (file held externally calls Ref
16481655
// after we check InUse), we need to maintain the invariant that every handle to a file
16491656
// is handed out in use (Ref'd), and handlers only ever relinquish the file once (call Unref
1650-
// exactly once, and never use it again). InUse is only valid during a write lock, since
1651-
// we allow calls to Ref and Unref under the read lock and no lock at all respectively.
1657+
// exactly once, and never use it again).
16521658
if !v.InUse() {
16531659
if err := v.Close(); err != nil {
16541660
logger.Error("close file failed", zap.String("file", k), zap.Error(err))
1655-
continue
1656-
}
1657-
1658-
if err := v.Remove(); err != nil {
1661+
failCount++
1662+
} else if err := v.Remove(); err != nil {
16591663
logger.Error("remove file failed", zap.String("file", k), zap.Error(err))
1660-
continue
1664+
failCount++
1665+
} else {
1666+
logger.Debug("successfully removed", zap.String("file", k))
1667+
purgeCount++
16611668
}
1662-
logger.Debug("successfully removed", zap.String("file", k))
1663-
delete(p.files, k)
1664-
purgeCount++
1669+
// Remove the file regardless of success or failure.
1670+
// Do not retry files which could not be closed or removed.
1671+
// This is because they have already been closed, or there
1672+
// is an operating system problem that is unlikely to
1673+
// resolve by itself.
1674+
p.files.Delete(k)
16651675
}
1666-
}
1667-
1668-
if len(p.files) == 0 {
1669-
p.running = false
1670-
p.mu.Unlock()
1671-
return
1672-
}
1676+
// InUse files are left to be tried later.
1677+
return true
1678+
})
16731679

1674-
p.mu.Unlock()
16751680
time.Sleep(time.Second)
16761681
}
16771682
}()
16781683
}
16791684

1685+
func (p *purger) hasFiles() bool {
1686+
p.mu.Lock()
1687+
defer p.mu.Unlock()
1688+
1689+
has := false
1690+
1691+
// Avoid calling Len() which iterates over the whole map.
1692+
p.files.Range(func(k string, v TSMFile) bool {
1693+
has = true
1694+
return false // stop iteration after finding the first file
1695+
})
1696+
p.running = has
1697+
return has
1698+
}
1699+
16801700
type tsmReaders []TSMFile
16811701

16821702
func (a tsmReaders) Len() int { return len(a) }

0 commit comments

Comments
 (0)