@@ -10,6 +10,7 @@ import (
1010 "github.com/influxdata/influxdb/logger"
1111 "github.com/influxdata/influxdb/models"
1212 "github.com/influxdata/influxdb/tsdb"
13+ "github.com/stretchr/testify/require"
1314 "golang.org/x/sync/errgroup"
1415)
1516
@@ -191,38 +192,59 @@ func TestSeriesFileCompactor(t *testing.T) {
191192
192193// Ensure series file deletions persist across compactions.
193194func TestSeriesFile_DeleteSeriesID (t * testing.T ) {
194- sfile := MustOpenSeriesFile ()
195- defer sfile .Close ()
196-
197- ids0 , err := sfile .CreateSeriesListIfNotExists ([][]byte {[]byte ("m1" )}, []models.Tags {nil }, tsdb .NoopStatsTracker ())
198- if err != nil {
199- t .Fatal (err )
200- } else if _ , err := sfile .CreateSeriesListIfNotExists ([][]byte {[]byte ("m2" )}, []models.Tags {nil }, tsdb .NoopStatsTracker ()); err != nil {
201- t .Fatal (err )
202- } else if err := sfile .ForceCompact (); err != nil {
203- t .Fatal (err )
195+ deleteTestFn := func (flush bool ) {
196+ sfile := MustOpenSeriesFile ()
197+ defer func (sfile * SeriesFile ) {
198+ err := sfile .Close ()
199+ require .NoError (t , err , "close sfile" )
200+ }(sfile )
201+
202+ ids0 , err := sfile .CreateSeriesListIfNotExists ([][]byte {[]byte ("m1" )}, []models.Tags {nil }, tsdb .NoopStatsTracker ())
203+ require .NoError (t , err , "create series list" )
204+ _ , err = sfile .CreateSeriesListIfNotExists ([][]byte {[]byte ("m2" )}, []models.Tags {nil }, tsdb .NoopStatsTracker ())
205+ require .NoError (t , err , "create series list" )
206+ err = sfile .ForceCompact ()
207+ require .NoError (t , err , "force compact" )
208+
209+ // Delete and ensure deletion.
210+ _ , err = sfile .DeleteSeriesID (ids0 [0 ], flush )
211+ require .NoError (t , err , "delete series list" )
212+ _ , err = sfile .CreateSeriesListIfNotExists ([][]byte {[]byte ("m1" )}, []models.Tags {nil }, tsdb .NoopStatsTracker ())
213+ require .NoError (t , err , "create series list" )
214+ require .True (t , sfile .IsDeleted (ids0 [0 ]), "expected deleted" )
215+
216+ err = sfile .ForceCompact ()
217+ require .NoError (t , err , "force compact" )
218+ require .True (t , sfile .IsDeleted (ids0 [0 ]), "expected deleted" )
219+
220+ err = sfile .Reopen ()
221+ require .NoError (t , err , "reopen" )
222+ require .True (t , sfile .IsDeleted (ids0 [0 ]), "expected deleted" )
204223 }
205224
206- // Delete and ensure deletion.
207- if err := sfile .DeleteSeriesID (ids0 [0 ]); err != nil {
208- t .Fatal (err )
209- } else if _ , err := sfile .CreateSeriesListIfNotExists ([][]byte {[]byte ("m1" )}, []models.Tags {nil }, tsdb .NoopStatsTracker ()); err != nil {
210- t .Fatal (err )
211- } else if ! sfile .IsDeleted (ids0 [0 ]) {
212- t .Fatal ("expected deletion before compaction" )
225+ tests := []struct {
226+ name string
227+ fn func ()
228+ }{{
229+ name : "delete series with flush" ,
230+ fn : func () {
231+ deleteTestFn (tsdb .Flush )
232+ },
233+ },
234+ {
235+ name : "delete series with no flush" ,
236+ fn : func () {
237+ deleteTestFn (tsdb .NoFlush )
238+ },
239+ },
213240 }
214241
215- if err := sfile . ForceCompact (); err != nil {
216- t .Fatal ( err )
217- } else if ! sfile . IsDeleted ( ids0 [ 0 ]) {
218- t . Fatal ( "expected deletion after compaction" )
242+ for _ , tt := range tests {
243+ t .Run ( tt . name , func ( t * testing. T ) {
244+ tt . fn ()
245+ } )
219246 }
220247
221- if err := sfile .Reopen (); err != nil {
222- t .Fatal (err )
223- } else if ! sfile .IsDeleted (ids0 [0 ]) {
224- t .Fatal ("expected deletion after reopen" )
225- }
226248}
227249
228250func TestSeriesFile_Compaction (t * testing.T ) {
@@ -246,7 +268,7 @@ func TestSeriesFile_Compaction(t *testing.T) {
246268 // Delete a subset of keys.
247269 for i , id := range ids {
248270 if i % 10 == 0 {
249- if err := sfile .DeleteSeriesID (id ); err != nil {
271+ if _ , err := sfile .DeleteSeriesID (id , tsdb . Flush ); err != nil {
250272 t .Fatal (err )
251273 }
252274 }
@@ -325,7 +347,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
325347
326348 // Delete a subset of keys.
327349 for i := 0 ; i < len (ids ); i += 10 {
328- if err := sfile .DeleteSeriesID (ids [i ]); err != nil {
350+ if _ , err := sfile .DeleteSeriesID (ids [i ], tsdb . Flush ); err != nil {
329351 b .Fatal (err )
330352 }
331353 }
@@ -353,6 +375,37 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
353375 }
354376}
355377
378+ func TestSeriesFile_FlushSegments (t * testing.T ) {
379+ const SeriesN = 100
380+ sfile := MustOpenSeriesFile ()
381+ defer func () {
382+ require .NoError (t , sfile .Close (), "series file close" )
383+ }()
384+
385+ // Create some series to ensure there's data in the segments.
386+ var names = make ([][]byte , 0 , SeriesN )
387+ var tagsSlice = make ([]models.Tags , 0 , SeriesN )
388+ for i := 0 ; i < SeriesN ; i ++ {
389+ names = append (names , []byte (fmt .Sprintf ("measurement%d" , i )))
390+ tagsSlice = append (tagsSlice , models .NewTags (map [string ]string {"tag" : "value" }))
391+ }
392+ ids , err := sfile .CreateSeriesListIfNotExists (names , tagsSlice , tsdb .NoopStatsTracker ())
393+ require .NoError (t , err )
394+
395+ // Collect all partition IDs that have series.
396+ partitionIDs := make (map [int ]struct {}, tsdb .SeriesFilePartitionN )
397+ for _ , id := range ids {
398+ partitionIDs [sfile .SeriesIDPartitionID (id )] = struct {}{}
399+ }
400+
401+ // Flush the segments.
402+ err = sfile .FlushSegments (partitionIDs )
403+ require .NoError (t , err )
404+
405+ // Verify series still exist after flush.
406+ require .Equal (t , uint64 (SeriesN ), sfile .SeriesCount ())
407+ }
408+
356409// Series represents name/tagset pairs that are used in testing.
357410type Series struct {
358411 Name []byte
0 commit comments