@@ -14,61 +14,69 @@ import (
1414 awssdk "github.com/aws/aws-sdk-go-v2/aws"
1515 "github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
1616
17- awscommon "github.com/elastic/beats/v7/x-pack/libbeat/common/aws"
1817 "github.com/elastic/elastic-agent-libs/logp"
1918)
2019
2120type cloudwatchPoller struct {
22- numberOfWorkers int
23- apiSleep time.Duration
21+ config config
2422 region string
25- logStreams []* string
26- logStreamPrefix string
27- startTime int64
28- endTime int64
29- workerSem * awscommon.Sem
3023 log * logp.Logger
3124 metrics * inputMetrics
3225 workersListingMap * sync.Map
3326 workersProcessingMap * sync.Map
27+
28+ // When a worker is ready for its next task, it should
29+ // send to workRequestChan and then read from workResponseChan.
30+ // The worker can cancel the request based on other context
31+ // cancellations, but if the write succeeds it _must_ read from
32+ // workResponseChan to avoid deadlocking the main loop.
33+ workRequestChan chan struct {}
34+ workResponseChan chan workResponse
35+
36+ workerWg sync.WaitGroup
37+ }
38+
// workResponse describes one task handed to a worker: the log group to
// read and the time interval to read it over.
type workResponse struct {
	// logGroup is the log group the worker should process.
	logGroup string

	// startTime is the lower bound of the interval to scan.
	startTime time.Time

	// endTime is the upper bound of the interval to scan.
	endTime time.Time
}
3543
3644func newCloudwatchPoller (log * logp.Logger , metrics * inputMetrics ,
37- awsRegion string , apiSleep time.Duration ,
38- numberOfWorkers int , logStreams []* string , logStreamPrefix string ) * cloudwatchPoller {
45+ awsRegion string , config config ) * cloudwatchPoller {
3946 if metrics == nil {
4047 metrics = newInputMetrics ("" , nil )
4148 }
4249
4350 return & cloudwatchPoller {
44- numberOfWorkers : numberOfWorkers ,
45- apiSleep : apiSleep ,
46- region : awsRegion ,
47- logStreams : logStreams ,
48- logStreamPrefix : logStreamPrefix ,
49- startTime : int64 (0 ),
50- endTime : int64 (0 ),
51- workerSem : awscommon .NewSem (numberOfWorkers ),
5251 log : log ,
5352 metrics : metrics ,
53+ region : awsRegion ,
54+ config : config ,
5455 workersListingMap : new (sync.Map ),
5556 workersProcessingMap : new (sync.Map ),
57+ // workRequestChan is unbuffered to guarantee that
58+ // the worker and main loop agree whether a request
59+ // was sent. workerResponseChan is buffered so the
60+ // main loop doesn't have to block on the workers
61+ // while distributing new data.
62+ workRequestChan : make (chan struct {}),
63+ workResponseChan : make (chan workResponse , 10 ),
5664 }
5765}
5866
59- func (p * cloudwatchPoller ) run (svc * cloudwatchlogs.Client , logGroup string , startTime int64 , endTime int64 , logProcessor * logProcessor ) {
67+ func (p * cloudwatchPoller ) run (svc * cloudwatchlogs.Client , logGroup string , startTime , endTime time. Time , logProcessor * logProcessor ) {
6068 err := p .getLogEventsFromCloudWatch (svc , logGroup , startTime , endTime , logProcessor )
6169 if err != nil {
6270 var errRequestCanceled * awssdk.RequestCanceledError
6371 if errors .As (err , & errRequestCanceled ) {
64- p .log .Error ("getLogEventsFromCloudWatch failed with RequestCanceledError: " , err )
72+ p .log .Error ("getLogEventsFromCloudWatch failed with RequestCanceledError: " , errRequestCanceled )
6573 }
6674 p .log .Error ("getLogEventsFromCloudWatch failed: " , err )
6775 }
6876}
6977
7078// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
71- func (p * cloudwatchPoller ) getLogEventsFromCloudWatch (svc * cloudwatchlogs.Client , logGroup string , startTime int64 , endTime int64 , logProcessor * logProcessor ) error {
79+ func (p * cloudwatchPoller ) getLogEventsFromCloudWatch (svc * cloudwatchlogs.Client , logGroup string , startTime , endTime time. Time , logProcessor * logProcessor ) error {
7280 // construct FilterLogEventsInput
7381 filterLogEventsInput := p .constructFilterLogEventsInput (startTime , endTime , logGroup )
7482 paginator := cloudwatchlogs .NewFilterLogEventsPaginator (svc , filterLogEventsInput )
@@ -83,8 +91,8 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
8391 p .metrics .logEventsReceivedTotal .Add (uint64 (len (logEvents )))
8492
8593 // This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
86- p .log .Debugf ("sleeping for %v before making FilterLogEvents API call again" , p .apiSleep )
87- time .Sleep (p .apiSleep )
94+ p .log .Debugf ("sleeping for %v before making FilterLogEvents API call again" , p .config . APISleep )
95+ time .Sleep (p .config . APISleep )
8896 p .log .Debug ("done sleeping" )
8997
9098 p .log .Debugf ("Processing #%v events" , len (logEvents ))
@@ -93,21 +101,87 @@ func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client
93101 return nil
94102}
95103
96- func (p * cloudwatchPoller ) constructFilterLogEventsInput (startTime int64 , endTime int64 , logGroup string ) * cloudwatchlogs.FilterLogEventsInput {
104+ func (p * cloudwatchPoller ) constructFilterLogEventsInput (startTime , endTime time. Time , logGroup string ) * cloudwatchlogs.FilterLogEventsInput {
97105 filterLogEventsInput := & cloudwatchlogs.FilterLogEventsInput {
98106 LogGroupName : awssdk .String (logGroup ),
99- StartTime : awssdk .Int64 (startTime ),
100- EndTime : awssdk .Int64 (endTime ),
107+ StartTime : awssdk .Int64 (startTime . UnixNano () / int64 ( time . Millisecond ) ),
108+ EndTime : awssdk .Int64 (endTime . UnixNano () / int64 ( time . Millisecond ) ),
101109 }
102110
103- if len (p .logStreams ) > 0 {
104- for _ , stream := range p .logStreams {
111+ if len (p .config . LogStreams ) > 0 {
112+ for _ , stream := range p .config . LogStreams {
105113 filterLogEventsInput .LogStreamNames = append (filterLogEventsInput .LogStreamNames , * stream )
106114 }
107115 }
108116
109- if p .logStreamPrefix != "" {
110- filterLogEventsInput .LogStreamNamePrefix = awssdk .String (p .logStreamPrefix )
117+ if p .config . LogStreamPrefix != "" {
118+ filterLogEventsInput .LogStreamNamePrefix = awssdk .String (p .config . LogStreamPrefix )
111119 }
112120 return filterLogEventsInput
113121}
122+
123+ func (p * cloudwatchPoller ) startWorkers (
124+ ctx context.Context ,
125+ svc * cloudwatchlogs.Client ,
126+ logProcessor * logProcessor ,
127+ ) {
128+ for i := 0 ; i < p .config .NumberOfWorkers ; i ++ {
129+ p .workerWg .Add (1 )
130+ go func () {
131+ defer p .workerWg .Done ()
132+ for {
133+ var work workResponse
134+ select {
135+ case <- ctx .Done ():
136+ return
137+ case p .workRequestChan <- struct {}{}:
138+ work = <- p .workResponseChan
139+ }
140+
141+ p .log .Infof ("aws-cloudwatch input worker for log group: '%v' has started" , work .logGroup )
142+ p .run (svc , work .logGroup , work .startTime , work .endTime , logProcessor )
143+ p .log .Infof ("aws-cloudwatch input worker for log group '%v' has stopped." , work .logGroup )
144+ }
145+ }()
146+ }
147+ }
148+
149+ // receive implements the main run loop that distributes tasks to the worker
150+ // goroutines. It accepts a "clock" callback (which on a live input should
151+ // equal time.Now) to allow deterministic unit tests.
152+ func (p * cloudwatchPoller ) receive (ctx context.Context , logGroupNames []string , clock func () time.Time ) {
153+ defer p .workerWg .Wait ()
154+ // startTime and endTime are the bounds of the current scanning interval.
155+ // If we're starting at the end of the logs, advance the start time to the
156+ // most recent scan window
157+ var startTime time.Time
158+ endTime := clock ().Add (- p .config .Latency )
159+ if p .config .StartPosition == "end" {
160+ startTime = endTime .Add (- p .config .ScanFrequency )
161+ }
162+ for ctx .Err () == nil {
163+ for _ , lg := range logGroupNames {
164+ select {
165+ case <- ctx .Done ():
166+ return
167+ case <- p .workRequestChan :
168+ p .workResponseChan <- workResponse {
169+ logGroup : lg ,
170+ startTime : startTime ,
171+ endTime : endTime ,
172+ }
173+ }
174+ }
175+
176+ // Delay for ScanFrequency after finishing a time span
177+ p .log .Debugf ("sleeping for %v before checking new logs" , p .config .ScanFrequency )
178+ select {
179+ case <- time .After (p .config .ScanFrequency ):
180+ case <- ctx .Done ():
181+ }
182+ p .log .Debug ("done sleeping" )
183+
184+ // Advance to the next time span
185+ startTime , endTime = endTime , clock ().Add (- p .config .Latency )
186+ }
187+ }
0 commit comments