Skip to content

Commit cc2217c

Browse files
author
kaiyan-sheng
authored
Check context.Canceled and fix s3 input config (#22036)
1 parent 5d07709 commit cc2217c

4 files changed

Lines changed: 13 additions & 5 deletions

File tree

x-pack/filebeat/_meta/config/filebeat.inputs.reference.xpack.yml.tmpl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -67,8 +67,8 @@
6767
#session_token: '${AWS_SESSION_TOKEN:"”}'
6868
#credential_profile_name: test-s3-input
6969
70-
# Queue urls (required) to receive queue messages from
71-
#queue_urls: ["https://sqs.us-east-1.amazonaws.com/1234/test-s3-logs-queue"]
70+
# Queue url (required) to receive queue messages from
71+
#queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-s3-logs-queue"
7272
7373
# The duration (in seconds) that the received messages are hidden from subsequent
7474
# retrieve requests after being retrieved by a ReceiveMessage request.

x-pack/filebeat/filebeat.reference.yml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2410,8 +2410,8 @@ filebeat.inputs:
24102410
#session_token: '${AWS_SESSION_TOKEN:"”}'
24112411
#credential_profile_name: test-s3-input
24122412

2413-
# Queue urls (required) to receive queue messages from
2414-
#queue_urls: ["https://sqs.us-east-1.amazonaws.com/1234/test-s3-logs-queue"]
2413+
# Queue url (required) to receive queue messages from
2414+
#queue_url: "https://sqs.us-east-1.amazonaws.com/1234/test-s3-logs-queue"
24152415

24162416
# The duration (in seconds) that the received messages are hidden from subsequent
24172417
# retrieve requests after being retrieved by a ReceiveMessage request.

x-pack/filebeat/input/s3/collector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,10 @@ func (c *s3Collector) processorKeepAlive(svcSQS sqsiface.ClientAPI, message sqs.
153153
for {
154154
select {
155155
case <-c.cancellation.Done():
156+
fmt.Println("------- c.cancellation.Done()")
156157
return nil
157158
case err := <-errC:
159+
fmt.Println("------- err = ", err)
158160
if err != nil {
159161
if err == context.DeadlineExceeded {
160162
c.logger.Info("Context deadline exceeded, updating visibility timeout")

x-pack/filebeat/input/s3/input.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package s3
66

77
import (
8+
"context"
89
"fmt"
910

1011
"github.com/aws/aws-sdk-go-v2/service/s3"
@@ -67,7 +68,12 @@ func (in *s3Input) Run(ctx v2.Context, pipeline beat.Pipeline) error {
6768

6869
defer collector.publisher.Close()
6970
collector.run()
70-
return ctx.Cancelation.Err()
71+
72+
if ctx.Cancelation.Err() == context.Canceled {
73+
return nil
74+
} else {
75+
return ctx.Cancelation.Err()
76+
}
7177
}
7278

7379
func (in *s3Input) createCollector(ctx v2.Context, pipeline beat.Pipeline) (*s3Collector, error) {

0 commit comments

Comments
 (0)