Skip to content

Commit 1356a21

Browse files
authored
feat: Bubble error up to writer if fields are dropped (#26998) (#27015)
1 parent 32d8b00 commit 1356a21

File tree

4 files changed

+99
-15
lines changed

4 files changed

+99
-15
lines changed

tsdb/engine/tsm1/engine.go

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,6 @@ var (
6464
// Ensure Engine implements the interface.
6565
_ tsdb.Engine = &Engine{}
6666
// Static objects to prevent small allocs.
67-
timeBytes = []byte("time")
6867
keyFieldSeparatorBytes = []byte(keyFieldSeparator)
6968
emptyBytes = []byte{}
7069
)
@@ -1360,11 +1359,6 @@ func (e *Engine) WritePoints(points []models.Point, tracker tsdb.StatsTracker) e
13601359
npoints++
13611360
var nValuesForPoint int64
13621361
for iter.Next() {
1363-
// Skip fields name "time", they are illegal
1364-
if bytes.Equal(iter.FieldKey(), timeBytes) {
1365-
continue
1366-
}
1367-
13681362
keyBuf = append(keyBuf[:baseLen], iter.FieldKey()...)
13691363

13701364
if e.seriesTypeMap != nil {

tsdb/field_validator.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ func ValidateAndCreateFields(mf *MeasurementFields, point models.Point, skipSize
1818
pointSize := point.StringSize()
1919
iter := point.FieldIterator()
2020
var fieldsToCreate []*FieldCreate
21+
var partialWriteError *PartialWriteError
2122

2223
// We return fieldsToCreate even on error, because other writes
2324
// in parallel may depend on these previous fields having been
@@ -41,7 +42,16 @@ func ValidateAndCreateFields(mf *MeasurementFields, point models.Point, skipSize
4142

4243
fieldKey := iter.FieldKey()
4344
// Skip fields name "time", they are illegal.
44-
if bytes.Equal(fieldKey, timeBytes) {
45+
if bytes.Equal(fieldKey, TimeBytes) {
46+
if partialWriteError == nil {
47+
partialWriteError = &PartialWriteError{
48+
Reason: fmt.Sprintf(
49+
"invalid field name: input field \"%[1]s\" on measurement \"%s\" is invalid. Field \"%[1]s\" has been stripped from point.",
50+
fieldKey, point.Name()),
51+
Dropped: 0,
52+
}
53+
}
54+
4555
continue
4656
}
4757

@@ -70,7 +80,7 @@ func ValidateAndCreateFields(mf *MeasurementFields, point models.Point, skipSize
7080
fieldsToCreate = append(fieldsToCreate, &FieldCreate{point.Name(), f})
7181
}
7282
}
73-
return fieldsToCreate, nil
83+
return fieldsToCreate, partialWriteError
7484
}
7585

7686
// dataTypeFromModelsFieldType returns the influxql.DataType that corresponds to the

tsdb/shard.go

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ var (
8888

8989
var (
9090
// Static objects to prevent small allocs.
91-
timeBytes = []byte("time")
91+
TimeBytes = []byte("time")
9292
)
9393

9494
// A ShardError implements the error interface, and contains extra
@@ -631,7 +631,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
631631
tags := p.Tags()
632632

633633
// Drop any series w/ a "time" tag, these are illegal
634-
if v := tags.Get(timeBytes); v != nil {
634+
if v := tags.Get(TimeBytes); v != nil {
635635
dropped++
636636
if reason == "" {
637637
reason = fmt.Sprintf(
@@ -689,7 +689,7 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
689689
iter := p.FieldIterator()
690690
validField := false
691691
for iter.Next() {
692-
if bytes.Equal(iter.FieldKey(), timeBytes) {
692+
if bytes.Equal(iter.FieldKey(), TimeBytes) {
693693
continue
694694
}
695695
validField = true
@@ -716,13 +716,19 @@ func (s *Shard) validateSeriesAndFields(points []models.Point, tracker StatsTrac
716716
newFields, partialWriteError := ValidateAndCreateFields(mf, p, s.options.Config.SkipFieldSizeValidation)
717717
createdFieldsToSave = append(createdFieldsToSave, newFields...)
718718

719-
if partialWriteError != nil {
719+
if partialWriteError != nil && partialWriteError.Dropped > 0 {
720720
if reason == "" {
721721
reason = partialWriteError.Reason
722722
}
723723
dropped += partialWriteError.Dropped
724724
atomic.AddInt64(&s.stats.WritePointsDropped, int64(partialWriteError.Dropped))
725725
continue
726+
// Sometimes we will drop fields like 'time' but not an entire point
727+
// we want to inform the writer that something occurred.
728+
} else if partialWriteError != nil {
729+
partialWriteError.Database = s.Database()
730+
partialWriteError.RetentionPolicy = s.RetentionPolicy()
731+
err = *partialWriteError
726732
}
727733
points[j] = points[i]
728734
j++

tsdb/shard_test.go

Lines changed: 77 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -337,9 +337,8 @@ func TestWriteTimeTag(t *testing.T) {
337337
time.Unix(1, 2),
338338
)
339339

340-
if err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker()); err != nil {
341-
t.Fatalf("unexpected error: %v", err)
342-
}
340+
err := sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
341+
require.Error(t, err, "expected partial write error")
343342

344343
mf := sh.MeasurementFields([]byte("cpu"))
345344
if mf == nil {
@@ -435,6 +434,81 @@ func TestShardWriteAddNewField(t *testing.T) {
435434
}
436435
}
437436

437+
func TestShardWriteDropField(t *testing.T) {
438+
tmpDir, _ := os.MkdirTemp("", "shard_test")
439+
defer func(path string) {
440+
err := os.RemoveAll(path)
441+
require.NoError(t, err, "error removing temp dir")
442+
}(tmpDir)
443+
444+
tmpShard := filepath.Join(tmpDir, "shard")
445+
tmpWal := filepath.Join(tmpDir, "wal")
446+
447+
sfile := MustOpenSeriesFile()
448+
defer func(sfile *SeriesFile) {
449+
err := sfile.Close()
450+
require.NoError(t, err, "error closing series file")
451+
}(sfile)
452+
453+
opts := tsdb.NewEngineOptions()
454+
opts.Config.WALDir = filepath.Join(tmpDir, "wal")
455+
opts.InmemIndex = inmem.NewIndex(filepath.Base(tmpDir), sfile.SeriesFile)
456+
457+
sh := tsdb.NewShard(1, tmpShard, tmpWal, sfile.SeriesFile, opts)
458+
err := sh.Open()
459+
require.NoError(t, err, "error opening shard")
460+
461+
defer func(sh *tsdb.Shard) {
462+
err := sh.Close()
463+
require.NoError(t, err, "error closing shard")
464+
}(sh)
465+
466+
pt := models.MustNewPoint(
467+
"cpu",
468+
models.NewTags(map[string]string{"host": "server"}),
469+
map[string]interface{}{"value": 1.0},
470+
time.Unix(1, 2),
471+
)
472+
473+
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
474+
require.NoError(t, err, "error writing point")
475+
476+
pt = models.MustNewPoint(
477+
"cpu",
478+
models.NewTags(map[string]string{"host": "server"}),
479+
map[string]interface{}{"value": 1.0, "value2": 2.0, "time": time.Now().Unix()},
480+
time.Unix(1, 2),
481+
)
482+
483+
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
484+
require.Error(t, err, "writing point should error with partial write")
485+
require.ErrorContains(t, err, "partial write: invalid field name: input field \"time\" on measurement \"cpu\" is invalid. Field \"time\" has been stripped from point. dropped=0 for database:")
486+
487+
// Point should not be written and fully dropped due to having a single "time" field
488+
pt = models.MustNewPoint(
489+
"cpu",
490+
models.NewTags(map[string]string{"host": "server"}),
491+
map[string]interface{}{"time": time.Now().Unix()},
492+
time.Unix(1, 2),
493+
)
494+
495+
err = sh.WritePoints([]models.Point{pt}, tsdb.NoopStatsTracker())
496+
require.Error(t, err, "writing point should error with partial write")
497+
require.ErrorContains(t, err, "partial write: invalid field name: input field \"time\" on measurement \"cpu\" is invalid dropped=1 for database:")
498+
499+
require.Equal(t, int64(1), sh.SeriesN(), "wrong number of series")
500+
stats := sh.Statistics(nil)
501+
require.GreaterOrEqual(t, len(stats), 1, "wrong number of stats")
502+
values := stats[0].Values
503+
pointsOK := values["writePointsOk"].(int64)
504+
505+
require.Equal(t, int64(2), pointsOK, "should have written 2 points successfully")
506+
507+
mf := sh.MeasurementFields([]byte("cpu"))
508+
require.NotNil(t, mf, "measurement fields should not be nil")
509+
require.Equal(t, 2, mf.FieldN(), "measurement fields should have 2 values")
510+
}
511+
438512
// Tests concurrently writing to the same shard with different field types which
439513
// can trigger a panic when the shard is snapshotted to TSM files.
440514
func TestShard_WritePoints_FieldConflictConcurrent(t *testing.T) {

0 commit comments

Comments
 (0)