Skip to content

Commit 7addb4d

Browse files
author
kaiyan-sheng
authored
[Filebeat] Add check for context.DeadlineExceeded error (#21732)
1 parent f754515 commit 7addb4d

1 file changed

Lines changed: 15 additions & 19 deletions

File tree

x-pack/filebeat/input/s3/collector.go

Lines changed: 15 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,7 @@ func (c *s3Collector) processMessage(svcS3 s3iface.ClientAPI, message sqs.Messag
148148
// read from s3 object and create event for each log line
149149
err = c.handleS3Objects(svcS3, s3Infos, errC)
150150
if err != nil {
151-
err = fmt.Errorf("handleS3Objects failed: %w", err)
152-
c.logger.Error(err)
151+
c.logger.Error(fmt.Errorf("handleS3Objects failed: %w", err))
153152
return err
154153
}
155154
c.logger.Debugf("handleS3Objects succeed")
@@ -163,7 +162,12 @@ func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.
163162
return nil
164163
case err := <-errC:
165164
if err != nil {
166-
c.logger.Warn("Processing message failed, updating visibility timeout")
165+
if err == context.DeadlineExceeded {
166+
c.logger.Info("Context deadline exceeded, updating visibility timeout")
167+
} else {
168+
c.logger.Warnf("Processing message failed '%w', updating visibility timeout", err)
169+
}
170+
167171
err := c.changeVisibilityTimeout(queueURL, visibilityTimeout, svcSQS, message.ReceiptHandle)
168172
if err != nil {
169173
c.logger.Error(fmt.Errorf("SQS ChangeMessageVisibilityRequest failed: %w", err))
@@ -298,8 +302,7 @@ func (c *s3Collector) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, e
298302
c.logger.Debugf("Processing file from s3 bucket \"%s\" with name \"%s\"", info.name, info.key)
299303
err := c.createEventsFromS3Info(svc, info, s3Ctx)
300304
if err != nil {
301-
err = fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err)
302-
c.logger.Error(err)
305+
c.logger.Error(fmt.Errorf("createEventsFromS3Info failed processing file from s3 bucket \"%s\" with name \"%s\": %w", info.name, info.key, err))
303306
s3Ctx.setError(err)
304307
}
305308
}
@@ -326,8 +329,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
326329
// If the SDK can determine the request or retry delay was canceled
327330
// by a context the ErrCodeRequestCanceled error will be returned.
328331
if awsErr.Code() == awssdk.ErrCodeRequestCanceled {
329-
err = fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
330-
c.logger.Error(err)
332+
c.logger.Error(fmt.Errorf("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
331333
return err
332334
}
333335

@@ -345,16 +347,14 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
345347

346348
isS3ObjGzipped, err := isStreamGzipped(reader)
347349
if err != nil {
348-
err = fmt.Errorf("could not determine if S3 object is gzipped: %w", err)
349-
c.logger.Error(err)
350+
c.logger.Error(fmt.Errorf("could not determine if S3 object is gzipped: %w", err))
350351
return err
351352
}
352353

353354
if isS3ObjGzipped {
354355
gzipReader, err := gzip.NewReader(reader)
355356
if err != nil {
356-
err = fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
357-
c.logger.Error(err)
357+
c.logger.Error(fmt.Errorf("gzip.NewReader failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
358358
return err
359359
}
360360
reader = bufio.NewReader(gzipReader)
@@ -366,8 +366,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
366366
decoder := json.NewDecoder(reader)
367367
err := c.decodeJSON(decoder, objectHash, info, s3Ctx)
368368
if err != nil {
369-
err = fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
370-
c.logger.Error(err)
369+
c.logger.Error(fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err))
371370
return err
372371
}
373372
return nil
@@ -383,14 +382,12 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
383382
event := createEvent(log, offset, info, objectHash, s3Ctx)
384383
err = c.forwardEvent(event)
385384
if err != nil {
386-
err = fmt.Errorf("forwardEvent failed: %w", err)
387-
c.logger.Error(err)
385+
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
388386
return err
389387
}
390388
return nil
391389
} else if err != nil {
392-
err = fmt.Errorf("readStringAndTrimDelimiter failed: %w", err)
393-
c.logger.Error(err)
390+
c.logger.Error(fmt.Errorf("readStringAndTrimDelimiter failed: %w", err))
394391
return err
395392
}
396393

@@ -403,8 +400,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
403400
event := createEvent(log, offset, info, objectHash, s3Ctx)
404401
err = c.forwardEvent(event)
405402
if err != nil {
406-
err = fmt.Errorf("forwardEvent failed: %w", err)
407-
c.logger.Error(err)
403+
c.logger.Error(fmt.Errorf("forwardEvent failed: %w", err))
408404
return err
409405
}
410406
}

0 commit comments

Comments
 (0)