Skip to content

Commit dab2927

Browse files
Ibrahim Jarif and manishrjain
authored and committed
fix(compaction): Use separate compactors for L0, L1 (#1466)
Related to #1459. This PR contains the following changes to compactions: use a separate thread for compacting Levels 0 and 1, and a separate one for the other levels; pick levels to compact based on score; stall Level 0 if compactions cannot keep up (originally added in #1186); and limit the number of open table builders to 5 in compactions.
1 parent c146f49 commit dab2927

File tree

7 files changed

+124
-122
lines changed

7 files changed

+124
-122
lines changed

badger/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ func main() {
2929
go func() {
3030
for i := 8080; i < 9080; i++ {
3131
fmt.Printf("Listening for /debug HTTP requests at port: %d\n", i)
32-
if err := http.ListenAndServe(fmt.Sprintf("localhost:%d", i), nil); err != nil {
32+
if err := http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", i), nil); err != nil {
3333
fmt.Println("Port busy. Trying another one...")
3434
continue
3535

db.go

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,12 @@ func (db *DB) replayFunction() func(Entry, valuePointer) error {
192192

193193
// Open returns a new DB object.
194194
func Open(opt Options) (db *DB, err error) {
195+
// It's okay to have zero compactors which will disable all compactions but
196+
// we cannot have just one compactor otherwise we will end up with all data
197+
// one level 2.
198+
if opt.NumCompactors == 1 {
199+
return nil, errors.New("Cannot have 1 compactor. Need at least 2")
200+
}
195201
if opt.InMemory && (opt.Dir != "" || opt.ValueDir != "") {
196202
return nil, errors.New("Cannot use badger in Disk-less mode with Dir or ValueDir set")
197203
}
@@ -528,7 +534,7 @@ func (db *DB) close() (err error) {
528534
// Force Compact L0
529535
// We don't need to care about cstatus since no parallel compaction is running.
530536
if db.opt.CompactL0OnClose {
531-
err := db.lc.doCompact(compactionPriority{level: 0, score: 1.73})
537+
err := db.lc.doCompact(173, compactionPriority{level: 0, score: 1.73})
532538
switch err {
533539
case errFillTables:
534540
// This error only means that there might be enough tables to do a compaction. So, we
@@ -1438,7 +1444,7 @@ func (db *DB) Flatten(workers int) error {
14381444
errCh := make(chan error, 1)
14391445
for i := 0; i < workers; i++ {
14401446
go func() {
1441-
errCh <- db.lc.doCompact(cp)
1447+
errCh <- db.lc.doCompact(175, cp)
14421448
}()
14431449
}
14441450
var success int

level_handler.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -188,9 +188,8 @@ func (s *levelHandler) tryAddLevel0Table(t *table.Table) bool {
188188
// Need lock as we may be deleting the first table during a level 0 compaction.
189189
s.Lock()
190190
defer s.Unlock()
191-
// Return false only if L0 is in memory and number of tables is more than number of
192-
// ZeroTableStall. For on disk L0, we should just add the tables to the level.
193-
if s.db.opt.KeepL0InMemory && len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
191+
// Stall (by returning false) if we are above the specified stall setting for L0.
192+
if len(s.tables) >= s.db.opt.NumLevelZeroTablesStall {
194193
return false
195194
}
196195

levels.go

Lines changed: 69 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -306,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
306306
// function in logs, and forces a compaction.
307307
dropPrefixes: prefixes,
308308
}
309-
if err := s.doCompact(cp); err != nil {
309+
if err := s.doCompact(174, cp); err != nil {
310310
opt.Warningf("While compacting level 0: %v", err)
311311
return nil
312312
}
@@ -366,11 +366,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
366366
n := s.kv.opt.NumCompactors
367367
lc.AddRunning(n - 1)
368368
for i := 0; i < n; i++ {
369-
go s.runWorker(lc)
369+
// The worker with id=0 is dedicated to L0 and L1. This is not counted
370+
// towards the user specified NumCompactors.
371+
go s.runCompactor(i, lc)
370372
}
371373
}
372374

373-
func (s *levelsController) runWorker(lc *y.Closer) {
375+
func (s *levelsController) runCompactor(id int, lc *y.Closer) {
374376
defer lc.Done()
375377

376378
randomDelay := time.NewTimer(time.Duration(rand.Int31n(1000)) * time.Millisecond)
@@ -381,7 +383,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
381383
return
382384
}
383385

384-
ticker := time.NewTicker(time.Second)
386+
ticker := time.NewTicker(100 * time.Millisecond)
385387
defer ticker.Stop()
386388

387389
for {
@@ -391,7 +393,15 @@ func (s *levelsController) runWorker(lc *y.Closer) {
391393
prios := s.pickCompactLevels()
392394
loop:
393395
for _, p := range prios {
394-
err := s.doCompact(p)
396+
if id == 0 && p.level > 1 {
397+
// If I'm ID zero, I only compact L0 and L1.
398+
continue
399+
}
400+
if id != 0 && p.level <= 1 {
401+
// If I'm ID non-zero, I do NOT compact L0 and L1.
402+
continue
403+
}
404+
err := s.doCompact(id, p)
395405
switch err {
396406
case nil:
397407
break loop
@@ -453,10 +463,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
453463
prios = append(prios, pri)
454464
}
455465
}
456-
// We used to sort compaction priorities based on the score. But, we
457-
// decided to compact based on the level, not the priority. So, upper
458-
// levels (level 0, level 1, etc) always get compacted first, before the
459-
// lower levels -- this allows us to avoid stalls.
466+
// We should continue to sort the compaction priorities by score. Now that we have a dedicated
467+
// compactor for L0 and L1, we don't need to sort by level here.
468+
sort.Slice(prios, func(i, j int) bool {
469+
return prios[i].score > prios[j].score
470+
})
460471
return prios
461472
}
462473

@@ -544,15 +555,13 @@ nextTable:
544555
// that would affect the snapshot view guarantee provided by transactions.
545556
discardTs := s.kv.orc.discardAtOrBelow()
546557

547-
// Start generating new tables.
548-
type newTableResult struct {
549-
table *table.Table
550-
err error
551-
}
552-
resultCh := make(chan newTableResult)
553558
var numBuilds, numVersions int
554559
var lastKey, skipKey []byte
555560
var vp valuePointer
561+
var newTables []*table.Table
562+
mu := new(sync.Mutex) // Guards newTables
563+
564+
inflightBuilders := y.NewThrottle(5)
556565
for it.Valid() {
557566
timeStart := time.Now()
558567
dk, err := s.kv.registry.latestDataKey()
@@ -649,67 +658,66 @@ nextTable:
649658
// called Add() at least once, and builder is not Empty().
650659
s.kv.opt.Debugf("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v",
651660
numKeys, numSkips, time.Since(timeStart))
652-
build := func(fileID uint64) (*table.Table, error) {
653-
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
654-
if err != nil {
655-
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
656-
}
657-
658-
if _, err := fd.Write(builder.Finish()); err != nil {
659-
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
660-
}
661-
tbl, err := table.OpenTable(fd, bopts)
662-
// decrRef is added below.
663-
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
664-
}
665661
if builder.Empty() {
666662
continue
667663
}
668664
numBuilds++
669665
fileID := s.reserveFileID()
666+
if err := inflightBuilders.Do(); err != nil {
667+
// Can't return from here, until I decrRef all the tables that I built so far.
668+
break
669+
}
670670
go func(builder *table.Builder) {
671671
defer builder.Close()
672-
var (
673-
tbl *table.Table
674-
err error
675-
)
672+
defer inflightBuilders.Done(err)
673+
674+
build := func(fileID uint64) (*table.Table, error) {
675+
fd, err := y.CreateSyncedFile(table.NewFilename(fileID, s.kv.opt.Dir), true)
676+
if err != nil {
677+
return nil, errors.Wrapf(err, "While opening new table: %d", fileID)
678+
}
679+
680+
if _, err := fd.Write(builder.Finish()); err != nil {
681+
return nil, errors.Wrapf(err, "Unable to write to file: %d", fileID)
682+
}
683+
tbl, err := table.OpenTable(fd, bopts)
684+
// decrRef is added below.
685+
return tbl, errors.Wrapf(err, "Unable to open table: %q", fd.Name())
686+
}
687+
688+
var tbl *table.Table
689+
var err error
676690
if s.kv.opt.InMemory {
677691
tbl, err = table.OpenInMemoryTable(builder.Finish(), fileID, &bopts)
678692
} else {
679693
tbl, err = build(fileID)
680694
}
681-
resultCh <- newTableResult{tbl, err}
682-
}(builder)
683-
}
684695

685-
newTables := make([]*table.Table, 0, 20)
686-
// Wait for all table builders to finish.
687-
var firstErr error
688-
for x := 0; x < numBuilds; x++ {
689-
res := <-resultCh
690-
newTables = append(newTables, res.table)
691-
if firstErr == nil {
692-
firstErr = res.err
693-
}
696+
// If we couldn't build the table, return fast.
697+
if err != nil {
698+
return
699+
}
700+
701+
mu.Lock()
702+
newTables = append(newTables, tbl)
703+
mu.Unlock()
704+
}(builder)
694705
}
695706

696-
if firstErr == nil {
707+
// Wait for all table builders to finish and also for newTables accumulator to finish.
708+
err := inflightBuilders.Finish()
709+
if err == nil {
697710
// Ensure created files' directory entries are visible. We don't mind the extra latency
698711
// from not doing this ASAP after all file creation has finished because this is a
699712
// background operation.
700-
firstErr = s.kv.syncDir(s.kv.opt.Dir)
713+
err = s.kv.syncDir(s.kv.opt.Dir)
701714
}
702715

703-
if firstErr != nil {
716+
if err != nil {
704717
// An error happened. Delete all the newly created table files (by calling DecrRef
705718
// -- we're the only holders of a ref).
706-
for j := 0; j < numBuilds; j++ {
707-
if newTables[j] != nil {
708-
_ = newTables[j].DecrRef()
709-
}
710-
}
711-
errorReturn := errors.Wrapf(firstErr, "While running compaction for: %+v", cd)
712-
return nil, nil, errorReturn
719+
_ = decrRefs(newTables)
720+
return nil, nil, errors.Wrapf(err, "while running compactions for: %+v", cd)
713721
}
714722

715723
sort.Slice(newTables, func(i, j int) bool {
@@ -956,7 +964,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
956964
var errFillTables = errors.New("Unable to fill tables")
957965

958966
// doCompact picks some table on level l and compacts it away to the next level.
959-
func (s *levelsController) doCompact(p compactionPriority) error {
967+
func (s *levelsController) doCompact(id int, p compactionPriority) error {
960968
l := p.level
961969
y.AssertTrue(l+1 < s.kv.opt.MaxLevels) // Sanity check.
962970

@@ -969,7 +977,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
969977
cd.elog.SetMaxEvents(100)
970978
defer cd.elog.Finish()
971979

972-
s.kv.opt.Infof("Got compaction priority: %+v", p)
980+
s.kv.opt.Debugf("[Compactor: %d] Attempting to run compaction: %+v", id, p)
973981

974982
// While picking tables to be compacted, both levels' tables are expected to
975983
// remain unchanged.
@@ -985,16 +993,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
985993
}
986994
defer s.cstatus.delete(cd) // Remove the ranges from compaction status.
987995

988-
s.kv.opt.Infof("Running for level: %d\n", cd.thisLevel.level)
996+
s.kv.opt.Infof("[Compactor: %d] Running compaction: %+v for level: %d\n",
997+
id, p, cd.thisLevel.level)
989998
s.cstatus.toLog(cd.elog)
990999
if err := s.runCompactDef(l, cd); err != nil {
9911000
// This compaction couldn't be done successfully.
992-
s.kv.opt.Warningf("LOG Compact FAILED with error: %+v: %+v", err, cd)
1001+
s.kv.opt.Warningf("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v", id, err, cd)
9931002
return err
9941003
}
9951004

9961005
s.cstatus.toLog(cd.elog)
997-
s.kv.opt.Infof("Compaction for level: %d DONE", cd.thisLevel.level)
1006+
s.kv.opt.Infof("[Compactor: %d] Compaction for level: %d DONE", id, cd.thisLevel.level)
9981007
return nil
9991008
}
10001009

@@ -1018,7 +1027,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
10181027
// Stall. Make sure all levels are healthy before we unstall.
10191028
var timeStart time.Time
10201029
{
1021-
s.kv.opt.Debugf("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
1030+
s.kv.opt.Infof("STALLED STALLED STALLED: %v\n", time.Since(s.lastUnstalled))
10221031
s.cstatus.RLock()
10231032
for i := 0; i < s.kv.opt.MaxLevels; i++ {
10241033
s.kv.opt.Debugf("level=%d. Status=%s Size=%d\n",

levels_test.go

Lines changed: 39 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -715,52 +715,6 @@ func createEmptyTable(db *DB) *table.Table {
715715
}
716716

717717
func TestL0Stall(t *testing.T) {
718-
test := func(t *testing.T, opt *Options) {
719-
runBadgerTest(t, opt, func(t *testing.T, db *DB) {
720-
db.lc.levels[0].Lock()
721-
// Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level
722-
// zero and all new additions are expected to stall if L0 is in memory.
723-
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
724-
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
725-
}
726-
db.lc.levels[0].Unlock()
727-
728-
timeout := time.After(5 * time.Second)
729-
done := make(chan bool)
730-
731-
go func() {
732-
tab := createEmptyTable(db)
733-
require.NoError(t, db.lc.addLevel0Table(tab))
734-
tab.DecrRef()
735-
done <- true
736-
}()
737-
// Let it stall for a second.
738-
time.Sleep(time.Second)
739-
740-
select {
741-
case <-timeout:
742-
if opt.KeepL0InMemory {
743-
t.Log("Timeout triggered")
744-
// Mark this test as successful since L0 is in memory and the
745-
// addition of new table to L0 is supposed to stall.
746-
747-
// Remove tables from level 0 so that the stalled
748-
// compaction can make progress. This does not have any
749-
// effect on the test. This is done so that the goroutine
750-
// stuck on addLevel0Table can make progress and end.
751-
db.lc.levels[0].Lock()
752-
db.lc.levels[0].tables = nil
753-
db.lc.levels[0].Unlock()
754-
<-done
755-
} else {
756-
t.Fatal("Test didn't finish in time")
757-
}
758-
case <-done:
759-
// The test completed before 5 second timeout. Mark it as successful.
760-
}
761-
})
762-
}
763-
764718
opt := DefaultOptions("")
765719
// Disable all compactions.
766720
opt.NumCompactors = 0
@@ -769,13 +723,45 @@ func TestL0Stall(t *testing.T) {
769723
// Addition of new tables will stall if there are 4 or more L0 tables.
770724
opt.NumLevelZeroTablesStall = 4
771725

772-
t.Run("with KeepL0InMemory", func(t *testing.T) {
773-
opt.KeepL0InMemory = true
774-
test(t, &opt)
775-
})
776-
t.Run("with L0 on disk", func(t *testing.T) {
777-
opt.KeepL0InMemory = false
778-
test(t, &opt)
726+
runBadgerTest(t, &opt, func(t *testing.T, db *DB) {
727+
db.lc.levels[0].Lock()
728+
// Add NumLevelZeroTableStall+1 number of tables to level 0. This would fill up level
729+
// zero and all new additions are expected to stall if L0 is in memory.
730+
for i := 0; i < opt.NumLevelZeroTablesStall+1; i++ {
731+
db.lc.levels[0].tables = append(db.lc.levels[0].tables, createEmptyTable(db))
732+
}
733+
db.lc.levels[0].Unlock()
734+
735+
timeout := time.After(5 * time.Second)
736+
done := make(chan bool)
737+
738+
go func() {
739+
tab := createEmptyTable(db)
740+
require.NoError(t, db.lc.addLevel0Table(tab))
741+
tab.DecrRef()
742+
done <- true
743+
}()
744+
// Let it stall for a second.
745+
time.Sleep(time.Second)
746+
747+
select {
748+
case <-timeout:
749+
t.Log("Timeout triggered")
750+
// Mark this test as successful since L0 is in memory and the
751+
// addition of new table to L0 is supposed to stall.
752+
753+
// Remove tables from level 0 so that the stalled
754+
// compaction can make progress. This does not have any
755+
// effect on the test. This is done so that the goroutine
756+
// stuck on addLevel0Table can make progress and end.
757+
db.lc.levels[0].Lock()
758+
db.lc.levels[0].tables = nil
759+
db.lc.levels[0].Unlock()
760+
<-done
761+
case <-done:
762+
// The test completed before 5 second timeout. Mark it as successful.
763+
t.Fatal("Test did not stall")
764+
}
779765
})
780766
}
781767

0 commit comments

Comments (0)