@@ -25,6 +25,7 @@ import (
2525 "github.com/influxdata/influxdb/v2/models"
2626 "github.com/influxdata/influxdb/v2/pkg/deep"
2727 "github.com/influxdata/influxdb/v2/pkg/slices"
28+ "github.com/influxdata/influxdb/v2/pkg/snowflake"
2829 "github.com/influxdata/influxdb/v2/predicate"
2930 "github.com/influxdata/influxdb/v2/tsdb"
3031 "github.com/influxdata/influxql"
@@ -886,6 +887,122 @@ func TestStore_FlushWALOnClose(t *testing.T) {
886887 }
887888}
888889
890+ func TestStore_WRRSegments (t * testing.T ) {
891+ // Check if uncommitted WRR segments are identified and cause opening the store to abort.
892+ for _ , index := range tsdb .RegisteredIndexes () {
893+ t .Run ("TestStore_WRRSegments_" + index , func (t * testing.T ) {
894+ idGen := snowflake .New (0 )
895+ generateWRRSegmentName := func () string {
896+ return fmt .Sprintf ("_v01_%020d.wrr" , idGen .Next ())
897+ }
898+ createFile := func (t * testing.T , fn string ) {
899+ t .Helper ()
900+ require .NoError (t , os .WriteFile (fn , nil , 0666 ))
901+ }
902+ createWRR := func (t * testing.T , path string ) string {
903+ t .Helper ()
904+ fn := filepath .Join (path , generateWRRSegmentName ())
905+ createFile (t , fn )
906+ return fn
907+ }
908+ generateWRRSnapshotName := func () string {
909+ return generateWRRSegmentName () + ".snapshot"
910+ }
911+ createWRRSnapshot := func (t * testing.T , path string ) string {
912+ t .Helper ()
913+ fn := filepath .Join (path , generateWRRSnapshotName ())
914+ createFile (t , fn )
915+ return fn
916+ }
917+ checkWRRError := func (t * testing.T , err error , wrrs ... []string ) {
918+ t .Helper ()
919+ require .ErrorIs (t , err , tsdb .ErrIncompatibleWAL )
920+ require .ErrorContains (t , err , "incompatible WAL format: uncommitted WRR files found" )
921+ // We don't know the exact order of the errors if there are multiple shards with
922+ // uncommitted WRRs, but this will insure that all of them are included in the error
923+ // message.
924+ for _ , w := range wrrs {
925+ if len (w ) > 0 {
926+ require .ErrorContains (t , err , fmt .Sprintf ("%v" , w ))
927+ }
928+ }
929+ }
930+
931+ s := MustOpenStore (t , index , WithWALFlushOnShutdown (true ))
932+ defer s .Close ()
933+
934+ // Create shard #0 with data.
935+ s .MustCreateShardWithData ("db0" , "rp0" , 0 ,
936+ `cpu,host=serverA value=1 0` ,
937+ `cpu,host=serverA value=2 10` ,
938+ `cpu,host=serverB value=3 20` ,
939+ )
940+
941+ // Create shard #1 with data.
942+ s .MustCreateShardWithData ("db0" , "rp0" , 1 ,
943+ `cpu,host=serverA value=1 30` ,
944+ `cpu,host=serverC value=3 60` ,
945+ )
946+
947+ sh0WALPath := filepath .Join (s .walPath , "db0" , "rp0" , "0" )
948+ require .DirExists (t , sh0WALPath )
949+ sh1WALPath := filepath .Join (s .walPath , "db0" , "rp0" , "1" )
950+ require .DirExists (t , sh1WALPath )
951+
952+ // No WRR segments, no error
953+ require .NoError (t , s .Reopen (t ))
954+
955+ // 1 uncommitted WRR segment in shard 0
956+ var sh0Uncommitted , sh1Uncommitted []string
957+ checkReopen := func (t * testing.T ) {
958+ t .Helper ()
959+ allUncommitted := [][]string {sh0Uncommitted , sh1Uncommitted }
960+ var hasUncommitted bool
961+ for _ , u := range allUncommitted {
962+ if len (u ) > 0 {
963+ hasUncommitted = true
964+ }
965+ }
966+
967+ if hasUncommitted {
968+ checkWRRError (t , s .Reopen (t ), allUncommitted ... )
969+ } else {
970+ require .NoError (t , s .Reopen (t ))
971+ }
972+ }
973+ sh0Uncommitted = append (sh0Uncommitted , createWRR (t , sh0WALPath ))
974+ checkReopen (t )
975+
976+ // 2 uncommitted WRR segments in shard 0
977+ sh0Uncommitted = append (sh0Uncommitted , createWRR (t , sh0WALPath ))
978+ checkReopen (t )
979+
980+ // 2 uncommitted WR segments in shard 0, 1 in shard 1
981+ sh1Uncommitted = append (sh1Uncommitted , createWRR (t , sh1WALPath ))
982+ checkReopen (t )
983+
984+ // No uncommitted WRR in shard 0, 1 in shard 1
985+ createWRRSnapshot (t , sh0WALPath )
986+ sh0Uncommitted = nil
987+ checkReopen (t )
988+
989+ // No uncommitted WRR segments
990+ createWRRSnapshot (t , sh1WALPath )
991+ sh1Uncommitted = nil
992+ checkReopen (t )
993+
994+ // Add 1 uncommitted to shard 1
995+ sh1Uncommitted = append (sh1Uncommitted , createWRR (t , sh1WALPath ))
996+ checkReopen (t )
997+
998+ // No uncommitted WRR segments
999+ createWRRSnapshot (t , sh1WALPath )
1000+ sh1Uncommitted = nil
1001+ checkReopen (t )
1002+ })
1003+ }
1004+ }
1005+
8891006// Test new reader blocking.
8901007func TestStore_NewReadersBlocked (t * testing.T ) {
8911008 //t.Parallel()
0 commit comments