Skip to content

Commit 764045e

Browse files
author
Andrea Spacca
authored
[Filebeat] - S3 Input - Add support for iterating/accessing only… (#28252)
* [Filebeat] - S3 Input - Add support for iterating/accessing only specific folders or datapaths
1 parent 4147b5a commit 764045e

28 files changed

Lines changed: 195 additions & 30 deletions

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -315,6 +315,7 @@ for a few releases. Please use other tools provided by Elastic to fetch data fro
315315
- Update indentation for azure filebeat configuration. {pull}26604[26604]
316316
- Update Sophos xg module pipeline to deal with missing `date` and `time` fields. {pull}27834[27834]
317317
- sophos/xg fileset: Add missing pipeline for System Health logs. {pull}27827[27827] {issue}27826[27826]
318+
- Add support for passing a prefix on S3 bucket list mode for AWS-S3 input {pull}28252[28252] {issue}27965[27965]
318319
- Resolve issue with @timestamp for defender_atp. {pull}28272[28272]
319320
- Tolerate faults when Windows Event Log session is interrupted {issue}27947[27947] {pull}28191[28191]
320321
- Add support for username in cisco asa security negotiation logs {pull}26975[26975]

filebeat/docs/modules/aws.asciidoc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ Example config:
5050
enabled: false
5151
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
5252
#var.bucket_arn: 'arn:aws:s3:::mybucket'
53+
#var.bucket_list_prefix: 'prefix'
5354
#var.bucket_list_interval: 300s
5455
#var.number_of_workers: 5
5556
#var.shared_credential_file: /etc/filebeat/aws_credentials
@@ -67,6 +68,7 @@ Example config:
6768
enabled: false
6869
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
6970
#var.bucket_arn: 'arn:aws:s3:::mybucket'
71+
#var.bucket_list_prefix: 'prefix'
7072
#var.bucket_list_interval: 300s
7173
#var.number_of_workers: 5
7274
#var.shared_credential_file: /etc/filebeat/aws_credentials
@@ -84,6 +86,7 @@ Example config:
8486
enabled: false
8587
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
8688
#var.bucket_arn: 'arn:aws:s3:::mybucket'
89+
#var.bucket_list_prefix: 'prefix'
8790
#var.bucket_list_interval: 300s
8891
#var.number_of_workers: 5
8992
#var.shared_credential_file: /etc/filebeat/aws_credentials
@@ -101,6 +104,7 @@ Example config:
101104
enabled: false
102105
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
103106
#var.bucket_arn: 'arn:aws:s3:::mybucket'
107+
#var.bucket_list_prefix: 'prefix'
104108
#var.bucket_list_interval: 300s
105109
#var.number_of_workers: 5
106110
#var.shared_credential_file: /etc/filebeat/aws_credentials
@@ -118,6 +122,7 @@ Example config:
118122
enabled: false
119123
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
120124
#var.bucket_arn: 'arn:aws:s3:::mybucket'
125+
#var.bucket_list_prefix: 'prefix'
121126
#var.bucket_list_interval: 300s
122127
#var.number_of_workers: 5
123128
#var.shared_credential_file: /etc/filebeat/aws_credentials
@@ -135,6 +140,7 @@ Example config:
135140
enabled: false
136141
#var.queue_url: https://sqs.myregion.amazonaws.com/123456/myqueue
137142
#var.bucket_arn: 'arn:aws:s3:::mybucket'
143+
#var.bucket_list_prefix: 'prefix'
138144
#var.bucket_list_interval: 300s
139145
#var.number_of_workers: 5
140146
#var.shared_credential_file: /etc/filebeat/aws_credentials
@@ -178,6 +184,10 @@ Use to vertically scale the input.
178184

179185
Wait interval between completion of a list request to the S3 bucket and beginning of the next one. Defaults to 120 seconds.
180186

187+
*`var.bucket_list_prefix`*::
188+
189+
Prefix to apply to the list request to the S3 bucket. Defaults to an empty string.
190+
181191
*`var.endpoint`*::
182192

183193
Custom endpoint used to access AWS APIs.

x-pack/filebeat/filebeat.reference.yml

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ filebeat.modules:
105105
# AWS S3 bucket arn
106106
#var.bucket_arn: 'arn:aws:s3:::mybucket'
107107

108+
# AWS S3 list prefix
109+
#var.bucket_list_prefix: 'prefix'
110+
108111
# Bucket list interval on S3 bucket
109112
#var.bucket_list_interval: 300s
110113

@@ -166,6 +169,9 @@ filebeat.modules:
166169
# AWS S3 bucket arn
167170
#var.bucket_arn: 'arn:aws:s3:::mybucket'
168171

172+
# AWS S3 list prefix
173+
#var.bucket_list_prefix: 'prefix'
174+
169175
# Bucket list interval on S3 bucket
170176
#var.bucket_list_interval: 300s
171177

@@ -215,6 +221,9 @@ filebeat.modules:
215221
# AWS S3 bucket arn
216222
#var.bucket_arn: 'arn:aws:s3:::mybucket'
217223

224+
# AWS S3 list prefix
225+
#var.bucket_list_prefix: 'prefix'
226+
218227
# Bucket list interval on S3 bucket
219228
#var.bucket_list_interval: 300s
220229

@@ -264,6 +273,9 @@ filebeat.modules:
264273
# AWS S3 bucket arn
265274
#var.bucket_arn: 'arn:aws:s3:::mybucket'
266275

276+
# AWS S3 list prefix
277+
#var.bucket_list_prefix: 'prefix'
278+
267279
# Bucket list interval on S3 bucket
268280
#var.bucket_list_interval: 300s
269281

@@ -313,6 +325,9 @@ filebeat.modules:
313325
# AWS S3 bucket arn
314326
#var.bucket_arn: 'arn:aws:s3:::mybucket'
315327

328+
# AWS S3 list prefix
329+
#var.bucket_list_prefix: 'prefix'
330+
316331
# Bucket list interval on S3 bucket
317332
#var.bucket_list_interval: 300s
318333

@@ -362,6 +377,9 @@ filebeat.modules:
362377
# AWS S3 bucket arn
363378
#var.bucket_arn: 'arn:aws:s3:::mybucket'
364379

380+
# AWS S3 list prefix
381+
#var.bucket_list_prefix: 'prefix'
382+
365383
# Bucket list interval on S3 bucket
366384
#var.bucket_list_interval: 300s
367385

x-pack/filebeat/input/awss3/config.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ type config struct {
2929
QueueURL string `config:"queue_url"`
3030
BucketARN string `config:"bucket_arn"`
3131
BucketListInterval time.Duration `config:"bucket_list_interval"`
32+
BucketListPrefix string `config:"bucket_list_prefix"`
3233
NumberOfWorkers int `config:"number_of_workers"`
3334
AWSConfig awscommon.ConfigAWS `config:",inline"`
3435
FileSelectors []fileSelectorConfig `config:"file_selectors"`
@@ -40,6 +41,7 @@ func defaultConfig() config {
4041
APITimeout: 120 * time.Second,
4142
VisibilityTimeout: 300 * time.Second,
4243
BucketListInterval: 120 * time.Second,
44+
BucketListPrefix: "",
4345
SQSWaitTime: 20 * time.Second,
4446
SQSMaxReceiveCount: 5,
4547
FIPSEnabled: false,

x-pack/filebeat/input/awss3/config_test.go

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -28,13 +28,15 @@ func TestConfig(t *testing.T) {
2828
parserConf := parser.Config{}
2929
require.NoError(t, parserConf.Unpack(common.MustNewConfigFrom("")))
3030
return config{
31-
QueueURL: quequeURL,
32-
BucketARN: s3Bucket,
33-
APITimeout: 120 * time.Second,
34-
VisibilityTimeout: 300 * time.Second,
35-
SQSMaxReceiveCount: 5,
36-
SQSWaitTime: 20 * time.Second,
37-
BucketListInterval: 120 * time.Second,
31+
QueueURL: quequeURL,
32+
BucketARN: s3Bucket,
33+
APITimeout: 120 * time.Second,
34+
VisibilityTimeout: 300 * time.Second,
35+
SQSMaxReceiveCount: 5,
36+
SQSWaitTime: 20 * time.Second,
37+
BucketListInterval: 120 * time.Second,
38+
BucketListPrefix: "",
39+
3840
FIPSEnabled: false,
3941
MaxNumberOfMessages: 5,
4042
ReaderConfig: readerConfig{

x-pack/filebeat/input/awss3/input.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -215,6 +215,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
215215
log := ctx.Logger.With("bucket_arn", in.config.BucketARN)
216216
log.Infof("number_of_workers is set to %v.", in.config.NumberOfWorkers)
217217
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
218+
log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix)
218219
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
219220
log.Debugf("AWS S3 service name is %v.", s3ServiceName)
220221

@@ -233,6 +234,7 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
233234
states,
234235
persistentStore,
235236
in.config.BucketARN,
237+
in.config.BucketListPrefix,
236238
in.awsConfig.Region,
237239
in.config.NumberOfWorkers,
238240
in.config.BucketListInterval)

x-pack/filebeat/input/awss3/input_benchmark_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func (c constantS3) GetObject(ctx context.Context, bucket, key string) (*s3.GetO
134134
return newS3GetObjectResponse(c.filename, c.data, c.contentType), nil
135135
}
136136

137-
func (c constantS3) ListObjectsPaginator(bucket string) s3Pager {
137+
func (c constantS3) ListObjectsPaginator(bucket, prefix string) s3Pager {
138138
return c.pagerConstant
139139
}
140140

@@ -277,7 +277,7 @@ func benchmarkInputS3(t *testing.T, numberOfWorkers int) testing.BenchmarkResult
277277
}
278278

279279
s3EventHandlerFactory := newS3ObjectProcessorFactory(log.Named("s3"), metrics, s3API, client, conf.FileSelectors)
280-
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "region", numberOfWorkers, time.Second)
280+
s3Poller := newS3Poller(logp.NewLogger(inputName), metrics, s3API, s3EventHandlerFactory, newStates(inputCtx), store, "bucket", "key-", "region", numberOfWorkers, time.Second)
281281

282282
ctx, cancel := context.WithCancel(context.Background())
283283
b.Cleanup(cancel)

x-pack/filebeat/input/awss3/input_integration_test.go

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -384,3 +384,52 @@ func TestGetRegionForBucketARN(t *testing.T) {
384384
regionName, err := getRegionForBucketARN(context.Background(), s3Client, tfConfig.BucketName)
385385
assert.Equal(t, tfConfig.AWSRegion, regionName)
386386
}
387+
388+
func TestPaginatorListPrefix(t *testing.T) {
389+
logp.TestingSetup()
390+
391+
// Terraform is used to setup S3 and must be executed manually.
392+
tfConfig := getTerraformOutputs(t)
393+
394+
uploadS3TestFiles(t, tfConfig.AWSRegion, tfConfig.BucketName,
395+
"testdata/events-array.json",
396+
"testdata/invalid.json",
397+
"testdata/log.json",
398+
"testdata/log.ndjson",
399+
"testdata/multiline.json",
400+
"testdata/multiline.json.gz",
401+
"testdata/multiline.txt",
402+
"testdata/log.txt", // Skipped (no match).
403+
)
404+
405+
awsConfig, err := external.LoadDefaultAWSConfig()
406+
awsConfig.Region = tfConfig.AWSRegion
407+
if err != nil {
408+
t.Fatal(err)
409+
}
410+
411+
s3Client := s3.New(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", awsConfig))
412+
413+
s3API := &awsS3API{
414+
client: s3Client,
415+
}
416+
417+
var objects []string
418+
paginator := s3API.ListObjectsPaginator(tfConfig.BucketName, "log")
419+
for paginator.Next(context.Background()) {
420+
page := paginator.CurrentPage()
421+
for _, object := range page.Contents {
422+
objects = append(objects, *object.Key)
423+
}
424+
}
425+
426+
assert.NoError(t, paginator.Err())
427+
428+
expected := []string{
429+
"log.json",
430+
"log.ndjson",
431+
"log.txt",
432+
}
433+
434+
assert.Equal(t, expected, objects)
435+
}

x-pack/filebeat/input/awss3/interfaces.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type s3Getter interface {
6666
}
6767

6868
type s3Lister interface {
69-
ListObjectsPaginator(bucket string) s3Pager
69+
ListObjectsPaginator(bucket, prefix string) s3Pager
7070
}
7171

7272
type s3Pager interface {
@@ -204,9 +204,10 @@ func (a *awsS3API) GetObject(ctx context.Context, bucket, key string) (*s3.GetOb
204204
return resp, nil
205205
}
206206

207-
func (a *awsS3API) ListObjectsPaginator(bucket string) s3Pager {
207+
func (a *awsS3API) ListObjectsPaginator(bucket, prefix string) s3Pager {
208208
req := a.client.ListObjectsRequest(&s3.ListObjectsInput{
209209
Bucket: awssdk.String(bucket),
210+
Prefix: awssdk.String(prefix),
210211
})
211212

212213
pager := s3.NewListObjectsPaginator(req)

x-pack/filebeat/input/awss3/mock_interfaces_test.go

Lines changed: 8 additions & 8 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)