Skip to content

Commit 791a601

Browse files
author
kaiyan-sheng
committed
Use aws sdk paginator for FilterLogEvents and GetMetricData (#26852)
(cherry picked from commit 683130a)
1 parent cda9e92 commit 791a601

4 files changed

Lines changed: 90 additions & 85 deletions

File tree

x-pack/filebeat/input/awscloudwatch/input.go

Lines changed: 18 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -222,57 +222,43 @@ func (in *awsCloudWatchInput) getLogGroupNames(svc cloudwatchlogsiface.ClientAPI
222222

223223
// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
224224
func (in *awsCloudWatchInput) getLogEventsFromCloudWatch(svc cloudwatchlogsiface.ClientAPI) error {
225-
ctx, cancelFn := context.WithTimeout(in.inputCtx, in.config.APITimeout)
226-
defer cancelFn()
227-
228-
init := true
229-
nextToken := ""
230225
currentTime := time.Now()
231226
startTime, endTime := getStartPosition(in.config.StartPosition, currentTime, in.prevEndTime, in.config.ScanFrequency)
232227
in.logger.Debugf("start_position = %s, startTime = %v, endTime = %v", in.config.StartPosition, time.Unix(startTime/1000, 0), time.Unix(endTime/1000, 0))
233228

234229
// overwrite prevEndTime using new endTime
235230
in.prevEndTime = endTime
236231

237-
for nextToken != "" || init {
238-
// construct FilterLogEventsInput
239-
filterLogEventsInput := in.constructFilterLogEventsInput(startTime, endTime, nextToken)
232+
// construct FilterLogEventsInput
233+
filterLogEventsInput := in.constructFilterLogEventsInput(startTime, endTime)
240234

241-
// make API request
242-
req := svc.FilterLogEventsRequest(filterLogEventsInput)
243-
resp, err := req.Send(ctx)
244-
if err != nil {
245-
in.logger.Error("failed FilterLogEventsRequest", err)
246-
return err
247-
}
248-
249-
// get token for next API call, if resp.NextToken is nil, nextToken set to ""
250-
nextToken = ""
251-
if resp.NextToken != nil {
252-
nextToken = *resp.NextToken
253-
}
235+
// make API request
236+
req := svc.FilterLogEventsRequest(filterLogEventsInput)
237+
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(req)
238+
for paginator.Next(context.TODO()) {
239+
page := paginator.CurrentPage()
254240

255-
logEvents := resp.Events
241+
logEvents := page.Events
256242
in.logger.Debugf("Processing #%v events", len(logEvents))
257-
258-
err = in.processLogEvents(logEvents)
243+
err := in.processLogEvents(logEvents)
259244
if err != nil {
260245
err = errors.Wrap(err, "processLogEvents failed")
261246
in.logger.Error(err)
262-
cancelFn()
263247
}
248+
}
264249

265-
init = false
266-
267-
// This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
268-
in.logger.Debugf("sleeping for %v before making FilterLogEvents API call again", in.config.APISleep)
269-
time.Sleep(in.config.APISleep)
270-
in.logger.Debug("done sleeping")
250+
if err := paginator.Err(); err != nil {
251+
return errors.Wrap(err, "error FilterLogEvents with Paginator")
271252
}
253+
254+
// This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
255+
in.logger.Debugf("sleeping for %v before making FilterLogEvents API call again", in.config.APISleep)
256+
time.Sleep(in.config.APISleep)
257+
in.logger.Debug("done sleeping")
272258
return nil
273259
}
274260

275-
func (in *awsCloudWatchInput) constructFilterLogEventsInput(startTime int64, endTime int64, nextToken string) *cloudwatchlogs.FilterLogEventsInput {
261+
func (in *awsCloudWatchInput) constructFilterLogEventsInput(startTime int64, endTime int64) *cloudwatchlogs.FilterLogEventsInput {
276262
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
277263
LogGroupName: awssdk.String(in.config.LogGroupName),
278264
StartTime: awssdk.Int64(startTime),
@@ -286,10 +272,6 @@ func (in *awsCloudWatchInput) constructFilterLogEventsInput(startTime int64, end
286272
if in.config.LogStreamPrefix != "" {
287273
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(in.config.LogStreamPrefix)
288274
}
289-
290-
if nextToken != "" {
291-
filterLogEventsInput.NextToken = awssdk.String(nextToken)
292-
}
293275
return filterLogEventsInput
294276
}
295277

x-pack/metricbeat/module/aws/cloudwatch/cloudwatch_test.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1241,7 +1241,13 @@ func (m *MockCloudWatchClient) GetMetricDataRequest(input *cloudwatch.GetMetricD
12411241
httpReq, _ := http.NewRequest("", "", nil)
12421242

12431243
return cloudwatch.GetMetricDataRequest{
1244+
Input: input,
1245+
Copy: m.GetMetricDataRequest,
12441246
Request: &awssdk.Request{
1247+
Operation: &awssdk.Operation{
1248+
Name: "GetMetricData",
1249+
Paginator: nil,
1250+
},
12451251
Data: &cloudwatch.GetMetricDataOutput{
12461252
MetricDataResults: []cloudwatch.MetricDataResult{
12471253
{
@@ -1284,7 +1290,13 @@ func (m *MockCloudWatchClientWithoutDim) GetMetricDataRequest(input *cloudwatch.
12841290
httpReq, _ := http.NewRequest("", "", nil)
12851291

12861292
return cloudwatch.GetMetricDataRequest{
1293+
Input: input,
1294+
Copy: m.GetMetricDataRequest,
12871295
Request: &awssdk.Request{
1296+
Operation: &awssdk.Operation{
1297+
Name: "GetMetricData",
1298+
Paginator: nil,
1299+
},
12881300
Data: &cloudwatch.GetMetricDataOutput{
12891301
MetricDataResults: []cloudwatch.MetricDataResult{
12901302
{

x-pack/metricbeat/module/aws/utils.go

Lines changed: 37 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -39,69 +39,58 @@ func GetStartTimeEndTime(period time.Duration, latency time.Duration) (time.Time
3939
// to obtain statistical data.
4040
func GetListMetricsOutput(namespace string, regionName string, svcCloudwatch cloudwatchiface.ClientAPI) ([]cloudwatch.Metric, error) {
4141
var metricsTotal []cloudwatch.Metric
42-
init := true
4342
var nextToken *string
4443

45-
for init || nextToken != nil {
46-
init = false
47-
listMetricsInput := &cloudwatch.ListMetricsInput{
48-
NextToken: nextToken,
49-
}
50-
if namespace != "*" {
51-
listMetricsInput.Namespace = &namespace
52-
}
53-
reqListMetrics := svcCloudwatch.ListMetricsRequest(listMetricsInput)
54-
55-
// List metrics of a given namespace for each region
56-
listMetricsOutput, err := reqListMetrics.Send(context.TODO())
57-
if err != nil {
58-
return nil, errors.Wrap(err, "ListMetricsRequest failed, skipping region "+regionName)
59-
}
60-
metricsTotal = append(metricsTotal, listMetricsOutput.Metrics...)
61-
nextToken = listMetricsOutput.NextToken
44+
listMetricsInput := &cloudwatch.ListMetricsInput{
45+
NextToken: nextToken,
46+
}
47+
if namespace != "*" {
48+
listMetricsInput.Namespace = &namespace
6249
}
6350

64-
return metricsTotal, nil
65-
}
66-
67-
func getMetricDataPerRegion(metricDataQueries []cloudwatch.MetricDataQuery, nextToken *string, svc cloudwatchiface.ClientAPI, startTime time.Time, endTime time.Time) (*cloudwatch.GetMetricDataOutput, error) {
68-
getMetricDataInput := &cloudwatch.GetMetricDataInput{
69-
NextToken: nextToken,
70-
StartTime: &startTime,
71-
EndTime: &endTime,
72-
MetricDataQueries: metricDataQueries,
51+
// List metrics of a given namespace for each region
52+
req := svcCloudwatch.ListMetricsRequest(listMetricsInput)
53+
paginator := cloudwatch.NewListMetricsPaginator(req)
54+
for paginator.Next(context.TODO()) {
55+
page := paginator.CurrentPage()
56+
metricsTotal = append(metricsTotal, page.Metrics...)
7357
}
7458

75-
reqGetMetricData := svc.GetMetricDataRequest(getMetricDataInput)
76-
getMetricDataResponse, err := reqGetMetricData.Send(context.TODO())
77-
if err != nil {
78-
return nil, errors.Wrap(err, "Error GetMetricDataInput")
59+
if err := paginator.Err(); err != nil {
60+
return metricsTotal, errors.Wrap(err, "error ListMetrics with Paginator, skipping region "+regionName)
7961
}
80-
return getMetricDataResponse.GetMetricDataOutput, nil
62+
return metricsTotal, nil
8163
}
8264

8365
// GetMetricDataResults function uses MetricDataQueries to get metric data output.
8466
func GetMetricDataResults(metricDataQueries []cloudwatch.MetricDataQuery, svc cloudwatchiface.ClientAPI, startTime time.Time, endTime time.Time) ([]cloudwatch.MetricDataResult, error) {
85-
init := true
8667
maxQuerySize := 100
8768
getMetricDataOutput := &cloudwatch.GetMetricDataOutput{NextToken: nil}
88-
for init || getMetricDataOutput.NextToken != nil {
89-
init = false
90-
// Split metricDataQueries into smaller slices that length no longer than 100.
91-
// 100 is defined in maxQuerySize.
92-
// To avoid ValidationError: The collection MetricDataQueries must not have a size greater than 100.
93-
for i := 0; i < len(metricDataQueries); i += maxQuerySize {
94-
metricDataQueriesPartial := metricDataQueries[i:int(math.Min(float64(i+maxQuerySize), float64(len(metricDataQueries))))]
95-
if len(metricDataQueriesPartial) == 0 {
96-
return getMetricDataOutput.MetricDataResults, nil
97-
}
9869

99-
output, err := getMetricDataPerRegion(metricDataQueriesPartial, getMetricDataOutput.NextToken, svc, startTime, endTime)
100-
if err != nil {
101-
return getMetricDataOutput.MetricDataResults, errors.Wrap(err, "getMetricDataPerRegion failed")
102-
}
70+
// Split metricDataQueries into smaller slices that length no longer than 100.
71+
// 100 is defined in maxQuerySize.
72+
// To avoid ValidationError: The collection MetricDataQueries must not have a size greater than 100.
73+
for i := 0; i < len(metricDataQueries); i += maxQuerySize {
74+
metricDataQueriesPartial := metricDataQueries[i:int(math.Min(float64(i+maxQuerySize), float64(len(metricDataQueries))))]
75+
if len(metricDataQueriesPartial) == 0 {
76+
return getMetricDataOutput.MetricDataResults, nil
77+
}
78+
79+
getMetricDataInput := &cloudwatch.GetMetricDataInput{
80+
StartTime: &startTime,
81+
EndTime: &endTime,
82+
MetricDataQueries: metricDataQueriesPartial,
83+
}
84+
85+
req := svc.GetMetricDataRequest(getMetricDataInput)
86+
paginator := cloudwatch.NewGetMetricDataPaginator(req)
87+
for paginator.Next(context.TODO()) {
88+
page := paginator.CurrentPage()
89+
getMetricDataOutput.MetricDataResults = append(getMetricDataOutput.MetricDataResults, page.MetricDataResults...)
90+
}
10391

104-
getMetricDataOutput.MetricDataResults = append(getMetricDataOutput.MetricDataResults, output.MetricDataResults...)
92+
if err := paginator.Err(); err != nil {
93+
return getMetricDataOutput.MetricDataResults, errors.Wrap(err, "error GetMetricData with Paginator")
10594
}
10695
}
10796
return getMetricDataOutput.MetricDataResults, nil

x-pack/metricbeat/module/aws/utils_test.go

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package aws
66

77
import (
8+
"context"
89
"fmt"
910
"net/http"
1011
"testing"
@@ -58,7 +59,13 @@ func (m *MockCloudWatchClient) ListMetricsRequest(input *cloudwatch.ListMetricsI
5859
}
5960
httpReq, _ := http.NewRequest("", "", nil)
6061
return cloudwatch.ListMetricsRequest{
62+
Input: input,
63+
Copy: m.ListMetricsRequest,
6164
Request: &awssdk.Request{
65+
Operation: &awssdk.Operation{
66+
Name: "ListMetrics",
67+
Paginator: nil,
68+
},
6269
Data: &cloudwatch.ListMetricsOutput{
6370
Metrics: []cloudwatch.Metric{
6471
{
@@ -81,7 +88,13 @@ func (m *MockCloudWatchClient) GetMetricDataRequest(input *cloudwatch.GetMetricD
8188
httpReq, _ := http.NewRequest("", "", nil)
8289

8390
return cloudwatch.GetMetricDataRequest{
91+
Input: input,
92+
Copy: m.GetMetricDataRequest,
8493
Request: &awssdk.Request{
94+
Operation: &awssdk.Operation{
95+
Name: "GetMetricData",
96+
Paginator: nil,
97+
},
8598
Data: &cloudwatch.GetMetricDataOutput{
8699
MetricDataResults: []cloudwatch.MetricDataResult{
87100
{
@@ -210,7 +223,16 @@ func TestGetMetricDataPerRegion(t *testing.T) {
210223

211224
mockSvc := &MockCloudWatchClient{}
212225
var metricDataQueries []cloudwatch.MetricDataQuery
213-
getMetricDataOutput, err := getMetricDataPerRegion(metricDataQueries, nil, mockSvc, startTime, endTime)
226+
227+
getMetricDataInput := &cloudwatch.GetMetricDataInput{
228+
NextToken: nil,
229+
StartTime: &startTime,
230+
EndTime: &endTime,
231+
MetricDataQueries: metricDataQueries,
232+
}
233+
234+
reqGetMetricData := mockSvc.GetMetricDataRequest(getMetricDataInput)
235+
getMetricDataOutput, err := reqGetMetricData.Send(context.TODO())
214236
if err != nil {
215237
fmt.Println("failed getMetricDataPerRegion: ", err)
216238
t.FailNow()

0 commit comments

Comments
 (0)