Skip to content

Commit 31b41a6

Browse files
feat: improve dropped point logging (#26257) (#26830)
Co-authored-by: davidby-influx <72418212+davidby-influx@users.noreply.github.com> closes #26252
1 parent 1141c66 commit 31b41a6

File tree

2 files changed

+158
-36
lines changed

2 files changed

+158
-36
lines changed

coordinator/points_writer.go

Lines changed: 110 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"errors"
55
"fmt"
66
"sort"
7+
"strings"
78
"sync"
89
"sync/atomic"
910
"time"
@@ -95,23 +96,109 @@ func NewPointsWriter() *PointsWriter {
9596
}
9697
}
9798

99+
// BoundType identifies which time bound, if any, a point violated when it was
// considered for writing.
type BoundType int

const (
	// WithinBounds means the point violated no bound.
	WithinBounds BoundType = iota
	// RetentionPolicyBound means the point fell outside the scope of the
	// retention policy (reported as the policy's lower bound).
	RetentionPolicyBound
	// WriteWindowUpperBound means the point's time is after the upper edge of
	// the write window.
	WriteWindowUpperBound
	// WriteWindowLowerBound means the point's time is before the lower edge of
	// the write window.
	WriteWindowLowerBound
	MaxBoundType // always the largest bound type, not for actual use
)

// String returns the human-readable name of the bound type. Values that do
// not name a real bound (including MaxBoundType) are reported as "Unknown".
func (b BoundType) String() string {
	switch b {
	case WithinBounds:
		return "Within Bounds"
	case RetentionPolicyBound:
		return "Retention Policy Lower Bound"
	case WriteWindowUpperBound:
		return "Write Window Upper Bound"
	case WriteWindowLowerBound:
		return "Write Window Lower Bound"
	default:
		return "Unknown"
	}
}
123+
124+
type DroppedPoint struct {
125+
Point models.Point
126+
ViolatedBound time.Time
127+
Reason BoundType
128+
}
129+
130+
func (d *DroppedPoint) String() string {
131+
return fmt.Sprintf("point %s at %s dropped because it violates a %s at %s", d.Point.Key(), d.Point.Time().UTC().Format(time.RFC3339Nano), d.Reason.String(), d.ViolatedBound.UTC().Format(time.RFC3339Nano))
132+
}
133+
98134
// ShardMapping contains a mapping of shards to points.
99135
type ShardMapping struct {
100-
n int
101-
Points map[uint64][]models.Point // The points associated with a shard ID
102-
Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
103-
Dropped []models.Point // Points that were dropped
136+
n int
137+
Points map[uint64][]models.Point // The points associated with a shard ID
138+
Shards map[uint64]*meta.ShardInfo // The shards that have been mapped, keyed by shard ID
139+
MaxDropped DroppedPoint
140+
MinDropped DroppedPoint
141+
RetentionDropped int
142+
WriteWindowDropped int
143+
rpi *meta.RetentionPolicyInfo
104144
}
105145

106146
// NewShardMapping creates an empty ShardMapping.
107-
func NewShardMapping(n int) *ShardMapping {
147+
func NewShardMapping(rpi *meta.RetentionPolicyInfo, n int) *ShardMapping {
108148
return &ShardMapping{
109149
n: n,
110150
Points: map[uint64][]models.Point{},
111151
Shards: map[uint64]*meta.ShardInfo{},
152+
rpi: rpi,
112153
}
113154
}
114155

156+
func (s *ShardMapping) AddDropped(p models.Point, t time.Time, b BoundType) {
157+
if s.MaxDropped.Point == nil || p.Time().After(s.MaxDropped.Point.Time()) {
158+
s.MaxDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b}
159+
}
160+
if s.MinDropped.Point == nil || p.Time().Before(s.MinDropped.Point.Time()) {
161+
s.MinDropped = DroppedPoint{Point: p, ViolatedBound: t, Reason: b}
162+
}
163+
switch b {
164+
case RetentionPolicyBound:
165+
s.RetentionDropped++
166+
case WriteWindowLowerBound, WriteWindowUpperBound:
167+
s.WriteWindowDropped++
168+
}
169+
}
170+
171+
func (s *ShardMapping) Dropped() int {
172+
return s.RetentionDropped + s.WriteWindowDropped
173+
}
174+
175+
func (s *ShardMapping) SummariseDropped() string {
176+
if s.Dropped() <= 0 {
177+
return ""
178+
}
179+
summary := strings.Builder{}
180+
if s.rpi.PastWriteLimit > 0 || s.rpi.FutureWriteLimit > 0 {
181+
summary.WriteString(fmt.Sprintf(" and %d points outside write window (", s.WriteWindowDropped))
182+
if s.rpi.PastWriteLimit > 0 {
183+
summary.WriteString("-")
184+
summary.WriteString(s.rpi.PastWriteLimit.String())
185+
}
186+
if s.rpi.FutureWriteLimit > 0 {
187+
if s.rpi.PastWriteLimit > 0 {
188+
summary.WriteString(" to ")
189+
}
190+
summary.WriteString(s.rpi.FutureWriteLimit.String())
191+
}
192+
summary.WriteString(")")
193+
}
194+
return fmt.Sprintf("dropped %d points outside retention policy of duration %s%s - oldest %s, newest %s",
195+
s.RetentionDropped,
196+
s.rpi.Duration.String(),
197+
summary.String(),
198+
s.MinDropped.String(),
199+
s.MaxDropped.String())
200+
}
201+
115202
// MapPoint adds the point to the ShardMapping, associated with the given shardInfo.
116203
func (s *ShardMapping) MapPoint(shardInfo *meta.ShardInfo, p models.Point) {
117204
if cap(s.Points[shardInfo.ID]) < s.n {
@@ -147,14 +234,14 @@ func NewWriteWindow(rp *meta.RetentionPolicyInfo) *WriteWindow {
147234
return w
148235
}
149236

150-
func (w *WriteWindow) WithinWindow(t time.Time) bool {
237+
func (w *WriteWindow) WithinWindow(t time.Time) (bool, time.Time, BoundType) {
151238
if w.checkBefore && t.Before(w.before) {
152-
return false
239+
return false, w.before, WriteWindowLowerBound
153240
}
154241
if w.checkAfter && t.After(w.after) {
155-
return false
242+
return false, w.after, WriteWindowUpperBound
156243
}
157-
return true
244+
return true, time.Time{}, WithinBounds
158245
}
159246

160247
// Open opens the communication channel with the point writer.
@@ -229,7 +316,8 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
229316
// Either the point is outside the scope of the RP, we already have
230317
// a suitable shard group for the point, or it is outside the write window
231318
// for the RP, and we don't want to unnecessarily create a shard for it
232-
if p.Time().Before(min) || list.Covers(p.Time()) || !ww.WithinWindow(p.Time()) {
319+
withinWindow, _, _ := ww.WithinWindow(p.Time())
320+
if p.Time().Before(min) || list.Covers(p.Time()) || !withinWindow {
233321
continue
234322
}
235323

@@ -246,13 +334,18 @@ func (w *PointsWriter) MapShards(wp *WritePointsRequest) (*ShardMapping, error)
246334
list.Add(*sg)
247335
}
248336

249-
mapping := NewShardMapping(len(wp.Points))
337+
mapping := NewShardMapping(rp, len(wp.Points))
250338
for _, p := range wp.Points {
251339
sg := list.ShardGroupAt(p.Time())
252-
if sg == nil || !ww.WithinWindow(p.Time()) {
340+
if sg == nil {
253341
// We didn't create a shard group because the point was outside the
254-
// scope of the RP, or the point is outside the write window for the RP.
255-
mapping.Dropped = append(mapping.Dropped, p)
342+
// scope of the RP
343+
mapping.AddDropped(p, min, RetentionPolicyBound)
344+
atomic.AddInt64(&w.stats.WriteDropped, 1)
345+
continue
346+
} else if withinWindow, bound, reason := ww.WithinWindow(p.Time()); !withinWindow {
347+
// The point is outside the write window for the RP.
348+
mapping.AddDropped(p, bound, reason)
256349
atomic.AddInt64(&w.stats.WriteDropped, 1)
257350
continue
258351
} else if len(sg.Shards) <= 0 {
@@ -420,9 +513,9 @@ func (w *PointsWriter) WritePointsPrivileged(writeCtx tsdb.WriteContext, databas
420513
w.Subscriber.Send(pts)
421514
atomic.AddInt64(&w.stats.SubWriteOK, 1)
422515

423-
if err == nil && len(shardMappings.Dropped) > 0 {
424-
err = tsdb.PartialWriteError{Reason: "points beyond retention policy or outside permissible write window",
425-
Dropped: len(shardMappings.Dropped),
516+
if err == nil && shardMappings.Dropped() > 0 {
517+
err = tsdb.PartialWriteError{Reason: shardMappings.SummariseDropped(),
518+
Dropped: shardMappings.Dropped(),
426519
Database: database,
427520
RetentionPolicy: retentionPolicy,
428521
}

coordinator/points_writer_test.go

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package coordinator_test
22

33
import (
4+
"errors"
45
"fmt"
56
"reflect"
67
"sync"
@@ -89,9 +90,11 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) {
8990
pr.AddPoint("cpu", -2.0, time.Now().Add(-time.Minute*20), nil)
9091

9192
values := []float64{0.0, 1.0, -1.0}
92-
dropped := []float64{2.0, -2.0}
9393

94-
MapPoints(t, c, pr, values, dropped)
94+
MapPoints(t, c, pr, values, 2,
95+
&coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound},
96+
&coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound},
97+
"dropped 0 points outside retention policy of duration 3h0m0s and 2 points outside write window (-10m0s to 15m0s) -")
9598

9699
// Clear the write limits by setting them to zero
97100
// No points should be dropped
@@ -106,11 +109,28 @@ func TestPointsWriter_MapShards_WriteLimits(t *testing.T) {
106109
}
107110
require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed")
108111
values = []float64{0.0, 1.0, 2.0, -1.0, -2.0}
109-
dropped = []float64{}
110-
MapPoints(t, c, pr, values, dropped)
112+
MapPoints(t, c, pr, values, 0, nil, nil, "dropped 0 points outside retention policy of duration 3h0m0s -")
113+
114+
rpu.SetFutureWriteLimit(futureWriteLimit)
115+
require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed")
116+
values = []float64{0.0, 1.0, -1.0, -2.0}
117+
MapPoints(t, c, pr, values, 1,
118+
&coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound},
119+
&coordinator.DroppedPoint{Point: pr.Points[2], Reason: coordinator.WriteWindowUpperBound},
120+
"dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (15m0s) -")
121+
122+
rpu.SetFutureWriteLimit(zeroDuration)
123+
rpu.SetPastWriteLimit(pastWriteLimit)
124+
require.NoError(t, meta.ApplyRetentionUpdate(rpu, rp), "ApplyRetentionUpdate failed")
125+
values = []float64{0.0, 1.0, 2.0, -1.0}
126+
MapPoints(t, c, pr, values, 1,
127+
&coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound},
128+
&coordinator.DroppedPoint{Point: pr.Points[4], Reason: coordinator.WriteWindowLowerBound},
129+
"dropped 0 points outside retention policy of duration 3h0m0s and 1 points outside write window (-10m0s) -")
130+
111131
}
112132

113-
func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, dropped []float64) {
133+
func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WritePointsRequest, values []float64, droppedCount int, minDropped *coordinator.DroppedPoint, maxDropped *coordinator.DroppedPoint, summary string) {
114134
var (
115135
shardMappings *coordinator.ShardMapping
116136
err error
@@ -141,7 +161,14 @@ func MapPoints(t *testing.T, c *coordinator.PointsWriter, pr *coordinator.WriteP
141161
}
142162
}
143163
verify(p, values)
144-
verify(shardMappings.Dropped, dropped)
164+
require.Equal(t, shardMappings.Dropped(), droppedCount, "wrong number of points dropped")
165+
if shardMappings.Dropped() > 0 {
166+
require.Equal(t, minDropped.Point, shardMappings.MinDropped.Point, "minimum dropped point mismatch")
167+
require.Equal(t, minDropped.Reason, shardMappings.MinDropped.Reason, "minimum dropped reason mismatch")
168+
require.Equal(t, maxDropped.Point, shardMappings.MaxDropped.Point, "maximum dropped point mismatch")
169+
require.Equal(t, maxDropped.Reason, shardMappings.MaxDropped.Reason, "maximum dropped reason mismatch")
170+
require.Contains(t, shardMappings.SummariseDropped(), summary, "summary mismatch")
171+
}
145172
}
146173

147174
// Ensures the points writer maps to a new shard group when the shard duration
@@ -332,9 +359,11 @@ func TestPointsWriter_MapShards_Invalid(t *testing.T) {
332359
t.Errorf("MapShards() len mismatch. got %v, exp %v", got, exp)
333360
}
334361

335-
if got, exp := len(shardMappings.Dropped), 1; got != exp {
362+
if got, exp := shardMappings.RetentionDropped, 1; got != exp {
336363
t.Fatalf("MapShard() dropped mismatch: got %v, exp %v", got, exp)
337364
}
365+
366+
require.Equal(t, coordinator.RetentionPolicyBound, shardMappings.MinDropped.Reason, "unexpected reason for dropped point")
338367
}
339368

340369
func TestPointsWriter_WritePoints(t *testing.T) {
@@ -384,7 +413,7 @@ func TestPointsWriter_WritePoints(t *testing.T) {
384413

385414
// copy to prevent data race
386415
theTest := test
387-
sm := coordinator.NewShardMapping(16)
416+
sm := coordinator.NewShardMapping(nil, 16)
388417
sm.MapPoint(&meta.ShardInfo{ID: uint64(1), Owners: []meta.ShardOwner{
389418
{NodeID: 1},
390419
{NodeID: 2},
@@ -467,16 +496,9 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
467496
// are created.
468497
ms := NewPointsWriterMetaClient()
469498

470-
// Three points that range over the shardGroup duration (1h) and should map to two
471-
// distinct shards
499+
// Add a point earlier than the retention period
472500
pr.AddPoint("cpu", 1.0, time.Now().Add(-24*time.Hour), nil)
473501

474-
// copy to prevent data race
475-
sm := coordinator.NewShardMapping(16)
476-
477-
// ShardMapper dropped this point
478-
sm.Dropped = append(sm.Dropped, pr.Points[0])
479-
480502
// Local coordinator.Node ShardWriter
481503
// lock on the write increment since these functions get called in parallel
482504
var mu sync.Mutex
@@ -506,13 +528,20 @@ func TestPointsWriter_WritePoints_Dropped(t *testing.T) {
506528
c.Subscriber = sub
507529
c.Node = &influxdb.Node{ID: 1}
508530

509-
c.Open()
510-
defer c.Close()
531+
require.NoError(t, c.Open(), "failure opening PointsWriter")
532+
defer func(pw *coordinator.PointsWriter) {
533+
require.NoError(t, pw.Close(), "failure closing PointsWriter")
534+
}(c)
511535

512536
err := c.WritePointsPrivileged(tsdb.WriteContext{}, pr.Database, pr.RetentionPolicy, models.ConsistencyLevelOne, pr.Points)
513-
if _, ok := err.(tsdb.PartialWriteError); !ok {
537+
require.Error(t, err, "unexpected success writing points")
538+
var pwErr tsdb.PartialWriteError
539+
if !errors.As(err, &pwErr) {
514540
t.Errorf("PointsWriter.WritePoints(): got %v, exp %v", err, tsdb.PartialWriteError{})
515541
}
542+
require.Equal(t, 1, pwErr.Dropped, "wrong number of points dropped")
543+
require.ErrorContains(t, pwErr, "partial write: dropped 1 points outside retention policy of duration 1h0m0s")
544+
require.ErrorContains(t, pwErr, "Retention Policy Lower Bound")
516545
}
517546

518547
type fakePointsWriter struct {

0 commit comments

Comments
 (0)