@@ -148,8 +148,7 @@ func (c *s3Collector) processMessage(svcS3 s3iface.ClientAPI, message sqs.Messag
148148 // read from s3 object and create event for each log line
149149 err = c .handleS3Objects (svcS3 , s3Infos , errC )
150150 if err != nil {
151- err = fmt .Errorf ("handleS3Objects failed: %w" , err )
152- c .logger .Error (err )
151+ c .logger .Error (fmt .Errorf ("handleS3Objects failed: %w" , err ))
153152 return err
154153 }
155154 c .logger .Debugf ("handleS3Objects succeed" )
@@ -163,7 +162,12 @@ func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.
163162 return nil
164163 case err := <- errC :
165164 if err != nil {
166- c .logger .Warn ("Processing message failed, updating visibility timeout" )
165+ if err == context .DeadlineExceeded {
166+ c .logger .Info ("Context deadline exceeded, updating visibility timeout" )
167+ } else {
168+ c .logger .Warnf ("Processing message failed '%w', updating visibility timeout" , err )
169+ }
170+
167171 err := c .changeVisibilityTimeout (queueURL , visibilityTimeout , svcSQS , message .ReceiptHandle )
168172 if err != nil {
169173 c .logger .Error (fmt .Errorf ("SQS ChangeMessageVisibilityRequest failed: %w" , err ))
@@ -298,8 +302,7 @@ func (c *s3Collector) handleS3Objects(svc s3iface.ClientAPI, s3Infos []s3Info, e
298302 c .logger .Debugf ("Processing file from s3 bucket \" %s\" with name \" %s\" " , info .name , info .key )
299303 err := c .createEventsFromS3Info (svc , info , s3Ctx )
300304 if err != nil {
301- err = fmt .Errorf ("createEventsFromS3Info failed processing file from s3 bucket \" %s\" with name \" %s\" : %w" , info .name , info .key , err )
302- c .logger .Error (err )
305+ c .logger .Error (fmt .Errorf ("createEventsFromS3Info failed processing file from s3 bucket \" %s\" with name \" %s\" : %w" , info .name , info .key , err ))
303306 s3Ctx .setError (err )
304307 }
305308 }
@@ -326,8 +329,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
326329 // If the SDK can determine the request or retry delay was canceled
327330 // by a context the ErrCodeRequestCanceled error will be returned.
328331 if awsErr .Code () == awssdk .ErrCodeRequestCanceled {
329- err = fmt .Errorf ("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w" , info .key , info .name , err )
330- c .logger .Error (err )
332+ c .logger .Error (fmt .Errorf ("s3 GetObjectRequest canceled for '%s' from S3 bucket '%s': %w" , info .key , info .name , err ))
331333 return err
332334 }
333335
@@ -345,16 +347,14 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
345347
346348 isS3ObjGzipped , err := isStreamGzipped (reader )
347349 if err != nil {
348- err = fmt .Errorf ("could not determine if S3 object is gzipped: %w" , err )
349- c .logger .Error (err )
350+ c .logger .Error (fmt .Errorf ("could not determine if S3 object is gzipped: %w" , err ))
350351 return err
351352 }
352353
353354 if isS3ObjGzipped {
354355 gzipReader , err := gzip .NewReader (reader )
355356 if err != nil {
356- err = fmt .Errorf ("gzip.NewReader failed for '%s' from S3 bucket '%s': %w" , info .key , info .name , err )
357- c .logger .Error (err )
357+ c .logger .Error (fmt .Errorf ("gzip.NewReader failed for '%s' from S3 bucket '%s': %w" , info .key , info .name , err ))
358358 return err
359359 }
360360 reader = bufio .NewReader (gzipReader )
@@ -366,8 +366,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
366366 decoder := json .NewDecoder (reader )
367367 err := c .decodeJSON (decoder , objectHash , info , s3Ctx )
368368 if err != nil {
369- err = fmt .Errorf ("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w" , info .key , info .name , err )
370- c .logger .Error (err )
369+ c .logger .Error (fmt .Errorf ("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w" , info .key , info .name , err ))
371370 return err
372371 }
373372 return nil
@@ -383,14 +382,12 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
383382 event := createEvent (log , offset , info , objectHash , s3Ctx )
384383 err = c .forwardEvent (event )
385384 if err != nil {
386- err = fmt .Errorf ("forwardEvent failed: %w" , err )
387- c .logger .Error (err )
385+ c .logger .Error (fmt .Errorf ("forwardEvent failed: %w" , err ))
388386 return err
389387 }
390388 return nil
391389 } else if err != nil {
392- err = fmt .Errorf ("readStringAndTrimDelimiter failed: %w" , err )
393- c .logger .Error (err )
390+ c .logger .Error (fmt .Errorf ("readStringAndTrimDelimiter failed: %w" , err ))
394391 return err
395392 }
396393
@@ -403,8 +400,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
403400 event := createEvent (log , offset , info , objectHash , s3Ctx )
404401 err = c .forwardEvent (event )
405402 if err != nil {
406- err = fmt .Errorf ("forwardEvent failed: %w" , err )
407- c .logger .Error (err )
403+ c .logger .Error (fmt .Errorf ("forwardEvent failed: %w" , err ))
408404 return err
409405 }
410406 }
0 commit comments