Skip to content

Commit ae25a20

Browse files
committed
Pass metadata using s3Info struct to avoid adding new func params
1 parent e8a0f65 commit ae25a20

2 files changed

Lines changed: 18 additions & 18 deletions

File tree

x-pack/filebeat/input/awss3/collector.go

Lines changed: 16 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ type s3Info struct {
6060
key string
6161
region string
6262
arn string
63+
meta map[string]interface{} // S3 object metadata.
6364
readerConfig
6465
}
6566

@@ -404,12 +405,12 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
404405
*resp.ContentType = info.readerConfig.ContentType
405406
}
406407

407-
meta := s3Metadata(resp, info.IncludeS3Metadata...)
408+
info.meta = s3Metadata(resp, info.IncludeS3Metadata...)
408409

409410
// Decode JSON documents when content-type is "application/json" or expand_event_list_from_field is given in config
410411
if resp.ContentType != nil && *resp.ContentType == "application/json" || info.ExpandEventListFromField != "" {
411412
decoder := json.NewDecoder(bodyReader)
412-
err := c.decodeJSON(decoder, objectHash, info, s3Ctx, meta)
413+
err := c.decodeJSON(decoder, objectHash, info, s3Ctx)
413414
if err != nil {
414415
return fmt.Errorf("decodeJSONWithKey failed for '%s' from S3 bucket '%s': %w", info.key, info.name, err)
415416
}
@@ -456,7 +457,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
456457
if err != nil {
457458
return fmt.Errorf("error reading message: %w", err)
458459
}
459-
event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx, meta)
460+
event := createEvent(string(message.Content), offset, info, objectHash, s3Ctx)
460461
event.Fields.DeepUpdate(message.Fields)
461462
offset += int64(message.Bytes)
462463
if err = c.forwardEvent(event); err != nil {
@@ -466,7 +467,7 @@ func (c *s3Collector) createEventsFromS3Info(svc s3iface.ClientAPI, info s3Info,
466467
return nil
467468
}
468469

469-
func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context, meta common.MapStr) error {
470+
func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Info s3Info, s3Ctx *s3Context) error {
470471
var offset int64
471472
for {
472473
var jsonFields interface{}
@@ -476,7 +477,7 @@ func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Inf
476477
}
477478

478479
if err == io.EOF {
479-
offsetNew, err := c.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx, meta)
480+
offsetNew, err := c.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx)
480481
if err != nil {
481482
return err
482483
}
@@ -488,14 +489,14 @@ func (c *s3Collector) decodeJSON(decoder *json.Decoder, objectHash string, s3Inf
488489
return nil
489490
}
490491

491-
offset, err = c.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx, meta)
492+
offset, err = c.jsonFieldsType(jsonFields, offset, objectHash, s3Info, s3Ctx)
492493
if err != nil {
493494
return err
494495
}
495496
}
496497
}
497498

498-
func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context, meta common.MapStr) (int64, error) {
499+
func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int64, error) {
499500
switch f := jsonFields.(type) {
500501
case map[string][]interface{}:
501502
if s3Info.ExpandEventListFromField != "" {
@@ -506,7 +507,7 @@ func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objec
506507
return offset, err
507508
}
508509
for _, v := range textValues {
509-
offset, err := c.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx, meta)
510+
offset, err := c.convertJSONToEvent(v, offset, objectHash, s3Info, s3Ctx)
510511
if err != nil {
511512
err = fmt.Errorf("convertJSONToEvent failed for '%s' from S3 bucket '%s': %w", s3Info.key, s3Info.name, err)
512513
c.logger.Error(err)
@@ -526,7 +527,7 @@ func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objec
526527

527528
valuesConverted := textValues.([]interface{})
528529
for _, textValue := range valuesConverted {
529-
offsetNew, err := c.convertJSONToEvent(textValue, offset, objectHash, s3Info, s3Ctx, meta)
530+
offsetNew, err := c.convertJSONToEvent(textValue, offset, objectHash, s3Info, s3Ctx)
530531
if err != nil {
531532
err = fmt.Errorf("convertJSONToEvent failed for '%s' from S3 bucket '%s': %w", s3Info.key, s3Info.name, err)
532533
c.logger.Error(err)
@@ -537,7 +538,7 @@ func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objec
537538
return offset, nil
538539
}
539540

540-
offset, err := c.convertJSONToEvent(f, offset, objectHash, s3Info, s3Ctx, meta)
541+
offset, err := c.convertJSONToEvent(f, offset, objectHash, s3Info, s3Ctx)
541542
if err != nil {
542543
err = fmt.Errorf("convertJSONToEvent failed for '%s' from S3 bucket '%s': %w", s3Info.key, s3Info.name, err)
543544
c.logger.Error(err)
@@ -548,12 +549,12 @@ func (c *s3Collector) jsonFieldsType(jsonFields interface{}, offset int64, objec
548549
return offset, nil
549550
}
550551

551-
func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context, meta common.MapStr) (int64, error) {
552+
func (c *s3Collector) convertJSONToEvent(jsonFields interface{}, offset int64, objectHash string, s3Info s3Info, s3Ctx *s3Context) (int64, error) {
552553
vJSON, _ := json.Marshal(jsonFields)
553554
logOriginal := string(vJSON)
554555
log := trimLogDelimiter(logOriginal)
555556
offset += int64(len(log))
556-
event := createEvent(log, offset, s3Info, objectHash, s3Ctx, meta)
557+
event := createEvent(log, offset, s3Info, objectHash, s3Ctx)
557558

558559
err := c.forwardEvent(event)
559560
if err != nil {
@@ -596,7 +597,7 @@ func trimLogDelimiter(log string) string {
596597
return strings.TrimSuffix(log, "\n")
597598
}
598599

599-
func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context, meta common.MapStr) beat.Event {
600+
func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx *s3Context) beat.Event {
600601
s3Ctx.Inc()
601602

602603
event := beat.Event{
@@ -628,8 +629,8 @@ func createEvent(log string, offset int64, info s3Info, objectHash string, s3Ctx
628629
}
629630
event.SetID(objectID(objectHash, offset))
630631

631-
if len(meta) > 0 {
632-
event.Fields.Put("aws.s3.metadata", meta)
632+
if len(info.meta) > 0 {
633+
event.Fields.Put("aws.s3.metadata", info.meta)
633634
}
634635

635636
return event

x-pack/filebeat/input/awss3/collector_test.go

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ import (
2424
"github.com/stretchr/testify/assert"
2525

2626
"github.com/elastic/beats/v7/libbeat/beat"
27-
"github.com/elastic/beats/v7/libbeat/common"
2827
"github.com/elastic/beats/v7/libbeat/reader"
2928
"github.com/elastic/beats/v7/libbeat/reader/readfile"
3029
"github.com/elastic/beats/v7/libbeat/reader/readfile/encoding"
@@ -324,12 +323,12 @@ func TestCreateEvent(t *testing.T) {
324323
break
325324
}
326325
if err == io.EOF {
327-
event := createEvent(log, int64(len(log)), s3Info, s3ObjectHash, s3Context, common.MapStr{})
326+
event := createEvent(log, int64(len(log)), s3Info, s3ObjectHash, s3Context)
328327
events = append(events, event)
329328
break
330329
}
331330

332-
event := createEvent(log, int64(len(log)), s3Info, s3ObjectHash, s3Context, common.MapStr{})
331+
event := createEvent(log, int64(len(log)), s3Info, s3ObjectHash, s3Context)
333332
events = append(events, event)
334333
}
335334

0 commit comments

Comments
 (0)