Skip to content

Commit 84bf434

Browse files
authored
Enable require_alias for Bulk requests for all actions when target is a write alias (#29879)
## What does this PR do? This PR adds support for requiring alias when using ILM. From now on a `Selector` can tell Elasticsearch client if the target we are shipping events to is an alias or an index. By default, we consider everything an index, and only consider a target an alias when ILM is enabled. The feature is only supported since ES 7.10, so if the user tries to connect to an older version, we cannot help them with this parameter. ## Why is it important? We see issues around ILM sometimes where users have deleted their write alias causing running beats instances to auto-create an index where the write alias should (with auto-mappings to boot, since the template won't be applied).
1 parent 4c4400e commit 84bf434

File tree

9 files changed

+77
-19
lines changed

9 files changed

+77
-19
lines changed

CHANGELOG.next.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
3636

3737
*Affecting all Beats*
3838

39+
- Enable `require_alias` for Bulk requests for all actions when target is a write alias. {issue}27874[27874] {pull}29879[29879]
40+
3941

4042
*Auditbeat*
4143

libbeat/esleg/eslegclient/bulkapi.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -51,10 +51,11 @@ type BulkDeleteAction struct {
5151
}
5252

5353
type BulkMeta struct {
54-
Index string `json:"_index" struct:"_index"`
55-
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
56-
Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"`
57-
ID string `json:"_id,omitempty" struct:"_id,omitempty"`
54+
Index string `json:"_index" struct:"_index"`
55+
DocType string `json:"_type,omitempty" struct:"_type,omitempty"`
56+
Pipeline string `json:"pipeline,omitempty" struct:"pipeline,omitempty"`
57+
ID string `json:"_id,omitempty" struct:"_id,omitempty"`
58+
RequireAlias bool `json:"require_alias,omitempty" struct:"require_alias,omitempty"`
5859
}
5960

6061
type bulkRequest struct {

libbeat/idxmgmt/std.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -337,13 +337,17 @@ func (s *ilmIndexSelector) Select(evt *beat.Event) (string, error) {
337337
return idx, err
338338
}
339339

340+
func (s ilmIndexSelector) IsAlias() bool { return true }
341+
340342
func (s indexSelector) Select(evt *beat.Event) (string, error) {
341343
if idx := getEventCustomIndex(evt, s.beatInfo); idx != "" {
342344
return idx, nil
343345
}
344346
return s.sel.Select(evt)
345347
}
346348

349+
func (s indexSelector) IsAlias() bool { return false }
350+
347351
func getEventCustomIndex(evt *beat.Event, beatInfo beat.Info) string {
348352
if len(evt.Meta) == 0 {
349353
return ""

libbeat/outputs/elasticsearch/client.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -317,6 +317,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev
317317
ID: id,
318318
}
319319

320+
if isRequireAliasSupported(version) {
321+
meta.RequireAlias = client.index.IsAlias()
322+
}
323+
320324
if opType == events.OpTypeDelete {
321325
if id != "" {
322326
return eslegclient.BulkDeleteAction{Delete: meta}, nil
@@ -333,6 +337,10 @@ func (client *Client) createEventBulkMeta(version common.Version, event *beat.Ev
333337
return eslegclient.BulkIndexAction{Index: meta}, nil
334338
}
335339

340+
func isRequireAliasSupported(version common.Version) bool {
341+
return !version.LessThan(common.MustNewVersion("7.10.0"))
342+
}
343+
336344
func (client *Client) getPipeline(event *beat.Event) (string, error) {
337345
if event.Meta != nil {
338346
pipeline, err := events.GetMetaStringValue(*event, events.FieldMetaPipeline)

libbeat/outputs/elasticsearch/client_integration_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -420,7 +420,8 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu
420420
}
421421

422422
info := beat.Info{Beat: "libbeat"}
423-
im, _ := idxmgmt.DefaultSupport(nil, info, nil)
423+
// ILM must be disabled otherwise custom index settings are ignored.
424+
im, _ := idxmgmt.DefaultSupport(nil, info, disabledILMConfig())
424425
output, err := makeES(im, info, stats, config)
425426
if err != nil {
426427
t.Fatal(err)
@@ -438,6 +439,10 @@ func connectTestEs(t *testing.T, cfg interface{}, stats outputs.Observer) (outpu
438439
return client, client
439440
}
440441

442+
func disabledILMConfig() *common.Config {
443+
return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}})
444+
}
445+
441446
// setupRoleMapping sets up role mapping for the Kerberos user beats@ELASTIC
442447
func setupRoleMapping(t *testing.T, host string) error {
443448
_, client := connectTestEsWithoutStats(t, map[string]interface{}{

libbeat/outputs/elasticsearch/client_test.go

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -409,22 +409,49 @@ func TestClientWithHeaders(t *testing.T) {
409409

410410
func TestBulkEncodeEvents(t *testing.T) {
411411
cases := map[string]struct {
412-
version string
413-
docType string
414-
config common.MapStr
415-
events []common.MapStr
412+
version string
413+
docType string
414+
config common.MapStr
415+
ilmConfig *common.Config
416+
isAlias bool
417+
events []common.MapStr
416418
}{
417419
"6.x": {
418-
version: "6.8.0",
419-
docType: "doc",
420-
config: common.MapStr{},
421-
events: []common.MapStr{{"message": "test"}},
420+
version: "6.8.0",
421+
docType: "doc",
422+
config: common.MapStr{},
423+
ilmConfig: common.NewConfig(),
424+
events: []common.MapStr{{"message": "test"}},
422425
},
423-
"latest": {
424-
version: version.GetDefaultVersion(),
425-
docType: "",
426-
config: common.MapStr{},
427-
events: []common.MapStr{{"message": "test"}},
426+
"require_alias not supported": {
427+
version: "7.9.0",
428+
docType: "",
429+
config: common.MapStr{},
430+
ilmConfig: common.NewConfig(),
431+
events: []common.MapStr{{"message": "test"}},
432+
},
433+
"require_alias is supported": {
434+
version: "7.10.0",
435+
docType: "",
436+
config: common.MapStr{},
437+
ilmConfig: common.NewConfig(),
438+
isAlias: true,
439+
events: []common.MapStr{{"message": "test"}},
440+
},
441+
"latest with ILM": {
442+
version: version.GetDefaultVersion(),
443+
docType: "",
444+
config: common.MapStr{},
445+
ilmConfig: common.NewConfig(),
446+
isAlias: true,
447+
events: []common.MapStr{{"message": "test"}},
448+
},
449+
"latest without ILM": {
450+
version: version.GetDefaultVersion(),
451+
docType: "",
452+
config: common.MapStr{},
453+
ilmConfig: disabledILMConfig(),
454+
events: []common.MapStr{{"message": "test"}},
428455
},
429456
}
430457

@@ -437,7 +464,7 @@ func TestBulkEncodeEvents(t *testing.T) {
437464
Version: test.version,
438465
}
439466

440-
im, err := idxmgmt.DefaultSupport(nil, info, common.NewConfig())
467+
im, err := idxmgmt.DefaultSupport(nil, info, test.ilmConfig)
441468
require.NoError(t, err)
442469

443470
index, pipeline, err := buildSelectors(im, info, cfg)
@@ -479,6 +506,7 @@ func TestBulkEncodeEvents(t *testing.T) {
479506
}
480507

481508
assert.NotEqual(t, "", meta.Index)
509+
assert.Equal(t, test.isAlias, meta.RequireAlias)
482510
assert.Equal(t, test.docType, meta.DocType)
483511
}
484512

@@ -487,6 +515,10 @@ func TestBulkEncodeEvents(t *testing.T) {
487515
}
488516
}
489517

518+
func disabledILMConfig() *common.Config {
519+
return common.MustNewConfigFrom(map[string]interface{}{"setup": map[string]interface{}{"ilm": map[string]interface{}{"enabled": false}}})
520+
}
521+
490522
func TestBulkEncodeEventsWithOpType(t *testing.T) {
491523
cases := []common.MapStr{
492524
{"_id": "111", "op_type": e.OpTypeIndex, "message": "test 1", "bulkIndex": 0},

libbeat/outputs/elasticsearch/death_letter_selector.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,3 +34,5 @@ func (d DeadLetterSelector) Select(event *beat.Event) (string, error) {
3434
}
3535
return d.Selector.Select(event)
3636
}
37+
38+
func (d DeadLetterSelector) IsAlias() bool { return false }

libbeat/outputs/outil/select.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,8 @@ func (s Selector) Select(evt *beat.Event) (string, error) {
8787
return s.sel.sel(evt)
8888
}
8989

90+
func (s Selector) IsAlias() bool { return false }
91+
9092
// IsEmpty checks if the selector is not configured and will always return an empty string.
9193
func (s Selector) IsEmpty() bool {
9294
return s.sel == nilSelector || s.sel == nil

libbeat/outputs/output_reg.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,10 @@ type IndexManager interface {
4343
}
4444

4545
// IndexSelector is used to find the index name an event shall be indexed to.
46+
// It also used to check if during indexing required_alias should be set.
4647
type IndexSelector interface {
4748
Select(event *beat.Event) (string, error)
49+
IsAlias() bool
4850
}
4951

5052
// Group configures and combines multiple clients into load-balanced group of clients

0 commit comments

Comments
 (0)