Skip to content

Commit 6d41432

Browse files
authored
TBS: Log pubsub errors at error or warn level (#17135)
* Surface pubsub subscriber errors by logging at Warn level for 429 and Error level for others, instead of Debug level. Do not log context canceled. This may introduce noise to logs, e.g. when ES returns 429, but at worst it will only log one line every search interval (default = 1m/2 = 30s) which is acceptable. * Also change publisher logs from debug level to error level.
1 parent cc8201e commit 6d41432

3 files changed

Lines changed: 182 additions & 58 deletions

File tree

x-pack/apm-server/sampling/pubsub/checkpoints.go

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ func getGlobalCheckpoints(
2424
indexGlobalCheckpoints := make(map[string]int64)
2525
req, err := http.NewRequestWithContext(ctx, http.MethodGet, "/"+dataStream+"/_stats/get?level=shards", nil)
2626
if err != nil {
27-
return nil, fmt.Errorf("failed to created indices request: %w", err)
27+
return nil, fmt.Errorf("failed to create index stats request: %w", err)
2828
}
2929
resp, err := client.Perform(req)
3030
if err != nil {
@@ -36,14 +36,17 @@ func getGlobalCheckpoints(
3636
case http.StatusNotFound:
3737
// Data stream does not yet exist.
3838
return indexGlobalCheckpoints, nil
39+
case http.StatusTooManyRequests:
40+
message, _ := io.ReadAll(resp.Body)
41+
return nil, fmt.Errorf("index stats request failed with status code %w: %s", errTooManyRequests, message)
3942
}
4043
message, _ := io.ReadAll(resp.Body)
41-
return nil, fmt.Errorf("index stats request failed: %s", message)
44+
return nil, fmt.Errorf("index stats request failed with status code %d: %s", resp.StatusCode, message)
4245
}
4346

4447
var stats dataStreamStats
4548
if err := json.NewDecoder(resp.Body).Decode(&stats); err != nil {
46-
return nil, err
49+
return nil, fmt.Errorf("failed to parse index stats response: %w", err)
4750
}
4851

4952
for index, indexStats := range stats.Indices {

x-pack/apm-server/sampling/pubsub/pubsub.go

Lines changed: 34 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,10 @@ import (
3131
// ErrClosed may be returned by Pubsub methods after the Close method is called.
3232
var ErrClosed = errors.New("pubsub closed")
3333

34-
var errIndexNotFound = errors.New("index not found")
34+
var (
35+
errIndexNotFound = errors.New("index not found")
36+
errTooManyRequests = errors.New("429")
37+
)
3538

3639
// Pubsub provides a means of publishing and subscribing to sampled trace IDs,
3740
// using Elasticsearch for temporary storage.
@@ -109,15 +112,15 @@ func (p *Pubsub) indexSampledTraceIDs(ctx context.Context, traceIDs <-chan strin
109112
p.config.Logger.With(
110113
logp.Error(err),
111114
logp.Reflect("event", &doc),
112-
).Debug("failed to encode sampled trace document")
115+
).Error("failed to encode sampled trace ID document")
113116
return err
114117
}
115118
data := w.Bytes()
116119
if err := appender.Add(ctx, index, bytes.NewReader(data)); err != nil {
117120
p.config.Logger.With(
118121
logp.Error(err),
119122
logp.Reflect("event", &doc),
120-
).Debug("failed to index sampled trace document")
123+
).Error("failed to index sampled trace ID document")
121124
return err
122125
}
123126
}
@@ -153,9 +156,15 @@ func (p *Pubsub) SubscribeSampledTraceIDs(
153156
case <-ticker.C:
154157
changed, err := p.searchTraceIDs(ctx, traceIDs, pos.observedSeqnos)
155158
if err != nil {
156-
// Errors may occur due to rate limiting, or while the index is
157-
// still being created, so just log and continue.
158-
p.config.Logger.With(logp.Error(err)).With(logp.Reflect("position", pos)).Debug("error searching for trace IDs")
159+
logger := p.config.Logger.With(logp.Error(err)).With(logp.Reflect("position", pos))
160+
switch {
161+
case errors.Is(err, context.Canceled):
162+
// Ignore shutdown errors
163+
case errors.Is(err, errTooManyRequests):
164+
logger.Warn("error searching for trace IDs")
165+
default:
166+
logger.Error("error searching for trace IDs")
167+
}
159168
continue
160169
}
161170
if changed {
@@ -235,17 +244,22 @@ func (p *Pubsub) refreshIndices(ctx context.Context, indices []string) error {
235244
}
236245
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "/"+strings.Join(indices, ",")+"/_refresh?ignore_unavailable=true", nil)
237246
if err != nil {
238-
return err
247+
return fmt.Errorf("failed to create index refresh request: %w", err)
239248
}
240249

241250
resp, err := p.config.Client.Perform(req)
242251
if err != nil {
243-
return err
252+
return fmt.Errorf("index refresh request failed: %w", err)
244253
}
245254
defer resp.Body.Close()
246255
if resp.StatusCode > 299 {
256+
// No need to handle 404 because of ignore_unavailable=true
247257
message, _ := io.ReadAll(resp.Body)
248-
return fmt.Errorf("index refresh request failed: %s", message)
258+
switch resp.StatusCode {
259+
case http.StatusTooManyRequests:
260+
return fmt.Errorf("index refresh request failed with status code %w: %s", errTooManyRequests, message)
261+
}
262+
return fmt.Errorf("index refresh request failed with status code %d: %s", resp.StatusCode, message)
249263
}
250264
return nil
251265
}
@@ -334,23 +348,30 @@ func (p *Pubsub) searchIndexTraceIDs(ctx context.Context, out chan<- string, ind
334348
func (p *Pubsub) doSearchRequest(ctx context.Context, index string, body io.Reader, out interface{}) error {
335349
req, err := http.NewRequestWithContext(ctx, http.MethodPost, "/"+index+"/_search", body)
336350
if err != nil {
337-
return err
351+
return fmt.Errorf("failed to create search request: %w", err)
338352
}
339353
req.Header.Add("Content-Type", "application/json")
340354

341355
resp, err := p.config.Client.Perform(req)
342356
if err != nil {
343-
return err
357+
return fmt.Errorf("search request failed: %w", err)
344358
}
345359
defer resp.Body.Close()
346360
if resp.StatusCode > 299 {
347361
if resp.StatusCode == http.StatusNotFound {
348362
return errIndexNotFound
349363
}
350364
message, _ := io.ReadAll(resp.Body)
351-
return fmt.Errorf("search request failed: %s", message)
365+
switch resp.StatusCode {
366+
case http.StatusTooManyRequests:
367+
return fmt.Errorf("search request failed with status code %w: %s", errTooManyRequests, message)
368+
}
369+
return fmt.Errorf("search request failed with status code %d: %s", resp.StatusCode, message)
352370
}
353-
return json.NewDecoder(resp.Body).Decode(out)
371+
if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
372+
return fmt.Errorf("failed to parse search response: %w", err)
373+
}
374+
return nil
354375
}
355376

356377
type traceIDDocument struct {

0 commit comments

Comments
 (0)