Skip to content

Commit deece39

Browse files
authored
Fix awscloudwatch worker allocation (#38953)
Fix a bug in cloudwatch worker allocation that could cause data loss (#38918). The previous behavior wasn't really tested, since worker tasks were computed in cloudwatchPoller's polling loop which required live AWS connections. So in addition to the basic logical fix, I did some refactoring to cloudwatchPoller that makes the task iteration visible to unit tests.
1 parent e53eb0c commit deece39

File tree

5 files changed

+316
-230
lines changed

5 files changed

+316
-230
lines changed

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -132,6 +132,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
132132
- Fix panic when more than 32767 pipeline clients are active. {issue}38197[38197] {pull}38556[38556]
133133
- Fix filestream's registry GC: registry entries are now removed from the in-memory and disk store when they're older than the set TTL {issue}36761[36761] {pull}38488[38488]
134134
- [threatintel] MISP splitting fix for empty responses {issue}38739[38739] {pull}38917[38917]
135+
- Fix a bug in cloudwatch task allocation that could skip some logs {issue}38918[38918] {pull}38953[38953]
135136
- Prevent GCP Pub/Sub input blockage by increasing default value of `max_outstanding_messages` {issue}35029[35029] {pull}38985[38985]
136137
- entity-analytics input: Improve structured logging. {pull}38990[38990]
137138
- Fix config validation for CEL and HTTPJSON inputs when using password grant authentication and `client.id` or `client.secret` are not present. {pull}38962[38962]

x-pack/filebeat/input/awscloudwatch/cloudwatch.go

Lines changed: 104 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -14,61 +14,69 @@ import (
1414
awssdk "github.com/aws/aws-sdk-go-v2/aws"
1515
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
1616

17-
awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
1817
"github.com/elastic/elastic-agent-libs/logp"
1918
)
2019

2120
// cloudwatchPoller coordinates a pool of worker goroutines that fetch
// log events from CloudWatch. The main loop hands out (log group, time
// span) tasks to workers through the request/response channel pair.
type cloudwatchPoller struct {
	config config
	region string
	log      *logp.Logger
	metrics  *inputMetrics
	// NOTE(review): the contents of these maps aren't visible in this
	// chunk — presumably they track per-worker listing/processing
	// state; confirm against the rest of the input.
	workersListingMap    *sync.Map
	workersProcessingMap *sync.Map

	// When a worker is ready for its next task, it should
	// send to workRequestChan and then read from workResponseChan.
	// The worker can cancel the request based on other context
	// cancellations, but if the write succeeds it _must_ read from
	// workResponseChan to avoid deadlocking the main loop.
	workRequestChan  chan struct{}
	workResponseChan chan workResponse

	// workerWg tracks the goroutines started by startWorkers so
	// receive can wait for them to finish on shutdown.
	workerWg sync.WaitGroup
}

// workResponse is one unit of work handed to a worker: scan the given
// log group over the time span from startTime to endTime.
type workResponse struct {
	logGroup           string
	startTime, endTime time.Time
}
3543

3644
func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
37-
awsRegion string, apiSleep time.Duration,
38-
numberOfWorkers int, logStreams []*string, logStreamPrefix string) *cloudwatchPoller {
45+
awsRegion string, config config) *cloudwatchPoller {
3946
if metrics == nil {
4047
metrics = newInputMetrics("", nil)
4148
}
4249

4350
return &cloudwatchPoller{
44-
numberOfWorkers: numberOfWorkers,
45-
apiSleep: apiSleep,
46-
region: awsRegion,
47-
logStreams: logStreams,
48-
logStreamPrefix: logStreamPrefix,
49-
startTime: int64(0),
50-
endTime: int64(0),
51-
workerSem: awscommon.NewSem(numberOfWorkers),
5251
log: log,
5352
metrics: metrics,
53+
region: awsRegion,
54+
config: config,
5455
workersListingMap: new(sync.Map),
5556
workersProcessingMap: new(sync.Map),
57+
// workRequestChan is unbuffered to guarantee that
58+
// the worker and main loop agree whether a request
59+
// was sent. workerResponseChan is buffered so the
60+
// main loop doesn't have to block on the workers
61+
// while distributing new data.
62+
workRequestChan: make(chan struct{}),
63+
workResponseChan: make(chan workResponse, 10),
5664
}
5765
}
5866

59-
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) {
67+
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) {
6068
err := p.getLogEventsFromCloudWatch(svc, logGroup, startTime, endTime, logProcessor)
6169
if err != nil {
6270
var errRequestCanceled *awssdk.RequestCanceledError
6371
if errors.As(err, &errRequestCanceled) {
64-
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", err)
72+
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", errRequestCanceled)
6573
}
6674
p.log.Error("getLogEventsFromCloudWatch failed: ", err)
6775
}
6876
}
6977

7078
// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
71-
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime int64, endTime int64, logProcessor *logProcessor) error {
79+
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroup string, startTime, endTime time.Time, logProcessor *logProcessor) error {
7280
// construct FilterLogEventsInput
7381
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroup)
7482
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput)
@@ -83,8 +91,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
8391
p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents)))
8492

8593
// This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
86-
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.apiSleep)
87-
time.Sleep(p.apiSleep)
94+
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.config.APISleep)
95+
time.Sleep(p.config.APISleep)
8896
p.log.Debug("done sleeping")
8997

9098
p.log.Debugf("Processing #%v events", len(logEvents))
@@ -93,21 +101,87 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
93101
return nil
94102
}
95103

96-
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime int64, endTime int64, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
104+
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroup string) *cloudwatchlogs.FilterLogEventsInput {
97105
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
98106
LogGroupName: awssdk.String(logGroup),
99-
StartTime: awssdk.Int64(startTime),
100-
EndTime: awssdk.Int64(endTime),
107+
StartTime: awssdk.Int64(startTime.UnixNano() / int64(time.Millisecond)),
108+
EndTime: awssdk.Int64(endTime.UnixNano() / int64(time.Millisecond)),
101109
}
102110

103-
if len(p.logStreams) > 0 {
104-
for _, stream := range p.logStreams {
111+
if len(p.config.LogStreams) > 0 {
112+
for _, stream := range p.config.LogStreams {
105113
filterLogEventsInput.LogStreamNames = append(filterLogEventsInput.LogStreamNames, *stream)
106114
}
107115
}
108116

109-
if p.logStreamPrefix != "" {
110-
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.logStreamPrefix)
117+
if p.config.LogStreamPrefix != "" {
118+
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.config.LogStreamPrefix)
111119
}
112120
return filterLogEventsInput
113121
}
122+
123+
func (p *cloudwatchPoller) startWorkers(
124+
ctx context.Context,
125+
svc *cloudwatchlogs.Client,
126+
logProcessor *logProcessor,
127+
) {
128+
for i := 0; i < p.config.NumberOfWorkers; i++ {
129+
p.workerWg.Add(1)
130+
go func() {
131+
defer p.workerWg.Done()
132+
for {
133+
var work workResponse
134+
select {
135+
case <-ctx.Done():
136+
return
137+
case p.workRequestChan <- struct{}{}:
138+
work = <-p.workResponseChan
139+
}
140+
141+
p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroup)
142+
p.run(svc, work.logGroup, work.startTime, work.endTime, logProcessor)
143+
p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroup)
144+
}
145+
}()
146+
}
147+
}
148+
149+
// receive implements the main run loop that distributes tasks to the worker
150+
// goroutines. It accepts a "clock" callback (which on a live input should
151+
// equal time.Now) to allow deterministic unit tests.
152+
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupNames []string, clock func() time.Time) {
153+
defer p.workerWg.Wait()
154+
// startTime and endTime are the bounds of the current scanning interval.
155+
// If we're starting at the end of the logs, advance the start time to the
156+
// most recent scan window
157+
var startTime time.Time
158+
endTime := clock().Add(-p.config.Latency)
159+
if p.config.StartPosition == "end" {
160+
startTime = endTime.Add(-p.config.ScanFrequency)
161+
}
162+
for ctx.Err() == nil {
163+
for _, lg := range logGroupNames {
164+
select {
165+
case <-ctx.Done():
166+
return
167+
case <-p.workRequestChan:
168+
p.workResponseChan <- workResponse{
169+
logGroup: lg,
170+
startTime: startTime,
171+
endTime: endTime,
172+
}
173+
}
174+
}
175+
176+
// Delay for ScanFrequency after finishing a time span
177+
p.log.Debugf("sleeping for %v before checking new logs", p.config.ScanFrequency)
178+
select {
179+
case <-time.After(p.config.ScanFrequency):
180+
case <-ctx.Done():
181+
}
182+
p.log.Debug("done sleeping")
183+
184+
// Advance to the next time span
185+
startTime, endTime = endTime, clock().Add(-p.config.Latency)
186+
}
187+
}

0 commit comments

Comments
 (0)