Skip to content

Commit 1420aaf

Browse files
authored
feat: Logging and fsync delay for retention deletion (#27114) (#27129)
1 parent 52cd83d commit 1420aaf

File tree

6 files changed

+171
-41
lines changed

6 files changed

+171
-41
lines changed

cmd/influx_inspect/verify/seriesfile/verify_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ func NewTest(t *testing.T) *Test {
107107
}
108108

109109
// delete one series
110-
if err := seriesFile.DeleteSeriesID(ids[0]); err != nil {
110+
if _, err := seriesFile.DeleteSeriesID(ids[0], tsdb.Flush); err != nil {
111111
return err
112112
}
113113

tsdb/engine/tsm1/engine.go

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1852,12 +1852,15 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
18521852
// Remove the remaining ids from the series file as they no longer exist
18531853
// in any shard.
18541854
var err error
1855+
var partitionIDs = make(map[int]struct{}, tsdb.SeriesFilePartitionN)
18551856
ids.ForEach(func(id uint64) {
18561857
name, tags := e.sfile.Series(id)
1857-
if err1 := e.sfile.DeleteSeriesID(id); err1 != nil {
1858+
part, err1 := e.sfile.DeleteSeriesID(id, tsdb.NoFlush)
1859+
if err1 != nil {
18581860
err = err1
18591861
return
18601862
}
1863+
partitionIDs[part.ID()] = struct{}{}
18611864

18621865
// In the case of the inmem index the series can be removed across
18631866
// the global index (all shards).
@@ -1871,6 +1874,13 @@ func (e *Engine) deleteSeriesRange(seriesKeys [][]byte, min, max int64) error {
18711874
if err != nil {
18721875
return err
18731876
}
1877+
1878+
if err := e.sfile.FlushSegments(partitionIDs); err != nil {
1879+
e.sfile.Logger.Error(
1880+
"error while flushing a series file segment",
1881+
zap.String("series_file_path", e.sfile.Path()),
1882+
zap.Error(err))
1883+
}
18741884
}
18751885

18761886
return nil

tsdb/series_file.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,10 @@ const SeriesIDSize = 8
3030
const (
3131
// SeriesFilePartitionN is the number of partitions a series file is split into.
3232
SeriesFilePartitionN = 8
33+
// Flush lets us know when to fsync
34+
Flush = true
35+
// NoFlush lets us know when to not fsync
36+
NoFlush = false
3337
)
3438

3539
// SeriesFile represents the section of the index that holds series data.
@@ -194,13 +198,43 @@ func (f *SeriesFile) CreateSeriesListIfNotExists(names [][]byte, tagsSlice []mod
194198
}
195199

196200
// DeleteSeriesID flags a series as permanently deleted.
197-
// If the series is reintroduced later then it must create a new id.
198-
func (f *SeriesFile) DeleteSeriesID(id uint64) error {
201+
// If the series is reintroduced later, then it must create a new id.
202+
// Setting flush will indicate whether this method triggers a fsync.
203+
func (f *SeriesFile) DeleteSeriesID(id uint64, flush bool) (*SeriesPartition, error) {
199204
p := f.SeriesIDPartition(id)
200205
if p == nil {
201-
return ErrInvalidSeriesPartitionID
206+
return nil, ErrInvalidSeriesPartitionID
202207
}
203-
return p.DeleteSeriesID(id)
208+
return p, p.DeleteSeriesID(id, flush)
209+
}
210+
211+
func (f *SeriesFile) FlushSegments(partitionIDs map[int]struct{}) error {
212+
var wg sync.WaitGroup
213+
errCh := make(chan error, SeriesFilePartitionN)
214+
215+
for id := range partitionIDs {
216+
wg.Add(1)
217+
p := f.partitions[id]
218+
go func() {
219+
defer wg.Done()
220+
p.mu.Lock()
221+
defer p.mu.Unlock()
222+
if segment := p.activeSegment(); segment != nil {
223+
if err := segment.Flush(); err != nil {
224+
errCh <- fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
225+
}
226+
}
227+
}()
228+
}
229+
230+
wg.Wait()
231+
close(errCh)
232+
233+
var errs = make([]error, 0, SeriesFilePartitionN)
234+
for err := range errCh {
235+
errs = append(errs, err)
236+
}
237+
return errors.Join(errs...)
204238
}
205239

206240
// IsDeleted returns true if the ID has been deleted before.

tsdb/series_file_test.go

Lines changed: 81 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/influxdata/influxdb/logger"
1111
"github.com/influxdata/influxdb/models"
1212
"github.com/influxdata/influxdb/tsdb"
13+
"github.com/stretchr/testify/require"
1314
"golang.org/x/sync/errgroup"
1415
)
1516

@@ -191,38 +192,59 @@ func TestSeriesFileCompactor(t *testing.T) {
191192

192193
// Ensure series file deletions persist across compactions.
193194
func TestSeriesFile_DeleteSeriesID(t *testing.T) {
194-
sfile := MustOpenSeriesFile()
195-
defer sfile.Close()
196-
197-
ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
198-
if err != nil {
199-
t.Fatal(err)
200-
} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil}, tsdb.NoopStatsTracker()); err != nil {
201-
t.Fatal(err)
202-
} else if err := sfile.ForceCompact(); err != nil {
203-
t.Fatal(err)
195+
deleteTestFn := func(flush bool) {
196+
sfile := MustOpenSeriesFile()
197+
defer func(sfile *SeriesFile) {
198+
err := sfile.Close()
199+
require.NoError(t, err, "close sfile")
200+
}(sfile)
201+
202+
ids0, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
203+
require.NoError(t, err, "create series list")
204+
_, err = sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m2")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
205+
require.NoError(t, err, "create series list")
206+
err = sfile.ForceCompact()
207+
require.NoError(t, err, "force compact")
208+
209+
// Delete and ensure deletion.
210+
_, err = sfile.DeleteSeriesID(ids0[0], flush)
211+
require.NoError(t, err, "delete series list")
212+
_, err = sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker())
213+
require.NoError(t, err, "create series list")
214+
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")
215+
216+
err = sfile.ForceCompact()
217+
require.NoError(t, err, "force compact")
218+
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")
219+
220+
err = sfile.Reopen()
221+
require.NoError(t, err, "reopen")
222+
require.True(t, sfile.IsDeleted(ids0[0]), "expected deleted")
204223
}
205224

206-
// Delete and ensure deletion.
207-
if err := sfile.DeleteSeriesID(ids0[0]); err != nil {
208-
t.Fatal(err)
209-
} else if _, err := sfile.CreateSeriesListIfNotExists([][]byte{[]byte("m1")}, []models.Tags{nil}, tsdb.NoopStatsTracker()); err != nil {
210-
t.Fatal(err)
211-
} else if !sfile.IsDeleted(ids0[0]) {
212-
t.Fatal("expected deletion before compaction")
225+
tests := []struct {
226+
name string
227+
fn func()
228+
}{{
229+
name: "delete series with flush",
230+
fn: func() {
231+
deleteTestFn(tsdb.Flush)
232+
},
233+
},
234+
{
235+
name: "delete series with no flush",
236+
fn: func() {
237+
deleteTestFn(tsdb.NoFlush)
238+
},
239+
},
213240
}
214241

215-
if err := sfile.ForceCompact(); err != nil {
216-
t.Fatal(err)
217-
} else if !sfile.IsDeleted(ids0[0]) {
218-
t.Fatal("expected deletion after compaction")
242+
for _, tt := range tests {
243+
t.Run(tt.name, func(t *testing.T) {
244+
tt.fn()
245+
})
219246
}
220247

221-
if err := sfile.Reopen(); err != nil {
222-
t.Fatal(err)
223-
} else if !sfile.IsDeleted(ids0[0]) {
224-
t.Fatal("expected deletion after reopen")
225-
}
226248
}
227249

228250
func TestSeriesFile_Compaction(t *testing.T) {
@@ -246,7 +268,7 @@ func TestSeriesFile_Compaction(t *testing.T) {
246268
// Delete a subset of keys.
247269
for i, id := range ids {
248270
if i%10 == 0 {
249-
if err := sfile.DeleteSeriesID(id); err != nil {
271+
if _, err := sfile.DeleteSeriesID(id, tsdb.Flush); err != nil {
250272
t.Fatal(err)
251273
}
252274
}
@@ -325,7 +347,7 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
325347

326348
// Delete a subset of keys.
327349
for i := 0; i < len(ids); i += 10 {
328-
if err := sfile.DeleteSeriesID(ids[i]); err != nil {
350+
if _, err := sfile.DeleteSeriesID(ids[i], tsdb.Flush); err != nil {
329351
b.Fatal(err)
330352
}
331353
}
@@ -353,6 +375,37 @@ func BenchmarkSeriesFile_Compaction(b *testing.B) {
353375
}
354376
}
355377

378+
func TestSeriesFile_FlushSegments(t *testing.T) {
379+
const SeriesN = 100
380+
sfile := MustOpenSeriesFile()
381+
defer func() {
382+
require.NoError(t, sfile.Close(), "series file close")
383+
}()
384+
385+
// Create some series to ensure there's data in the segments.
386+
var names = make([][]byte, 0, SeriesN)
387+
var tagsSlice = make([]models.Tags, 0, SeriesN)
388+
for i := 0; i < SeriesN; i++ {
389+
names = append(names, []byte(fmt.Sprintf("measurement%d", i)))
390+
tagsSlice = append(tagsSlice, models.NewTags(map[string]string{"tag": "value"}))
391+
}
392+
ids, err := sfile.CreateSeriesListIfNotExists(names, tagsSlice, tsdb.NoopStatsTracker())
393+
require.NoError(t, err)
394+
395+
// Collect all partition IDs that have series.
396+
partitionIDs := make(map[int]struct{}, tsdb.SeriesFilePartitionN)
397+
for _, id := range ids {
398+
partitionIDs[sfile.SeriesIDPartitionID(id)] = struct{}{}
399+
}
400+
401+
// Flush the segments.
402+
err = sfile.FlushSegments(partitionIDs)
403+
require.NoError(t, err)
404+
405+
// Verify series still exist after flush.
406+
require.Equal(t, uint64(SeriesN), sfile.SeriesCount())
407+
}
408+
356409
// Series represents name/tagset pairs that are used in testing.
357410
type Series struct {
358411
Name []byte

tsdb/series_partition.go

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ func (p *SeriesPartition) CreateSeriesListIfNotExists(keys [][]byte, keyPartitio
278278
// Flush active segment writes so we can access data in mmap.
279279
if segment := p.activeSegment(); segment != nil {
280280
if err := segment.Flush(); err != nil {
281-
return err
281+
return fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
282282
}
283283
}
284284

@@ -329,8 +329,9 @@ func (p *SeriesPartition) Compacting() bool {
329329
}
330330

331331
// DeleteSeriesID flags a series as permanently deleted.
332-
// If the series is reintroduced later then it must create a new id.
333-
func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
332+
// If the series is reintroduced later, then it must create a new id.
333+
// Setting flush will indicate whether this method triggers a fsync.
334+
func (p *SeriesPartition) DeleteSeriesID(id uint64, flush bool) error {
334335
p.mu.Lock()
335336
defer p.mu.Unlock()
336337

@@ -350,9 +351,11 @@ func (p *SeriesPartition) DeleteSeriesID(id uint64) error {
350351
}
351352

352353
// Flush active segment write.
353-
if segment := p.activeSegment(); segment != nil {
354-
if err := segment.Flush(); err != nil {
355-
return err
354+
if flush {
355+
if segment := p.activeSegment(); segment != nil {
356+
if err := segment.Flush(); err != nil {
357+
return fmt.Errorf("unable to flush segment %s: %w", segment.file.Name(), err)
358+
}
356359
}
357360
}
358361

tsdb/store.go

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1073,15 +1073,45 @@ func (s *Store) DeleteShard(shardID uint64) error {
10731073
}
10741074
}
10751075

1076+
const DeleteLogTrigger = 10_000
1077+
seriesCount := ss.Cardinality()
1078+
deleteStart := time.Now()
1079+
var deletedCount atomic.Uint64
1080+
var partitionIDs = make(map[int]struct{}, SeriesFilePartitionN)
1081+
10761082
ss.ForEach(func(id uint64) {
1077-
if err := sfile.DeleteSeriesID(id); err != nil {
1083+
p, err := sfile.DeleteSeriesID(id, NoFlush)
1084+
if err != nil {
10781085
sfile.Logger.Error(
10791086
"cannot delete series in shard",
10801087
zap.Uint64("series_id", id),
10811088
zap.Uint64("shard_id", shardID),
1089+
zap.String("series_file_path", sfile.Path()),
10821090
zap.Error(err))
1091+
} else {
1092+
partitionIDs[p.id] = struct{}{}
1093+
deleted := deletedCount.Add(1)
1094+
1095+
if deleted%DeleteLogTrigger == 0 {
1096+
s.Logger.Info(fmt.Sprintf("DeleteShard: %d series deleted", DeleteLogTrigger),
1097+
zap.String("db", db),
1098+
zap.Uint64("shard_id", shardID),
1099+
zap.String("series_file_path", sfile.Path()),
1100+
zap.Uint64("deleted", deleted),
1101+
zap.Uint64("remaining", seriesCount-deleted),
1102+
zap.Uint64("total", seriesCount),
1103+
zap.Duration("elapsed", time.Since(deleteStart)))
1104+
}
10831105
}
10841106
})
1107+
1108+
if err := sfile.FlushSegments(partitionIDs); err != nil {
1109+
sfile.Logger.Error(
1110+
"error while flushing a series file segment",
1111+
zap.Uint64("shard_id", shardID),
1112+
zap.String("series_file_path", sfile.Path()),
1113+
zap.Error(err))
1114+
}
10851115
}
10861116
}
10871117

0 commit comments

Comments
 (0)