@@ -306,7 +306,7 @@ func (s *levelsController) dropPrefixes(prefixes [][]byte) error {
306306 // function in logs, and forces a compaction.
307307 dropPrefixes : prefixes ,
308308 }
309- if err := s .doCompact (cp ); err != nil {
309+ if err := s .doCompact (174 , cp ); err != nil {
310310 opt .Warningf ("While compacting level 0: %v" , err )
311311 return nil
312312 }
@@ -366,11 +366,13 @@ func (s *levelsController) startCompact(lc *y.Closer) {
366366 n := s .kv .opt .NumCompactors
367367 lc .AddRunning (n - 1 )
368368 for i := 0 ; i < n ; i ++ {
369- go s .runWorker (lc )
369+ // The worker with id=0 is dedicated to L0 and L1. This is not counted
370+ // towards the user specified NumCompactors.
371+ go s .runCompactor (i , lc )
370372 }
371373}
372374
373- func (s * levelsController ) runWorker ( lc * y.Closer ) {
375+ func (s * levelsController ) runCompactor ( id int , lc * y.Closer ) {
374376 defer lc .Done ()
375377
376378 randomDelay := time .NewTimer (time .Duration (rand .Int31n (1000 )) * time .Millisecond )
@@ -381,7 +383,7 @@ func (s *levelsController) runWorker(lc *y.Closer) {
381383 return
382384 }
383385
384- ticker := time .NewTicker (time .Second )
386+ ticker := time .NewTicker (100 * time .Millisecond )
385387 defer ticker .Stop ()
386388
387389 for {
@@ -391,7 +393,15 @@ func (s *levelsController) runWorker(lc *y.Closer) {
391393 prios := s .pickCompactLevels ()
392394 loop:
393395 for _ , p := range prios {
394- err := s .doCompact (p )
396+ if id == 0 && p .level > 1 {
397+ // If I'm ID zero, I only compact L0 and L1.
398+ continue
399+ }
400+ if id != 0 && p .level <= 1 {
401+ // If I'm ID non-zero, I do NOT compact L0 and L1.
402+ continue
403+ }
404+ err := s .doCompact (id , p )
395405 switch err {
396406 case nil :
397407 break loop
@@ -453,10 +463,11 @@ func (s *levelsController) pickCompactLevels() (prios []compactionPriority) {
453463 prios = append (prios , pri )
454464 }
455465 }
456- // We used to sort compaction priorities based on the score. But, we
457- // decided to compact based on the level, not the priority. So, upper
458- // levels (level 0, level 1, etc) always get compacted first, before the
459- // lower levels -- this allows us to avoid stalls.
466+ // We should continue to sort the compaction priorities by score. Now that we have a dedicated
467+ // compactor for L0 and L1, we don't need to sort by level here.
468+ sort .Slice (prios , func (i , j int ) bool {
469+ return prios [i ].score > prios [j ].score
470+ })
460471 return prios
461472}
462473
@@ -544,15 +555,13 @@ nextTable:
544555 // that would affect the snapshot view guarantee provided by transactions.
545556 discardTs := s .kv .orc .discardAtOrBelow ()
546557
547- // Start generating new tables.
548- type newTableResult struct {
549- table * table.Table
550- err error
551- }
552- resultCh := make (chan newTableResult )
553558 var numBuilds , numVersions int
554559 var lastKey , skipKey []byte
555560 var vp valuePointer
561+ var newTables []* table.Table
562+ mu := new (sync.Mutex ) // Guards newTables
563+
564+ inflightBuilders := y .NewThrottle (5 )
556565 for it .Valid () {
557566 timeStart := time .Now ()
558567 dk , err := s .kv .registry .latestDataKey ()
@@ -649,67 +658,66 @@ nextTable:
649658 // called Add() at least once, and builder is not Empty().
650659 s .kv .opt .Debugf ("LOG Compact. Added %d keys. Skipped %d keys. Iteration took: %v" ,
651660 numKeys , numSkips , time .Since (timeStart ))
652- build := func (fileID uint64 ) (* table.Table , error ) {
653- fd , err := y .CreateSyncedFile (table .NewFilename (fileID , s .kv .opt .Dir ), true )
654- if err != nil {
655- return nil , errors .Wrapf (err , "While opening new table: %d" , fileID )
656- }
657-
658- if _ , err := fd .Write (builder .Finish ()); err != nil {
659- return nil , errors .Wrapf (err , "Unable to write to file: %d" , fileID )
660- }
661- tbl , err := table .OpenTable (fd , bopts )
662- // decrRef is added below.
663- return tbl , errors .Wrapf (err , "Unable to open table: %q" , fd .Name ())
664- }
665661 if builder .Empty () {
666662 continue
667663 }
668664 numBuilds ++
669665 fileID := s .reserveFileID ()
666+ if err := inflightBuilders .Do (); err != nil {
667+ // Can't return from here, until I decrRef all the tables that I built so far.
668+ break
669+ }
670670 go func (builder * table.Builder ) {
671671 defer builder .Close ()
672- var (
673- tbl * table.Table
674- err error
675- )
672+ defer inflightBuilders .Done (err )
673+
674+ build := func (fileID uint64 ) (* table.Table , error ) {
675+ fd , err := y .CreateSyncedFile (table .NewFilename (fileID , s .kv .opt .Dir ), true )
676+ if err != nil {
677+ return nil , errors .Wrapf (err , "While opening new table: %d" , fileID )
678+ }
679+
680+ if _ , err := fd .Write (builder .Finish ()); err != nil {
681+ return nil , errors .Wrapf (err , "Unable to write to file: %d" , fileID )
682+ }
683+ tbl , err := table .OpenTable (fd , bopts )
684+ // decrRef is added below.
685+ return tbl , errors .Wrapf (err , "Unable to open table: %q" , fd .Name ())
686+ }
687+
688+ var tbl * table.Table
689+ var err error
676690 if s .kv .opt .InMemory {
677691 tbl , err = table .OpenInMemoryTable (builder .Finish (), fileID , & bopts )
678692 } else {
679693 tbl , err = build (fileID )
680694 }
681- resultCh <- newTableResult {tbl , err }
682- }(builder )
683- }
684695
685- newTables := make ([] * table. Table , 0 , 20 )
686- // Wait for all table builders to finish.
687- var firstErr error
688- for x := 0 ; x < numBuilds ; x ++ {
689- res := <- resultCh
690- newTables = append ( newTables , res . table )
691- if firstErr == nil {
692- firstErr = res . err
693- }
696+ // If we couldn't build the table, return fast.
697+ if err != nil {
698+ return
699+ }
700+
701+ mu . Lock ( )
702+ newTables = append ( newTables , tbl )
703+ mu . Unlock ()
704+ }( builder )
694705 }
695706
696- if firstErr == nil {
707+ // Wait for all table builders to finish and also for newTables accumulator to finish.
708+ err := inflightBuilders .Finish ()
709+ if err == nil {
697710 // Ensure created files' directory entries are visible. We don't mind the extra latency
698711 // from not doing this ASAP after all file creation has finished because this is a
699712 // background operation.
700- firstErr = s .kv .syncDir (s .kv .opt .Dir )
713+ err = s .kv .syncDir (s .kv .opt .Dir )
701714 }
702715
703- if firstErr != nil {
716+ if err != nil {
704717 // An error happened. Delete all the newly created table files (by calling DecrRef
705718 // -- we're the only holders of a ref).
706- for j := 0 ; j < numBuilds ; j ++ {
707- if newTables [j ] != nil {
708- _ = newTables [j ].DecrRef ()
709- }
710- }
711- errorReturn := errors .Wrapf (firstErr , "While running compaction for: %+v" , cd )
712- return nil , nil , errorReturn
719+ _ = decrRefs (newTables )
720+ return nil , nil , errors .Wrapf (err , "while running compactions for: %+v" , cd )
713721 }
714722
715723 sort .Slice (newTables , func (i , j int ) bool {
@@ -956,7 +964,7 @@ func (s *levelsController) runCompactDef(l int, cd compactDef) (err error) {
956964var errFillTables = errors .New ("Unable to fill tables" )
957965
958966// doCompact picks some table on level l and compacts it away to the next level.
959- func (s * levelsController ) doCompact (p compactionPriority ) error {
967+ func (s * levelsController ) doCompact (id int , p compactionPriority ) error {
960968 l := p .level
961969 y .AssertTrue (l + 1 < s .kv .opt .MaxLevels ) // Sanity check.
962970
@@ -969,7 +977,7 @@ func (s *levelsController) doCompact(p compactionPriority) error {
969977 cd .elog .SetMaxEvents (100 )
970978 defer cd .elog .Finish ()
971979
972- s .kv .opt .Infof ( "Got compaction priority : %+v" , p )
980+ s .kv .opt .Debugf ( "[Compactor: %d] Attempting to run compaction : %+v", id , p )
973981
974982 // While picking tables to be compacted, both levels' tables are expected to
975983 // remain unchanged.
@@ -985,16 +993,17 @@ func (s *levelsController) doCompact(p compactionPriority) error {
985993 }
986994 defer s .cstatus .delete (cd ) // Remove the ranges from compaction status.
987995
988- s .kv .opt .Infof ("Running for level: %d\n " , cd .thisLevel .level )
996+ s .kv .opt .Infof ("[Compactor: %d] Running compaction: %+v for level: %d\n " ,
997+ id , p , cd .thisLevel .level )
989998 s .cstatus .toLog (cd .elog )
990999 if err := s .runCompactDef (l , cd ); err != nil {
9911000 // This compaction couldn't be done successfully.
992- s .kv .opt .Warningf ("LOG Compact FAILED with error: %+v: %+v" , err , cd )
1001+ s .kv .opt .Warningf ("[Compactor: %d] LOG Compact FAILED with error: %+v: %+v" , id , err , cd )
9931002 return err
9941003 }
9951004
9961005 s .cstatus .toLog (cd .elog )
997- s .kv .opt .Infof ("Compaction for level: %d DONE" , cd .thisLevel .level )
1006+ s .kv .opt .Infof ("[Compactor: %d] Compaction for level: %d DONE" , id , cd .thisLevel .level )
9981007 return nil
9991008}
10001009
@@ -1018,7 +1027,7 @@ func (s *levelsController) addLevel0Table(t *table.Table) error {
10181027 // Stall. Make sure all levels are healthy before we unstall.
10191028 var timeStart time.Time
10201029 {
1021- s .kv .opt .Debugf ("STALLED STALLED STALLED: %v\n " , time .Since (s .lastUnstalled ))
1030+ s .kv .opt .Infof ("STALLED STALLED STALLED: %v\n " , time .Since (s .lastUnstalled ))
10221031 s .cstatus .RLock ()
10231032 for i := 0 ; i < s .kv .opt .MaxLevels ; i ++ {
10241033 s .kv .opt .Debugf ("level=%d. Status=%s Size=%d\n " ,
0 commit comments