@@ -31,7 +31,10 @@ import (
3131// ErrClosed may be returned by Pubsub methods after the Close method is called.
3232var ErrClosed = errors .New ("pubsub closed" )
3333
34- var errIndexNotFound = errors .New ("index not found" )
34+ var (
35+ errIndexNotFound = errors .New ("index not found" )
36+ errTooManyRequests = errors .New ("429" )
37+ )
3538
3639// Pubsub provides a means of publishing and subscribing to sampled trace IDs,
3740// using Elasticsearch for temporary storage.
@@ -109,15 +112,15 @@ func (p *Pubsub) indexSampledTraceIDs(ctx context.Context, traceIDs <-chan strin
109112 p .config .Logger .With (
110113 logp .Error (err ),
111114 logp .Reflect ("event" , & doc ),
112- ).Debug ("failed to encode sampled trace document" )
115+ ).Error ("failed to encode sampled trace ID document" )
113116 return err
114117 }
115118 data := w .Bytes ()
116119 if err := appender .Add (ctx , index , bytes .NewReader (data )); err != nil {
117120 p .config .Logger .With (
118121 logp .Error (err ),
119122 logp .Reflect ("event" , & doc ),
120- ).Debug ("failed to index sampled trace document" )
123+ ).Error ("failed to index sampled trace ID document" )
121124 return err
122125 }
123126 }
@@ -153,9 +156,15 @@ func (p *Pubsub) SubscribeSampledTraceIDs(
153156 case <- ticker .C :
154157 changed , err := p .searchTraceIDs (ctx , traceIDs , pos .observedSeqnos )
155158 if err != nil {
156- // Errors may occur due to rate limiting, or while the index is
157- // still being created, so just log and continue.
158- p .config .Logger .With (logp .Error (err )).With (logp .Reflect ("position" , pos )).Debug ("error searching for trace IDs" )
159+ logger := p .config .Logger .With (logp .Error (err )).With (logp .Reflect ("position" , pos ))
160+ switch {
161+ case errors .Is (err , context .Canceled ):
162+ // Ignore shutdown errors
163+ case errors .Is (err , errTooManyRequests ):
164+ logger .Warn ("error searching for trace IDs" )
165+ default :
166+ logger .Error ("error searching for trace IDs" )
167+ }
159168 continue
160169 }
161170 if changed {
@@ -235,17 +244,22 @@ func (p *Pubsub) refreshIndices(ctx context.Context, indices []string) error {
235244 }
236245 req , err := http .NewRequestWithContext (ctx , http .MethodPost , "/" + strings .Join (indices , "," )+ "/_refresh?ignore_unavailable=true" , nil )
237246 if err != nil {
238- return err
247+ return fmt . Errorf ( "failed to create index refresh request: %w" , err )
239248 }
240249
241250 resp , err := p .config .Client .Perform (req )
242251 if err != nil {
243- return err
252+ return fmt . Errorf ( "index refresh request failed: %w" , err )
244253 }
245254 defer resp .Body .Close ()
246255 if resp .StatusCode > 299 {
256+ // No need to handle 404 because of ignore_unavailable=true
247257 message , _ := io .ReadAll (resp .Body )
248- return fmt .Errorf ("index refresh request failed: %s" , message )
258+ switch resp .StatusCode {
259+ case http .StatusTooManyRequests :
260+ return fmt .Errorf ("index refresh request failed with status code %w: %s" , errTooManyRequests , message )
261+ }
262+ return fmt .Errorf ("index refresh request failed with status code %d: %s" , resp .StatusCode , message )
249263 }
250264 return nil
251265}
@@ -334,23 +348,30 @@ func (p *Pubsub) searchIndexTraceIDs(ctx context.Context, out chan<- string, ind
334348func (p * Pubsub ) doSearchRequest (ctx context.Context , index string , body io.Reader , out interface {}) error {
335349 req , err := http .NewRequestWithContext (ctx , http .MethodPost , "/" + index + "/_search" , body )
336350 if err != nil {
337- return err
351+ return fmt . Errorf ( "failed to create search request: %w" , err )
338352 }
339353 req .Header .Add ("Content-Type" , "application/json" )
340354
341355 resp , err := p .config .Client .Perform (req )
342356 if err != nil {
343- return err
357+ return fmt . Errorf ( "search request failed: %w" , err )
344358 }
345359 defer resp .Body .Close ()
346360 if resp .StatusCode > 299 {
347361 if resp .StatusCode == http .StatusNotFound {
348362 return errIndexNotFound
349363 }
350364 message , _ := io .ReadAll (resp .Body )
351- return fmt .Errorf ("search request failed: %s" , message )
365+ switch resp .StatusCode {
366+ case http .StatusTooManyRequests :
367+ return fmt .Errorf ("search request failed with status code %w: %s" , errTooManyRequests , message )
368+ }
369+ return fmt .Errorf ("search request failed with status code %d: %s" , resp .StatusCode , message )
352370 }
353- return json .NewDecoder (resp .Body ).Decode (out )
371+ if err := json .NewDecoder (resp .Body ).Decode (out ); err != nil {
372+ return fmt .Errorf ("failed to parse search response: %w" , err )
373+ }
374+ return nil
354375}
355376
356377type traceIDDocument struct {
0 commit comments