Skip to content

Commit 7ddcb1e

Browse files
author
Steffen Siering
authored
Elasticsearch index must be lowercase (#16081)
* Index names must be lowercase When indexing into Elasticsearch index names must always be lowercase. If the index or indices setting are configured to produce non-lowercase strings (e.g. by extracting part of the index name from the event contents), we need to normalize them to be lowercase. This change ensure that index names are always converted to lowercase. Static strings are converted to lowercase upfront, while dynamic strings will be post-processed. * update kafka/redis/LS output to guarantee lowercase index * add godoc
1 parent 90f79f9 commit 7ddcb1e

8 files changed

Lines changed: 208 additions & 165 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
5151
- Update replicaset group to apps/v1 {pull}15854[15802]
5252
- Fix issue where default go logger is not discarded when either * or stdout is selected. {issue}10251[10251] {pull}15708[15708]
5353
- Upgrade go-ucfg to latest v0.8.1. {pull}15937{15937}
54-
- Remove superfluous use of number_of_routing_shards setting from the default template. {pull}16038[16038]
54+
- Fix index names for indexing not always guaranteed to be lower case. {pull}16081[16081]
5555

5656
*Auditbeat*
5757

libbeat/outputs/kafka/client.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func newKafkaClient(
7979
hosts: hosts,
8080
topic: topic,
8181
key: key,
82-
index: index,
82+
index: strings.ToLower(index),
8383
codec: writer,
8484
config: *cfg,
8585
}

libbeat/outputs/logstash/config.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package logstash
1919

2020
import (
21+
"strings"
2122
"time"
2223

2324
"github.com/elastic/beats/libbeat/beat"
@@ -80,7 +81,7 @@ func readConfig(cfg *common.Config, info beat.Info) (*Config, error) {
8081
}
8182

8283
if c.Index == "" {
83-
c.Index = info.IndexPrefix
84+
c.Index = strings.ToLower(info.IndexPrefix)
8485
}
8586

8687
return &c, nil

libbeat/outputs/logstash/enc.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818
package logstash
1919

2020
import (
21+
"strings"
22+
2123
"github.com/elastic/beats/libbeat/beat"
2224
"github.com/elastic/beats/libbeat/outputs/codec/json"
2325
)
@@ -27,6 +29,7 @@ func makeLogstashEventEncoder(info beat.Info, escapeHTML bool, index string) fun
2729
Pretty: false,
2830
EscapeHTML: escapeHTML,
2931
})
32+
index = strings.ToLower(index)
3033
return func(event interface{}) (d []byte, err error) {
3134
d, err = enc.Encode(index, event.(*beat.Event))
3235
if err != nil {

libbeat/outputs/logstash/logstash_integration_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,8 @@ func esConnect(t *testing.T, index string) *esConnection {
9292

9393
host := getElasticsearchHost()
9494
indexFmt := fmtstr.MustCompileEvent(fmt.Sprintf("%s-%%{+yyyy.MM.dd}", index))
95-
indexSel := outil.MakeSelector(outil.FmtSelectorExpr(indexFmt, ""))
95+
indexFmtExpr, _ := outil.FmtSelectorExpr(indexFmt, "")
96+
indexSel := outil.MakeSelector(indexFmtExpr)
9697
index, _ = indexSel.Select(&beat.Event{
9798
Timestamp: ts,
9899
})

libbeat/outputs/outil/select.go

Lines changed: 80 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -19,17 +19,22 @@ package outil
1919

2020
import (
2121
"fmt"
22+
"strings"
2223

2324
"github.com/elastic/beats/libbeat/beat"
2425
"github.com/elastic/beats/libbeat/common"
2526
"github.com/elastic/beats/libbeat/common/fmtstr"
2627
"github.com/elastic/beats/libbeat/conditions"
2728
)
2829

30+
// Selector is used to produce a string based on the contents of a Beats event.
31+
// A selector supports multiple rules that need to be configured.
2932
type Selector struct {
3033
sel SelectorExpr
3134
}
3235

36+
// Settings configures how BuildSelectorFromConfig creates a Selector from
37+
// a given configuration object.
3338
type Settings struct {
3439
// single selector key and default option keyword
3540
Key string
@@ -44,6 +49,8 @@ type Settings struct {
4449
FailEmpty bool
4550
}
4651

52+
// SelectorExpr represents an expression object that can be composed with other
53+
// expressions in order to build a Selector.
4754
type SelectorExpr interface {
4855
sel(evt *beat.Event) (string, error)
4956
}
@@ -76,6 +83,7 @@ type mapSelector struct {
7683

7784
var nilSelector SelectorExpr = &emptySelector{}
7885

86+
// MakeSelector creates a selector from a set of selector expressions.
7987
func MakeSelector(es ...SelectorExpr) Selector {
8088
switch len(es) {
8189
case 0:
@@ -95,10 +103,12 @@ func (s Selector) Select(evt *beat.Event) (string, error) {
95103
return s.sel.sel(evt)
96104
}
97105

106+
// IsEmpty checks if the selector is not configured and will always return an empty string.
98107
func (s Selector) IsEmpty() bool {
99108
return s.sel == nilSelector || s.sel == nil
100109
}
101110

111+
// IsConst checks if the selector will always return the same string.
102112
func (s Selector) IsConst() bool {
103113
if s.sel == nilSelector {
104114
return true
@@ -108,6 +118,7 @@ func (s Selector) IsConst() bool {
108118
return ok
109119
}
110120

121+
// BuildSelectorFromConfig creates a selector from a configuration object.
111122
func BuildSelectorFromConfig(
112123
cfg *common.Config,
113124
settings Settings,
@@ -156,17 +167,13 @@ func BuildSelectorFromConfig(
156167
return Selector{}, fmt.Errorf("%v in %v", err, cfg.PathOf(key))
157168
}
158169

159-
if fmtstr.IsConst() {
160-
str, err := fmtstr.Run(nil)
161-
if err != nil {
162-
return Selector{}, err
163-
}
170+
fmtsel, err := FmtSelectorExpr(fmtstr, "")
171+
if err != nil {
172+
return Selector{}, fmt.Errorf("%v in %v", err, cfg.PathOf(key))
173+
}
164174

165-
if str != "" {
166-
sel = append(sel, ConstSelectorExpr(str))
167-
}
168-
} else {
169-
sel = append(sel, FmtSelectorExpr(fmtstr, ""))
175+
if fmtsel != nilSelector {
176+
sel = append(sel, fmtsel)
170177
}
171178
}
172179

@@ -183,35 +190,84 @@ func BuildSelectorFromConfig(
183190
return MakeSelector(sel...), nil
184191
}
185192

193+
// EmptySelectorExpr create a selector expression that returns an empty string.
186194
func EmptySelectorExpr() SelectorExpr {
187195
return nilSelector
188196
}
189197

198+
// ConstSelectorExpr creates a selector expression that always returns the configured string.
190199
func ConstSelectorExpr(s string) SelectorExpr {
191-
return &constSelector{s}
200+
if s == "" {
201+
return EmptySelectorExpr()
202+
}
203+
return &constSelector{strings.ToLower(s)}
192204
}
193205

194-
func FmtSelectorExpr(fmt *fmtstr.EventFormatString, fallback string) SelectorExpr {
195-
return &fmtSelector{*fmt, fallback}
206+
// FmtSelectorExpr creates a selector expression using a format string. If the
207+
// event can not be applied the default fallback constant string will be returned.
208+
func FmtSelectorExpr(fmt *fmtstr.EventFormatString, fallback string) (SelectorExpr, error) {
209+
if fmt.IsConst() {
210+
str, err := fmt.Run(nil)
211+
if err != nil {
212+
return nil, err
213+
}
214+
if str == "" {
215+
str = fallback
216+
}
217+
return ConstSelectorExpr(str), nil
218+
}
219+
220+
return &fmtSelector{*fmt, strings.ToLower(fallback)}, nil
196221
}
197222

223+
// ConcatSelectorExpr combines multiple expressions that are run one after the other.
224+
// The first expression that returns a string wins.
198225
func ConcatSelectorExpr(s ...SelectorExpr) SelectorExpr {
199226
return &listSelector{s}
200227
}
201228

229+
// ConditionalSelectorExpr executes the given expression only if the event
230+
// matches the given condition.
202231
func ConditionalSelectorExpr(
203232
s SelectorExpr,
204233
cond conditions.Condition,
205234
) SelectorExpr {
206235
return &condSelector{s, cond}
207236
}
208237

238+
// LookupSelectorExpr replaces the produced string with an table entry.
239+
// If there is no entry in the table the default fallback string will be reported.
209240
func LookupSelectorExpr(
210-
s SelectorExpr,
241+
evtfmt *fmtstr.EventFormatString,
211242
table map[string]string,
212243
fallback string,
213-
) SelectorExpr {
214-
return &mapSelector{s, fallback, table}
244+
) (SelectorExpr, error) {
245+
if evtfmt.IsConst() {
246+
str, err := evtfmt.Run(nil)
247+
if err != nil {
248+
return nil, err
249+
}
250+
251+
str = table[strings.ToLower(str)]
252+
if str == "" {
253+
str = fallback
254+
}
255+
return ConstSelectorExpr(str), nil
256+
}
257+
258+
return &mapSelector{
259+
from: &fmtSelector{f: *evtfmt},
260+
to: table,
261+
otherwise: fallback,
262+
}, nil
263+
}
264+
265+
func lowercaseTable(table map[string]string) map[string]string {
266+
tmp := make(map[string]string, len(table))
267+
for k, v := range table {
268+
tmp[strings.ToLower(k)] = strings.ToLower(v)
269+
}
270+
return tmp
215271
}
216272

217273
func buildSingle(cfg *common.Config, key string) (SelectorExpr, error) {
@@ -239,7 +295,7 @@ func buildSingle(cfg *common.Config, key string) (SelectorExpr, error) {
239295
if err != nil {
240296
return nil, err
241297
}
242-
otherwise = tmp
298+
otherwise = strings.ToLower(tmp)
243299
}
244300

245301
// 3. extract optional `mapping`
@@ -276,45 +332,14 @@ func buildSingle(cfg *common.Config, key string) (SelectorExpr, error) {
276332
// 5. build selector from available fields
277333
var sel SelectorExpr
278334
if len(mapping.Table) > 0 {
279-
if evtfmt.IsConst() {
280-
str, err := evtfmt.Run(nil)
281-
if err != nil {
282-
return nil, err
283-
}
284-
285-
str = mapping.Table[str]
286-
if str == "" {
287-
str = otherwise
288-
}
289-
290-
if str == "" {
291-
sel = nilSelector
292-
} else {
293-
sel = ConstSelectorExpr(str)
294-
}
295-
} else {
296-
sel = &mapSelector{
297-
from: FmtSelectorExpr(evtfmt, ""),
298-
to: mapping.Table,
299-
otherwise: otherwise,
300-
}
301-
}
335+
sel, err = LookupSelectorExpr(evtfmt, lowercaseTable(mapping.Table), otherwise)
302336
} else {
303-
if evtfmt.IsConst() {
304-
str, err := evtfmt.Run(nil)
305-
if err != nil {
306-
return nil, err
307-
}
308-
309-
if str == "" {
310-
sel = nilSelector
311-
} else {
312-
sel = ConstSelectorExpr(str)
313-
}
314-
} else {
315-
sel = FmtSelectorExpr(evtfmt, otherwise)
316-
}
337+
sel, err = FmtSelectorExpr(evtfmt, otherwise)
317338
}
339+
if err != nil {
340+
return nil, err
341+
}
342+
318343
if cond != nil && sel != nilSelector {
319344
sel = ConditionalSelectorExpr(sel, cond)
320345
}
@@ -363,7 +388,7 @@ func (s *fmtSelector) sel(evt *beat.Event) (string, error) {
363388
if n == "" {
364389
return s.otherwise, nil
365390
}
366-
return n, nil
391+
return strings.ToLower(n), nil
367392
}
368393

369394
func (s *mapSelector) sel(evt *beat.Event) (string, error) {

0 commit comments

Comments
 (0)