Skip to content

Commit 42e50cf

Browse files
feat: Introduce lastSync start position to AWS CloudWatch input backed by state registry (#43251)
* add state registry support for cloudwatch input Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> finalize AWS CW lastSync start position usage Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> update changelog next Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> review changes Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> review changes - improve state store id, fix timestamp and return nil for error Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> documentation update with md migration Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> # Conflicts: # x-pack/filebeat/input/awscloudwatch/config.go * review change: use ackers for confirmed delivery prior to saving state Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> * review: documentation updates Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> * review changes : ctx handling, shutdown improvement and race condition handling Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> * review : fix ctx condition Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co> --------- Signed-off-by: Kavindu Dodanduwa <kavindu.dodanduwa@elastic.co>
1 parent 08e4973 commit 42e50cf

18 files changed

Lines changed: 911 additions & 161 deletions

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -447,6 +447,7 @@ otherwise no tag is added. {issue}42208[42208] {pull}42403[42403]
447447
- Fix handling of ADC (Application Default Credentials) metadata server credentials in HTTPJSON input. {issue}44349[44349] {pull}44436[44436]
448448
- Fix handling of ADC (Application Default Credentials) metadata server credentials in CEL input. {issue}44349[44349] {pull}44571[44571]
449449
- Added support for specifying custom content-types and encodings in azureblobstorage input. {issue}44330[44330] {pull}44402[44402]
450+
- Introduce lastSync start position to AWS CloudWatch input backed by state registry. {pull}43251[43251]
450451

451452
*Auditbeat*
452453

NOTICE.txt

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59608,6 +59608,37 @@ Licence type (autodetected): BSD-3-Clause
5960859608

5960959609
No licence file provided.
5961059610

59611+
--------------------------------------------------------------------------------
59612+
Dependency : github.com/segmentio/fasthash
59613+
Version: v1.0.3
59614+
Licence type (autodetected): MIT
59615+
--------------------------------------------------------------------------------
59616+
59617+
Contents of probable licence file $GOMODCACHE/github.com/segmentio/fasthash@v1.0.3/LICENSE:
59618+
59619+
MIT License
59620+
59621+
Copyright (c) 2017 Segment
59622+
59623+
Permission is hereby granted, free of charge, to any person obtaining a copy
59624+
of this software and associated documentation files (the "Software"), to deal
59625+
in the Software without restriction, including without limitation the rights
59626+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
59627+
copies of the Software, and to permit persons to whom the Software is
59628+
furnished to do so, subject to the following conditions:
59629+
59630+
The above copyright notice and this permission notice shall be included in all
59631+
copies or substantial portions of the Software.
59632+
59633+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
59634+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
59635+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
59636+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
59637+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
59638+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
59639+
SOFTWARE.
59640+
59641+
5961159642
--------------------------------------------------------------------------------
5961259643
Dependency : github.com/sergi/go-diff
5961359644
Version: v1.3.1

docs/reference/filebeat/filebeat-input-aws-cloudwatch.md

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -77,22 +77,29 @@ A string to filter the results to include only log events from log streams that
7777

7878
### `start_position` [_start_position]
7979

80-
`start_position` allows user to specify if this input should read log files from the `beginning` or from the `end`.
80+
`start_position` allows the user to specify if this input should read log files starting from the `beginning`, the `end`, or from the last known successful sync (`lastSync`).
8181

82-
* `beginning`: reads from the beginning of the log group (default).
83-
* `end`: read only new messages from current time minus `scan_frequency` going forward
82+
* `beginning`: Read messages starting from the beginning of the log group (default).
83+
* `end`: Read messages starting from the current time minus `scan_frequency`.
84+
* `lastSync`: Read messages starting from the last known sync time, if available. If there is no last known sync, then
85+
fall back to the default mode (`beginning`). This value is stored in the registry, so it persists across restarts.
8486

85-
For example, with `scan_frequency` equals to `30s` and current timestamp is `2020-06-24 12:00:00`:
87+
For example, in the case where `scan_frequency: 30s` and the current timestamp is `2020-06-24 12:00:00`:
8688

87-
* with `start_position = beginning`:
89+
* If `start_position: beginning`, reading starts from the earliest possible timestamp of unix epoch zero value:
8890

89-
* first iteration: startTime=0, endTime=2020-06-24 12:00:00
90-
* second iteration: startTime=2020-06-24 12:00:00, endTime=2020-06-24 12:00:30
91+
* First read: `startTime=0`, `endTime=2020-06-24 12:00:00`
92+
* Next read: `startTime=2020-06-24 12:00:00`, `endTime=2020-06-24 12:00:30`
9193

92-
* with `start_position = end`:
94+
* If `start_position: end`, reading starts with a look back that equals the `scan_frequency`:
9395

94-
* first iteration: startTime=2020-06-24 11:59:30, endTime=2020-06-24 12:00:00
95-
* second iteration: startTime=2020-06-24 12:00:00, endTime=2020-06-24 12:00:30
96+
* First read: `startTime=2020-06-24 11:59:30`, `endTime=2020-06-24 12:00:00`
97+
* Next read: `startTime=2020-06-24 12:00:00`, `endTime=2020-06-24 12:00:30`
98+
99+
* If `start_position: lastSync`, reading starts from the last known sync timestamp. Assuming the last sync timestamp stored in the registry is `2020-06-23 12:00:00`:
100+
101+
* First read: `startTime=2020-06-23 12:00:00`, `endTime=2020-06-24 12:00:00`
102+
* Next read: `startTime=2020-06-24 12:00:00`, `endTime=2020-06-24 12:00:30`
96103

97104

98105

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -390,6 +390,7 @@ require (
390390
github.com/prometheus/client_golang v1.22.0 // indirect
391391
github.com/rogpeppe/go-internal v1.13.1 // indirect
392392
github.com/rs/cors v1.11.1 // indirect
393+
github.com/segmentio/fasthash v1.0.3 // indirect
393394
github.com/sergi/go-diff v1.3.1 // indirect
394395
github.com/sirupsen/logrus v1.9.3 // indirect
395396
github.com/spiffe/go-spiffe/v2 v2.5.0 // indirect

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -908,6 +908,8 @@ github.com/samuel/go-parser v0.0.0-20130731160455-ca8abbf65d0e h1:hUGyBE/4CXRPTh
908908
github.com/samuel/go-parser v0.0.0-20130731160455-ca8abbf65d0e/go.mod h1:Sb6li54lXV0yYEjI4wX8cucdQ9gqUJV3+Ngg3l9g30I=
909909
github.com/samuel/go-thrift v0.0.0-20140522043831-2187045faa54 h1:jbchLJWyhKcmOjkbC4zDvT/n5EEd7g6hnnF760rEyRA=
910910
github.com/samuel/go-thrift v0.0.0-20140522043831-2187045faa54/go.mod h1:Vrkh1pnjV9Bl8c3P9zH0/D4NlOHWP5d4/hF4YTULaec=
911+
github.com/segmentio/fasthash v1.0.3 h1:EI9+KE1EwvMLBWwjpRDc+fEM+prwxDYbslddQGtrmhM=
912+
github.com/segmentio/fasthash v1.0.3/go.mod h1:waKX8l2N8yckOgmSsXJi7x1ZfdKZ4x7KRMzBtS3oedY=
911913
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
912914
github.com/sergi/go-diff v1.3.1 h1:xkr+Oxo4BOQKmkn/B9eMK0g5Kg/983T9DqqPHwYqD+8=
913915
github.com/sergi/go-diff v1.3.1/go.mod h1:aMJSSKb2lpPvRNec0+w3fl7LP9IOFzdc9Pa4NFbPK1I=

x-pack/filebeat/input/awscloudwatch/cloudwatch.go

Lines changed: 37 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,22 @@ package awscloudwatch
66

77
import (
88
"context"
9-
"errors"
10-
"fmt"
119
"sync"
1210
"time"
1311

14-
awssdk "github.com/aws/aws-sdk-go-v2/aws"
1512
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
1613

14+
"github.com/elastic/beats/v7/libbeat/beat"
1715
"github.com/elastic/elastic-agent-libs/logp"
1816
)
1917

2018
type cloudwatchPoller struct {
21-
config config
22-
region string
23-
log *logp.Logger
24-
metrics *inputMetrics
19+
config config
20+
region string
21+
log *logp.Logger
22+
metrics *inputMetrics
23+
stateHandler *stateHandler
24+
2525
workersListingMap *sync.Map
2626
workersProcessingMap *sync.Map
2727

@@ -42,7 +42,7 @@ type workResponse struct {
4242
}
4343

4444
func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
45-
awsRegion string, config config) *cloudwatchPoller {
45+
awsRegion string, config config, stateHandler *stateHandler) *cloudwatchPoller {
4646
if metrics == nil {
4747
metrics = newInputMetrics("", nil)
4848
}
@@ -52,6 +52,7 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
5252
metrics: metrics,
5353
region: awsRegion,
5454
config: config,
55+
stateHandler: stateHandler,
5556
workersListingMap: new(sync.Map),
5657
workersProcessingMap: new(sync.Map),
5758
// workRequestChan is unbuffered to guarantee that
@@ -64,85 +65,18 @@ func newCloudwatchPoller(log *logp.Logger, metrics *inputMetrics,
6465
}
6566
}
6667

67-
func (p *cloudwatchPoller) run(svc *cloudwatchlogs.Client, logGroupId string, startTime, endTime time.Time, logProcessor *logProcessor) {
68-
err := p.getLogEventsFromCloudWatch(svc, logGroupId, startTime, endTime, logProcessor)
69-
if err != nil {
70-
var errRequestCanceled *awssdk.RequestCanceledError
71-
if errors.As(err, &errRequestCanceled) {
72-
p.log.Error("getLogEventsFromCloudWatch failed with RequestCanceledError: ", errRequestCanceled)
73-
}
74-
p.log.Error("getLogEventsFromCloudWatch failed: ", err)
75-
}
76-
}
77-
78-
// getLogEventsFromCloudWatch uses FilterLogEvents API to collect logs from CloudWatch
79-
func (p *cloudwatchPoller) getLogEventsFromCloudWatch(svc *cloudwatchlogs.Client, logGroupId string, startTime, endTime time.Time, logProcessor *logProcessor) error {
80-
// construct FilterLogEventsInput
81-
filterLogEventsInput := p.constructFilterLogEventsInput(startTime, endTime, logGroupId)
82-
paginator := cloudwatchlogs.NewFilterLogEventsPaginator(svc, filterLogEventsInput)
83-
for paginator.HasMorePages() {
84-
filterLogEventsOutput, err := paginator.NextPage(context.TODO())
85-
if err != nil {
86-
return fmt.Errorf("error FilterLogEvents with Paginator: %w", err)
87-
}
88-
89-
p.metrics.apiCallsTotal.Inc()
90-
logEvents := filterLogEventsOutput.Events
91-
p.metrics.logEventsReceivedTotal.Add(uint64(len(logEvents)))
92-
93-
// This sleep is to avoid hitting the FilterLogEvents API limit(5 transactions per second (TPS)/account/Region).
94-
p.log.Debugf("sleeping for %v before making FilterLogEvents API call again", p.config.APISleep)
95-
time.Sleep(p.config.APISleep)
96-
p.log.Debug("done sleeping")
97-
98-
p.log.Debugf("Processing #%v events", len(logEvents))
99-
logProcessor.processLogEvents(logEvents, logGroupId, p.region)
100-
}
101-
return nil
102-
}
103-
104-
func (p *cloudwatchPoller) constructFilterLogEventsInput(startTime, endTime time.Time, logGroupId string) *cloudwatchlogs.FilterLogEventsInput {
105-
p.log.Debugf("FilterLogEventsInput for log group: '%s' with startTime = '%v' and endTime = '%v'", logGroupId, unixMsFromTime(startTime), unixMsFromTime(endTime))
106-
filterLogEventsInput := &cloudwatchlogs.FilterLogEventsInput{
107-
LogGroupIdentifier: awssdk.String(logGroupId),
108-
StartTime: awssdk.Int64(unixMsFromTime(startTime)),
109-
EndTime: awssdk.Int64(unixMsFromTime(endTime)),
110-
}
111-
112-
if len(p.config.LogStreams) > 0 {
113-
for _, stream := range p.config.LogStreams {
114-
filterLogEventsInput.LogStreamNames = append(filterLogEventsInput.LogStreamNames, *stream)
115-
}
116-
}
117-
118-
if p.config.LogStreamPrefix != "" {
119-
filterLogEventsInput.LogStreamNamePrefix = awssdk.String(p.config.LogStreamPrefix)
120-
}
121-
return filterLogEventsInput
122-
}
123-
124-
func (p *cloudwatchPoller) startWorkers(
125-
ctx context.Context,
126-
svc *cloudwatchlogs.Client,
127-
logProcessor *logProcessor,
128-
) {
68+
func (p *cloudwatchPoller) startWorkers(ctx context.Context, svc *cloudwatchlogs.Client, pipeline beat.Pipeline) {
12969
for i := 0; i < p.config.NumberOfWorkers; i++ {
13070
p.workerWg.Add(1)
13171
go func() {
13272
defer p.workerWg.Done()
133-
for {
134-
var work workResponse
135-
select {
136-
case <-ctx.Done():
137-
return
138-
case p.workRequestChan <- struct{}{}:
139-
work = <-p.workResponseChan
140-
}
14173

142-
p.log.Infof("aws-cloudwatch input worker for log group: '%v' has started", work.logGroupId)
143-
p.run(svc, work.logGroupId, work.startTime, work.endTime, logProcessor)
144-
p.log.Infof("aws-cloudwatch input worker for log group '%v' has stopped.", work.logGroupId)
74+
worker, err := newCWWorker(p.config, p.region, p.metrics, svc, pipeline, p.log)
75+
if err != nil {
76+
p.log.Error("Error creating CloudWatch worker: ", err)
77+
return
14578
}
79+
worker.Start(ctx, p.workRequestChan, p.workResponseChan, p.stateHandler)
14680
}()
14781
}
14882
}
@@ -152,15 +86,33 @@ func (p *cloudwatchPoller) startWorkers(
15286
// equal time.Now) to allow deterministic unit tests.
15387
func (p *cloudwatchPoller) receive(ctx context.Context, logGroupIDs []string, clock func() time.Time) {
15488
defer p.workerWg.Wait()
89+
15590
// startTime and endTime are the bounds of the current scanning interval.
156-
// If we're starting at the end of the logs, advance the start time to the
157-
// most recent scan window
158-
var startTime time.Time
15991
endTime := clock().Add(-p.config.Latency)
160-
if p.config.StartPosition == "end" {
92+
93+
var startTime time.Time
94+
// If we're starting at the end of the logs, advance the start time to the most recent scan window
95+
if p.config.StartPosition == end {
16196
startTime = endTime.Add(-p.config.ScanFrequency)
16297
}
98+
99+
if p.config.StartPosition == beginning {
100+
startTime = time.Unix(0, 0)
101+
}
102+
103+
if p.config.StartPosition == lastSync {
104+
state, err := p.stateHandler.GetState()
105+
if err != nil {
106+
p.log.Errorf("error retrieving state from stateHandler: %v, falling back to %s", err, beginning)
107+
startTime = time.Unix(0, 0)
108+
} else {
109+
startTime = time.UnixMilli(state.LastSyncEpoch)
110+
}
111+
}
112+
163113
for ctx.Err() == nil {
114+
p.stateHandler.WorkRegister(endTime.UnixMilli(), len(logGroupIDs))
115+
164116
for _, lg := range logGroupIDs {
165117
select {
166118
case <-ctx.Done():

x-pack/filebeat/input/awscloudwatch/cloudwatch_test.go

Lines changed: 12 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,6 @@ import (
99
"testing"
1010
"time"
1111

12-
awssdk "github.com/aws/aws-sdk-go-v2/aws"
13-
"github.com/aws/aws-sdk-go-v2/service/cloudwatchlogs"
1412
"github.com/stretchr/testify/assert"
1513

1614
"github.com/elastic/elastic-agent-libs/logp"
@@ -40,7 +38,7 @@ type receiveTestCase struct {
4038
func TestReceive(t *testing.T) {
4139
// We use a mocked clock so scan frequency can be any positive value.
4240
const defaultScanFrequency = time.Microsecond
43-
t0 := time.Time{}
41+
t0 := time.Unix(0, 0)
4442
t1 := t0.Add(time.Hour)
4543
t2 := t1.Add(time.Minute)
4644
t3 := t2.Add(time.Hour)
@@ -107,7 +105,7 @@ func TestReceive(t *testing.T) {
107105
logGroupIDs: []string{"a"},
108106
startTime: t1,
109107
configOverrides: func(c *config) {
110-
c.StartPosition = "end"
108+
c.StartPosition = end
111109
},
112110
steps: []receiveTestStep{
113111
{
@@ -128,7 +126,7 @@ func TestReceive(t *testing.T) {
128126
logGroupIDs: []string{"a", "b"},
129127
startTime: t1,
130128
configOverrides: func(c *config) {
131-
c.StartPosition = "end"
129+
c.StartPosition = end
132130
c.Latency = time.Second
133131
},
134132
steps: []receiveTestStep{
@@ -176,16 +174,24 @@ func TestReceive(t *testing.T) {
176174
clock := &clock{}
177175
for stepIndex, test := range testCases {
178176
ctx, cancel := context.WithCancel(context.Background())
177+
178+
cfg := defaultConfig()
179+
cfg.LogGroupName = "LogGroup"
180+
181+
handler, err := newStateHandler(nil, cfg, createTestInputStore())
182+
assert.Nil(t, err)
183+
179184
p := &cloudwatchPoller{
180185
workRequestChan: make(chan struct{}),
181186
// Unlike the live cwPoller, we make workResponseChan unbuffered,
182187
// so we can guarantee that clock updates happen when cwPoller has already
183188
// decided on its output
184189
workResponseChan: make(chan workResponse),
185190
log: logp.NewLogger("test"),
191+
stateHandler: handler,
186192
}
187193

188-
p.config = defaultConfig()
194+
p.config = cfg
189195
p.config.ScanFrequency = defaultScanFrequency
190196
if test.configOverrides != nil {
191197
test.configOverrides(&p.config)
@@ -207,41 +213,3 @@ func TestReceive(t *testing.T) {
207213
cancel()
208214
}
209215
}
210-
211-
type filterLogEventsTestCase struct {
212-
name string
213-
logGroupId string
214-
startTime time.Time
215-
endTime time.Time
216-
expected *cloudwatchlogs.FilterLogEventsInput
217-
}
218-
219-
func TestFilterLogEventsInput(t *testing.T) {
220-
now, _ := time.Parse(time.RFC3339, "2024-07-12T13:00:00+00:00")
221-
id := "myLogGroup"
222-
223-
testCases := []filterLogEventsTestCase{
224-
{
225-
name: "StartPosition: beginning, first iteration",
226-
logGroupId: id,
227-
// The zero value of type time.Time{} is January 1, year 1, 00:00:00.000000000 UTC
228-
// Events with a timestamp before the time - January 1, 1970, 00:00:00 UTC are not returned by AWS API
229-
// make sure zero value of time.Time{} was converted
230-
startTime: time.Time{},
231-
endTime: now,
232-
expected: &cloudwatchlogs.FilterLogEventsInput{
233-
LogGroupIdentifier: awssdk.String(id),
234-
StartTime: awssdk.Int64(0),
235-
EndTime: awssdk.Int64(1720789200000),
236-
},
237-
},
238-
}
239-
for _, test := range testCases {
240-
p := cloudwatchPoller{
241-
log: logp.NewLogger("test"),
242-
}
243-
result := p.constructFilterLogEventsInput(test.startTime, test.endTime, test.logGroupId)
244-
assert.Equal(t, test.expected, result)
245-
}
246-
247-
}

0 commit comments

Comments
 (0)