Skip to content

Commit ea8f10c

Browse files
author
kaiyan-sheng
authored
Support running multiple log groups in cloudwatch input (#29695)
1 parent a84302d commit ea8f10c

33 files changed

Lines changed: 907 additions & 311 deletions

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
111111
- Undo deletion of endpoint config from cloudtrail fileset in {pull}29415[29415]. {pull}29450[29450]
112112
- Make Cisco ASA and FTD modules conform to the ECS definition for event.outcome and event.type. {issue}29581[29581] {pull}29698[29698]
113113
- ibmmq: Fixed `@timestamp` not being populated with correct values. {pull}29773[29773]
114+
- Fix using log_group_name_prefix in aws-cloudwatch input. {pull}29695[29695]
114115
- aws-s3: Improve gzip detection to avoid false negatives. {issue}29968[29968]
115116

116117
*Heartbeat*

x-pack/filebeat/docs/inputs/input-aws-cloudwatch.asciidoc

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,18 @@ log_group_name is given.
5050
==== `log_group_name_prefix`
5151
The prefix for a group of log group names. Note: `region_name` is required when
5252
log_group_name_prefix is given. `log_group_name` and `log_group_name_prefix`
53-
cannot be given at the same time.
53+
cannot be given at the same time. The number of workers that will process the
54+
log groups under this prefix is set through the `number_of_workers` config.
5455

5556
[float]
5657
==== `region_name`
5758
Region that the specified log group or log group prefix belongs to.
5859

60+
[float]
61+
==== `number_of_workers`
62+
Number of workers that will process the log groups with the given `log_group_name_prefix`.
63+
Default value is 1.
64+
5965
[float]
6066
==== `log_streams`
6167
A list of strings of log streams names that Filebeat collect log events from.
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
terraform/
2+
outputs.yml
3+
*.tfstate*

x-pack/filebeat/input/awscloudwatch/_meta/terraform/.terraform.lock.hcl

Lines changed: 57 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Terraform setup for AWS CloudWatch Input Integration Tests
2+
3+
This directory contains a Terraform module that creates the AWS resources needed
4+
for executing the integration tests for the `aws-cloudwatch` Filebeat input. It
5+
creates two CloudWatch log groups, and one log stream under each log group.
6+
7+
It outputs configuration information that is consumed by the tests to
8+
`outputs.yml`. The AWS resources are randomly named to prevent name collisions
9+
between multiple users.
10+
11+
### Usage
12+
13+
You must have the appropriate AWS environment variables for authentication set
14+
before running Terraform or the integration tests. The AWS key must be
15+
authorized to create and destroy AWS CloudWatch log groups.
16+
17+
1. Initialize a working directory containing Terraform configuration files.
18+
19+
`terraform init`
20+
21+
2. Execute terraform in this directory to create the resources. This will also
22+
write the `outputs.yml`. You can use `export TF_VAR_aws_region=NNNNN` in order
23+
to match the AWS region of the profile you are using.
24+
25+
`terraform apply`
26+
27+
28+
3. (Optional) View the output configuration.
29+
30+
```yaml
31+
"aws_region": "us-east-1"
32+
"log_group_name_1": "filebeat-cloudwatch-integtest-1-417koa"
33+
"log_group_name_2": "filebeat-cloudwatch-integtest-2-417koa"
34+
```
35+
36+
4. Execute the integration test.
37+
38+
```
39+
cd x-pack/filebeat/input/awscloudwatch
40+
go test -tags aws,integration -run TestInputRun.+ -v .
41+
```
42+
43+
5. Cleanup AWS resources. Execute terraform to delete the log groups created for
44+
testing.
45+
46+
`terraform destroy`
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
terraform {
2+
required_providers {
3+
aws = {
4+
source = "hashicorp/aws"
5+
version = "~> 3.52"
6+
}
7+
}
8+
}
9+
10+
provider "aws" {
11+
region = var.aws_region
12+
}
13+
14+
resource "random_string" "random" {
15+
length = 6
16+
special = false
17+
upper = false
18+
}
19+
20+
resource "aws_cloudwatch_log_group" "filebeat-integtest-1" {
21+
name = "filebeat-log-group-integtest-1-${random_string.random.result}"
22+
23+
tags = {
24+
Environment = "test"
25+
}
26+
}
27+
28+
resource "aws_cloudwatch_log_group" "filebeat-integtest-2" {
29+
name = "filebeat-log-group-integtest-2-${random_string.random.result}"
30+
31+
tags = {
32+
Environment = "test"
33+
}
34+
}
35+
36+
resource "aws_cloudwatch_log_stream" "filebeat-integtest-1" {
37+
name = "filebeat-log-stream-integtest-1-${random_string.random.result}"
38+
log_group_name = aws_cloudwatch_log_group.filebeat-integtest-1.name
39+
}
40+
41+
resource "aws_cloudwatch_log_stream" "filebeat-integtest-2" {
42+
name = "filebeat-log-stream-integtest-2-${random_string.random.result}"
43+
log_group_name = aws_cloudwatch_log_group.filebeat-integtest-2.name
44+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
resource "local_file" "secrets" {
2+
content = yamlencode({
3+
"log_group_name_1" : aws_cloudwatch_log_group.filebeat-integtest-1.name
4+
"log_group_name_2" : aws_cloudwatch_log_group.filebeat-integtest-2.name
5+
"log_stream_name_1" : aws_cloudwatch_log_stream.filebeat-integtest-1.name
6+
"log_stream_name_2" : aws_cloudwatch_log_stream.filebeat-integtest-2.name
7+
"aws_region" : var.aws_region
8+
})
9+
filename = "${path.module}/outputs.yml"
10+
file_permission = "0644"
11+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
variable "aws_region" {
2+
description = "AWS Region"
3+
type = string
4+
default = "us-east-1"
5+
}
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package awscloudwatch
6+
7+
import (
8+
"context"
9+
"sync"
10+
"time"
11+
12+
awssdk "github.com/aws/aws-sdk-go-v2/aws"
13+
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
14+
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs/cloudwatchlogsiface"
15+
"github.com/pkg/errors"
16+
17+
"github.com/elastic/beats/v7/libbeat/logp"
18+
"github.com/elastic/beats/v7/libbeat/monitoring"
19+
"github.com/elastic/beats/v7/libbeat/statestore"
20+
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
21+
)
22+
23+
type cloudwatchPoller struct {
24+
numberOfWorkers int
25+
apiSleep time.Duration
26+
region string
27+
logStreams []string
28+
logStreamPrefix string
29+
startTime int64
30+
endTime int64
31+
prevEndTime int64
32+
workerSem *awscommon.Sem
33+
log *logp.Logger
34+
metrics *inputMetrics
35+
store *statestore.Store
36+
workersListingMap *sync.Map
37+
workersProcessingMap *sync.Map
38+
}
39+
40+
func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
41+
store *statestore.Store,
42+
awsRegion string, apiSleep time.Duration,
43+
numberOfWorkers int, logStreams []string, logStreamPrefix string) *cloudwatchPoller {
44+
if metrics == nil {
45+
metrics = newInputMetrics(monitoring.NewRegistry(), "")
46+
}
47+
48+
return &cloudwatchPoller{
49+
numberOfWorkers: numberOfWorkers,
50+
apiSleep: apiSleep,
51+
region: awsRegion,
52+
logStreams: logStreams,
53+
logStreamPrefix: logStreamPrefix,
54+
startTime: int64(0),
55+
endTime: int64(0),
56+
workerSem: awscommon.NewSem(numberOfWorkers),
57+
log: log,
58+
metrics: metrics,
59+
store: store,
60+
workersListingMap: new(sync.Map),
61+
workersProcessingMap: new(sync.Map),
62+
}
63+
}
64+
65+
func (p *cloudwatchPoller) run(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) {
66+
err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
67+
if err != nil {
68+
var err *awssdk.RequestCanceledError
69+
if errors.As(err, &err) {
70+
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err)
71+
}
72+
p.log.Error("getLogEventsFromCloudWatch failed: ", err)
73+
}
74+
}
75+
76+
// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
77+
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error {
78+
// construct FilterLogEventsInput
79+
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)
80+
81+
// make API request
82+
req := svc.FilterLogEventsRequest(filterLogEventsInput)
83+
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req)
84+
for paginator.Next(context.TODO()) {
85+
page := paginator.CurrentPage()
86+
p.metrics.apiCallsTotal.Inc()
87+
88+
logEvents := page.Events
89+
p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents)))
90+
91+
// This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
92+
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep)
93+
time.Sleep(p.apiSleep)
94+
p.log.Debug("done sleeping")
95+
96+
p.log.Debugf("Processing #%v events", len(logEvents))
97+
err := logProcessor.processLogEvents(logEvents, logGroup, p.region)
98+
if err != nil {
99+
err = errors.Wrap(err, "processLogEvents failed")
100+
p.log.Error(err)
101+
}
102+
}
103+
104+
if err := paginator.Err(); err != nil {
105+
return errors.Wrap(err, "error FilterLogEvents with Paginator")
106+
}
107+
return nil
108+
}
109+
110+
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
111+
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
112+
LogGroupName: awssdk.String(logGroup),
113+
StartTime: awssdk.Int64(startTime),
114+
EndTime: awssdk.Int64(endTime),
115+
Limit: awssdk.Int64(100),
116+
}
117+
118+
if len(p.logStreams) > 0 {
119+
filterLogEventsInput.LogStreamNames = p.logStreams
120+
}
121+
122+
if p.logStreamPrefix != "" {
123+
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix)
124+
}
125+
return filterLogEventsInput
126+
}

x-pack/filebeat/input/awscloudwatch/config.go

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,18 +25,20 @@ type config struct {
2525
APITimeout time.Duration `config:"api_timeout" validate:"min=0,nonzero"`
2626
APISleep time.Duration `config:"api_sleep" validate:"min=0,nonzero"`
2727
Latency time.Duration `config:"latency"`
28-
AwsConfig awscommon.ConfigAWS `config:",inline"`
28+
NumberOfWorkers int `config:"number_of_workers"`
29+
AWSConfig awscommon.ConfigAWS `config:",inline"`
2930
}
3031

3132
func defaultConfig() config {
3233
return config{
3334
ForwarderConfig: harvester.ForwarderConfig{
3435
Type: "aws-cloudwatch",
3536
},
36-
StartPosition: "beginning",
37-
ScanFrequency: 10 * time.Second,
38-
APITimeout: 120 * time.Second,
39-
APISleep: 200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms
37+
StartPosition: "beginning",
38+
ScanFrequency: 10 * time.Second,
39+
APITimeout: 120 * time.Second,
40+
APISleep: 200 * time.Millisecond, // FilterLogEvents has a limit of 5 transactions per second (TPS)/account/Region: 1s / 5 = 200 ms
41+
NumberOfWorkers: 1,
4042
}
4143
}
4244

0 commit comments

Comments
 (0)