Skip to content

Commit cf13781

Browse files
authored
Fix handling of custom endpoints in AWS input (#41504)
Fix custom endpoint selection in the S3/SQS input (#39718) by porting @strawgate's 8.14 fix (#39709) to main. In addition to the previous fixes, this simplifies the logic for detecting the queue region: the 8.14 version still had some broken cases caused by over-strict endpoint matching, and it was concluded (talking to @strawgate) that there's no advantage to rejecting a standard-format region in a queue URL just because the endpoint URL is different. If there is a genuine mismatch between the queue and the endpoint, we'll learn it from the connection attempt, not from `getRegionFromQueueURL`.
1 parent 87687a5 commit cf13781

8 files changed

Lines changed: 229 additions & 43 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
172172
- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179]
173173
- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244]
174174
- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393]
175+
- Fix errors in SQS host resolution in the `aws-s3` input when using custom (non-AWS) endpoints. {pull}41504[41504]
175176

176177
*Heartbeat*
177178

x-pack/filebeat/input/awss3/config.go

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package awss3
77
import (
88
"errors"
99
"fmt"
10+
"net/url"
1011
"time"
1112

1213
awssdk "github.com/aws/aws-sdk-go-v2/aws"
@@ -106,6 +107,13 @@ func (c *config) Validate() error {
106107
if c.ProviderOverride != "" && c.NonAWSBucketName == "" {
107108
return errors.New("provider can only be overridden when polling non-AWS S3 services")
108109
}
110+
if c.AWSConfig.Endpoint != "" {
111+
// Make sure the given endpoint can be parsed
112+
_, err := url.Parse(c.AWSConfig.Endpoint)
113+
if err != nil {
114+
return fmt.Errorf("failed to parse endpoint: %w", err)
115+
}
116+
}
109117
if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" {
110118
return errors.New("backup to non-AWS bucket can only be used for non-AWS sources")
111119
}
@@ -245,14 +253,18 @@ func (c config) getBucketARN() string {
245253
// options struct.
246254
// Should be provided as a parameter to s3.NewFromConfig.
247255
func (c config) s3ConfigModifier(o *s3.Options) {
248-
if c.NonAWSBucketName != "" {
249-
//nolint:staticcheck // haven't migrated to the new interface yet
250-
o.EndpointResolver = nonAWSBucketResolver{endpoint: c.AWSConfig.Endpoint}
251-
}
252-
253256
if c.AWSConfig.FIPSEnabled {
254257
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
255258
}
259+
// Apply slightly different endpoint resolvers depending on whether we're in S3 or SQS mode.
260+
if c.AWSConfig.Endpoint != "" {
261+
//nolint:staticcheck // haven't migrated to the new interface yet
262+
o.EndpointResolver = s3.EndpointResolverFromURL(c.AWSConfig.Endpoint,
263+
func(e *awssdk.Endpoint) {
264+
// The S3 hostname is immutable in bucket polling mode, mutable otherwise.
265+
e.HostnameImmutable = (c.getBucketARN() != "")
266+
})
267+
}
256268
o.UsePathStyle = c.PathStyle
257269

258270
o.Retryer = retry.NewStandard(func(so *retry.StandardOptions) {
@@ -269,6 +281,9 @@ func (c config) sqsConfigModifier(o *sqs.Options) {
269281
if c.AWSConfig.FIPSEnabled {
270282
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
271283
}
284+
if c.AWSConfig.Endpoint != "" {
285+
o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)
286+
}
272287
}
273288

274289
func (c config) getFileSelectors() []fileSelectorConfig {

x-pack/filebeat/input/awss3/input.go

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@ package awss3
77
import (
88
"fmt"
99

10-
awssdk "github.com/aws/aws-sdk-go-v2/aws"
11-
1210
"github.com/elastic/beats/v7/filebeat/beater"
1311
v2 "github.com/elastic/beats/v7/filebeat/input/v2"
1412
"github.com/elastic/beats/v7/libbeat/feature"
@@ -48,15 +46,10 @@ func (im *s3InputManager) Create(cfg *conf.C) (v2.Input, error) {
4846
return nil, fmt.Errorf("initializing AWS config: %w", err)
4947
}
5048

51-
if config.AWSConfig.Endpoint != "" {
52-
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
53-
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
54-
return awssdk.Endpoint{
55-
PartitionID: "aws",
56-
URL: config.AWSConfig.Endpoint,
57-
SigningRegion: awsConfig.Region,
58-
}, nil
59-
})
49+
// The awsConfig now contains the region from the credential profile or default region
50+
// if the region is explicitly set in the config, then it wins
51+
if config.RegionName != "" {
52+
awsConfig.Region = config.RegionName
6053
}
6154

6255
if config.QueueURL != "" {

x-pack/filebeat/input/awss3/input_integration_test.go

Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,174 @@ func TestInputRunSQSOnLocalstack(t *testing.T) {
269269
assert.EqualValues(t, 0.0, s3Input.metrics.sqsWorkerUtilization.Get()) // Workers are reset after processing and hence utilization should be 0 at the end
270270
}
271271

272+
func TestInputRunSQSWithConfig(t *testing.T) {
273+
tests := []struct {
274+
name string
275+
queue_url string
276+
endpoint string
277+
region string
278+
default_region string
279+
want string
280+
wantErr error
281+
}{
282+
{
283+
name: "no region",
284+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
285+
want: "us-east-1",
286+
},
287+
{
288+
name: "no region but with long endpoint",
289+
queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
290+
endpoint: "https://s3.us-east-1.abc.xyz",
291+
want: "us-east-1",
292+
},
293+
{
294+
name: "no region but with short endpoint",
295+
queue_url: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
296+
endpoint: "https://abc.xyz",
297+
want: "us-east-1",
298+
},
299+
{
300+
name: "no region custom queue domain",
301+
queue_url: "https://sqs.us-east-1.xyz.abc/627959692251/test-s3-logs",
302+
wantErr: errBadQueueURL,
303+
},
304+
{
305+
name: "region",
306+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
307+
region: "us-west-2",
308+
want: "us-west-2",
309+
},
310+
{
311+
name: "default_region",
312+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
313+
default_region: "us-west-2",
314+
want: "us-west-2",
315+
},
316+
{
317+
name: "region and default_region",
318+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
319+
region: "us-east-2",
320+
default_region: "us-east-3",
321+
want: "us-east-2",
322+
},
323+
{
324+
name: "short_endpoint",
325+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
326+
endpoint: "https://amazonaws.com",
327+
want: "us-east-1",
328+
},
329+
{
330+
name: "long_endpoint",
331+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
332+
endpoint: "https://s3.us-east-1.amazonaws.com",
333+
want: "us-east-1",
334+
},
335+
{
336+
name: "region and custom short_endpoint",
337+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
338+
region: "us-west-2",
339+
endpoint: "https://.elastic.co",
340+
want: "us-west-2",
341+
},
342+
{
343+
name: "region and custom long_endpoint",
344+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
345+
region: "us-west-2",
346+
endpoint: "https://s3.us-east-1.elastic.co",
347+
want: "us-west-2",
348+
},
349+
{
350+
name: "region and short_endpoint",
351+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
352+
region: "us-west-2",
353+
endpoint: "https://amazonaws.com",
354+
want: "us-west-2",
355+
},
356+
{
357+
name: "region and long_endpoint",
358+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
359+
region: "us-west-2",
360+
endpoint: "https://s3.us-east-1.amazonaws.com",
361+
want: "us-west-2",
362+
},
363+
{
364+
name: "region and default region and short_endpoint",
365+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
366+
region: "us-west-2",
367+
default_region: "us-east-1",
368+
endpoint: "https://amazonaws.com",
369+
want: "us-west-2",
370+
},
371+
{
372+
name: "region and default region and long_endpoint",
373+
queue_url: "https://sqs.us-east-1.amazonaws.com/627959692251/test-s3-logs",
374+
region: "us-west-2",
375+
default_region: "us-east-1",
376+
endpoint: "https://s3.us-east-1.amazonaws.com",
377+
want: "us-west-2",
378+
},
379+
}
380+
381+
for _, test := range tests {
382+
logp.TestingSetup()
383+
384+
// Create a filebeat config using the provided test parameters
385+
config := ""
386+
if test.queue_url != "" {
387+
config += fmt.Sprintf("queue_url: %s \n", test.queue_url)
388+
}
389+
if test.region != "" {
390+
config += fmt.Sprintf("region: %s \n", test.region)
391+
}
392+
if test.default_region != "" {
393+
config += fmt.Sprintf("default_region: %s \n", test.default_region)
394+
}
395+
if test.endpoint != "" {
396+
config += fmt.Sprintf("endpoint: %s \n", test.endpoint)
397+
}
398+
399+
s3Input := createInput(t, conf.MustNewConfigFrom(config))
400+
401+
inputCtx, cancel := newV2Context()
402+
t.Cleanup(cancel)
403+
time.AfterFunc(5*time.Second, func() {
404+
cancel()
405+
})
406+
407+
var errGroup errgroup.Group
408+
errGroup.Go(func() error {
409+
return s3Input.Run(inputCtx, &fakePipeline{})
410+
})
411+
412+
if err := errGroup.Wait(); err != nil {
413+
// An error is expected for this case, so move on to the next test; note that err is not actually compared against test.wantErr
414+
if test.wantErr != nil {
415+
continue
416+
}
417+
// Print the test name to help identify the failing test
418+
t.Fatal(test.name, err)
419+
}
420+
421+
// If the endpoint starts with s3, the endpoint resolver should be nil at this point
422+
// If the endpoint does not start with s3, EndpointResolverWithOptions should be set
423+
// If the endpoint is not set, the endpoint resolver should be nil
424+
if test.endpoint == "" {
425+
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
426+
assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
427+
} else if strings.HasPrefix(test.endpoint, "https://s3") {
428+
// S3 resolvers are added later in the code than this integration test covers
429+
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
430+
assert.Nil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
431+
} else { // If the endpoint is specified but is not s3
432+
assert.Nil(t, s3Input.awsConfig.EndpointResolver, test.name)
433+
assert.NotNil(t, s3Input.awsConfig.EndpointResolverWithOptions, test.name)
434+
}
435+
436+
assert.EqualValues(t, test.want, s3Input.awsConfig.Region, test.name)
437+
}
438+
}
439+
272440
func TestInputRunSQS(t *testing.T) {
273441
logp.TestingSetup()
274442

x-pack/filebeat/input/awss3/input_test.go

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,20 @@ func TestRegionSelection(t *testing.T) {
8888
want: "us-west-3",
8989
},
9090
{
91-
name: "abc.xyz_and_domain_with_blank_endpoint",
91+
name: "abc.xyz_and_domain_with_matching_endpoint_and_scheme",
9292
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
93+
endpoint: "https://abc.xyz",
94+
want: "us-east-1",
95+
},
96+
{
97+
name: "abc.xyz_and_domain_with_matching_url_endpoint",
98+
queueURL: "https://sqs.us-east-1.abc.xyz/627959692251/test-s3-logs",
99+
endpoint: "https://s3.us-east-1.abc.xyz",
100+
want: "us-east-1",
101+
},
102+
{
103+
name: "abc.xyz_and_no_region_term",
104+
queueURL: "https://sqs.abc.xyz/627959692251/test-s3-logs",
93105
wantErr: errBadQueueURL,
94106
},
95107
{
@@ -118,7 +130,7 @@ func TestRegionSelection(t *testing.T) {
118130
{
119131
name: "non_aws_vpce_without_endpoint",
120132
queueURL: "https://vpce-test.sqs.us-east-1.vpce.abc.xyz/12345678912/sqs-queue",
121-
wantErr: errBadQueueURL,
133+
want: "us-east-1",
122134
},
123135
{
124136
name: "non_aws_vpce_with_region_override",

x-pack/filebeat/input/awss3/s3.go

Lines changed: 0 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -110,12 +110,3 @@ func getProviderFromDomain(endpoint string, ProviderOverride string) string {
110110
}
111111
return "unknown"
112112
}
113-
114-
type nonAWSBucketResolver struct {
115-
endpoint string
116-
}
117-
118-
func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) {
119-
//nolint:staticcheck // haven't migrated to the new interface yet
120-
return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
121-
}

x-pack/filebeat/input/awss3/sqs.go

Lines changed: 14 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,29 +31,30 @@ const (
3131

3232
var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ENDPOINT}.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME} or https://{VPC_ENDPOINT}.sqs.{REGION_ENDPOINT}.vpce.{ENDPOINT}/{ACCOUNT_NUMBER}/{QUEUE_NAME}")
3333

34-
func getRegionFromQueueURL(queueURL, endpoint string) string {
34+
func getRegionFromQueueURL(queueURL string) string {
3535
// get region from queueURL
36+
// Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs
3637
// Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs
3738
// Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue
39+
// We use a simple heuristic that works for all essential cases:
40+
// - If queue hostname is sqs.X.*, return region X
41+
// - If queue hostname is X.sqs.Y.*, return region Y
42+
// Hosts that don't follow this convention need the input config to
43+
// specify a custom endpoint and an explicit region.
3844
u, err := url.Parse(queueURL)
3945
if err != nil {
4046
return ""
4147
}
48+
hostSplit := strings.SplitN(u.Hostname(), ".", 5)
4249

43-
// check for sqs queue url
44-
host := strings.SplitN(u.Host, ".", 3)
45-
if len(host) == 3 && host[0] == "sqs" {
46-
if host[2] == endpoint || (endpoint == "" && strings.HasPrefix(host[2], "amazonaws.")) {
47-
return host[1]
48-
}
50+
// check for sqs-style queue url
51+
if len(hostSplit) >= 4 && hostSplit[0] == "sqs" {
52+
return hostSplit[1]
4953
}
5054

51-
// check for vpce url
52-
host = strings.SplitN(u.Host, ".", 5)
53-
if len(host) == 5 && host[1] == "sqs" {
54-
if host[4] == endpoint || (endpoint == "" && strings.HasPrefix(host[4], "amazonaws.")) {
55-
return host[2]
56-
}
55+
// check for vpce-style url
56+
if len(hostSplit) == 5 && hostSplit[1] == "sqs" {
57+
return hostSplit[2]
5758
}
5859

5960
return ""

x-pack/filebeat/input/awss3/sqs_input.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -88,14 +88,19 @@ func (in *sqsReaderInput) setup(
8888
in.log = inputContext.Logger.With("queue_url", in.config.QueueURL)
8989
in.pipeline = pipeline
9090

91-
in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint)
91+
in.detectedRegion = getRegionFromQueueURL(in.config.QueueURL)
9292
if in.config.RegionName != "" {
93+
// Configured region always takes precedence
9394
in.awsConfig.Region = in.config.RegionName
9495
} else if in.detectedRegion != "" {
96+
// Only use detected region if there is no explicit region configured.
9597
in.awsConfig.Region = in.detectedRegion
98+
} else if in.config.AWSConfig.DefaultRegion != "" {
99+
// If we can't find anything else, fall back on the default.
100+
in.awsConfig.Region = in.config.AWSConfig.DefaultRegion
96101
} else {
97-
// If we can't get a region from the config or the URL, return an error.
98-
return fmt.Errorf("failed to get AWS region from queue_url: %w", errBadQueueURL)
102+
// If we can't find a usable region, return an error
103+
return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", errBadQueueURL)
99104
}
100105

101106
in.sqs = &awsSQSAPI{

0 commit comments

Comments
 (0)