@@ -20,6 +20,7 @@ import (
2020
2121 "github.com/influxdata/influxdb/logger"
2222 "github.com/influxdata/influxdb/models"
23+ "github.com/influxdata/influxdb/pkg/data/gensyncmap"
2324 "github.com/influxdata/influxdb/pkg/file"
2425 "github.com/influxdata/influxdb/pkg/limiter"
2526 "github.com/influxdata/influxdb/pkg/metrics"
@@ -305,15 +306,13 @@ func NewFileStore(dir string, options ...TsmReaderOption) *FileStore {
305306 openLimiter : limiter .NewFixed (runtime .GOMAXPROCS (0 )),
306307 stats : & FileStoreStatistics {},
307308 purger : & purger {
308- files : map [string ]TSMFile {},
309309 logger : logger ,
310310 },
311311 obs : noFileStoreObserver {},
312312 parseFileName : DefaultParseFileName ,
313313 copyFiles : runtime .GOOS == "windows" ,
314314 readerOptions : append ([]TsmReaderOption {WithParseFileNameFunc (DefaultParseFileName )}, options ... ),
315315 }
316- fs .purger .fileStore = fs
317316 return fs
318317}
319318
@@ -1595,88 +1594,109 @@ func (c *KeyCursor) nextDescending() {
15951594 }
15961595}
15971596
1597+ // purger manages asynchronous deletion of TSM files that have been
1598+ // replaced by compaction, but are temporarily held open by queries
15981599type purger struct {
1599- mu sync.RWMutex
1600- fileStore * FileStore
1601- files map [string ]TSMFile
1602- running bool
1600+ files gensyncmap.Map [string , TSMFile ]
1601+ mu sync.Mutex
1602+ running bool
16031603
16041604 logger * zap.Logger
16051605}
16061606
16071607func (p * purger ) add (files []TSMFile ) {
1608- var fileNames []string
1609-
16101608 if len (files ) == 0 {
16111609 return
16121610 }
1613- p .mu .Lock ()
1611+
1612+ var fileNames []string
16141613 for _ , f := range files {
16151614 fileName := f .Path ()
16161615 fileNames = append (fileNames , fileName )
1617- p .files [ fileName ] = f
1616+ p .files . Store ( fileName , f )
16181617 }
1619- p . mu . Unlock ()
1618+
16201619 p .purge (fileNames )
16211620}
16221621
1622+ // purge starts a goroutine to purge files from disk if one isn't already running.
16231623func (p * purger ) purge (fileNames []string ) {
16241624 logger , logEndOp := logger .NewOperation (p .logger , "Purge held files" , "filestore_purger" )
16251625
16261626 logger .Info ("added" , zap .Int ("count" , len (fileNames )))
16271627 logger .Debug ("purging" , zap .Strings ("files" , fileNames ))
1628+
16281629 p .mu .Lock ()
1630+ defer p .mu .Unlock ()
16291631 if p .running {
1630- p .mu .Unlock ()
16311632 logger .Info ("already running, files added to previous operation" )
16321633 logEndOp ()
16331634 return
16341635 }
16351636 p .running = true
1636- p .mu .Unlock ()
16371637
16381638 go func () {
16391639 var purgeCount int
1640+ var failCount int
16401641 defer func () {
16411642 logger .Info ("removed" , zap .Int ("files" , purgeCount ))
1643+ if failCount > 0 {
1644+ logger .Warn ("failed to remove" , zap .Int ("files" , failCount ))
1645+ }
16421646 logEndOp ()
16431647 }()
1644- for {
1645- p .mu .Lock ()
1646- for k , v := range p .files {
1648+
1649+ // hasFiles() acquires the lock to check files.Len() and, if empty,
1650+ // sets running = false before returning. This ensures no race between
1651+ // add() checking running and the goroutine exiting.
1652+ for p .hasFiles () {
1653+ p .files .Range (func (k string , v TSMFile ) bool {
16471654 // In order to ensure that there are no races with this (file held externally calls Ref
16481655 // after we check InUse), we need to maintain the invariant that every handle to a file
16491656 // is handed out in use (Ref'd), and handlers only ever relinquish the file once (call Unref
1650- // exactly once, and never use it again). InUse is only valid during a write lock, since
1651- // we allow calls to Ref and Unref under the read lock and no lock at all respectively.
1657+ // exactly once, and never use it again).
16521658 if ! v .InUse () {
16531659 if err := v .Close (); err != nil {
16541660 logger .Error ("close file failed" , zap .String ("file" , k ), zap .Error (err ))
1655- continue
1656- }
1657-
1658- if err := v .Remove (); err != nil {
1661+ failCount ++
1662+ } else if err := v .Remove (); err != nil {
16591663 logger .Error ("remove file failed" , zap .String ("file" , k ), zap .Error (err ))
1660- continue
1664+ failCount ++
1665+ } else {
1666+ logger .Debug ("successfully removed" , zap .String ("file" , k ))
1667+ purgeCount ++
16611668 }
1662- logger .Debug ("successfully removed" , zap .String ("file" , k ))
1663- delete (p .files , k )
1664- purgeCount ++
1669+ // Remove the file regardless of success or failure.
1670+ // Do not retry files which could not be closed or removed.
1671+ // This is because they have already been closed, or there
1672+ // is an operating system problem that is unlikely to
1673+ // resolve by itself.
1674+ p .files .Delete (k )
16651675 }
1666- }
1667-
1668- if len (p .files ) == 0 {
1669- p .running = false
1670- p .mu .Unlock ()
1671- return
1672- }
1676+ // InUse files are left to be tried later.
1677+ return true
1678+ })
16731679
1674- p .mu .Unlock ()
16751680 time .Sleep (time .Second )
16761681 }
16771682 }()
16781683}
16791684
1685+ func (p * purger ) hasFiles () bool {
1686+ p .mu .Lock ()
1687+ defer p .mu .Unlock ()
1688+
1689+ has := false
1690+
1691+ // Avoid calling Len() which iterates over the whole map.
1692+ p .files .Range (func (k string , v TSMFile ) bool {
1693+ has = true
1694+ return false // stop iteration after finding the first file
1695+ })
1696+ p .running = has
1697+ return has
1698+ }
1699+
16801700type tsmReaders []TSMFile
16811701
16821702func (a tsmReaders ) Len () int { return len (a ) }
0 commit comments