Skip to content

Commit 16c753c

Browse files
authored
Additional stats fields for Elasticsearch (#41944)
* Perform an additional _settings API call for Elasticsearch module * Added filter_path for cluster state & index settings fetch * Added index creation version
1 parent f9a9b32 commit 16c753c

9 files changed

Lines changed: 163 additions & 13 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
416416
- Only watch metadata for ReplicaSets in metricbeat k8s module {pull}41289[41289]
417417
- Add support for region/zone for Vertex AI service in GCP module {pull}41551[41551]
418418
- Add support for location label as an optional configuration parameter in GCP metrics metricset. {issue}41550[41550] {pull}41626[41626]
419+
- Added `tier_preference`, `creation_date` and `version` fields to the `elasticsearch.index` metricset. {pull}41944[41944]
419420

420421
*Metricbeat*
421422
- Add benchmark module {pull}41801[41801]

metricbeat/docs/fields.asciidoc

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32161,6 +32161,27 @@ type: keyword
3216132161

3216232162
--
3216332163

32164+
*`elasticsearch.index.tier_preference`*::
32165+
+
32166+
--
32167+
type: keyword
32168+
32169+
--
32170+
32171+
*`elasticsearch.index.creation_date`*::
32172+
+
32173+
--
32174+
type: date
32175+
32176+
--
32177+
32178+
*`elasticsearch.index.version`*::
32179+
+
32180+
--
32181+
type: keyword
32182+
32183+
--
32184+
3216432185
*`elasticsearch.index.name`*::
3216532186
+
3216632187
--

metricbeat/module/elasticsearch/cluster_stats/data.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -231,7 +231,7 @@ func eventMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.I
231231
}
232232

233233
clusterStateMetrics := []string{"version", "master_node", "nodes", "routing_table"}
234-
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
234+
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, []string{})
235235
if err != nil {
236236
return fmt.Errorf("failed to get cluster state from Elasticsearch: %w", err)
237237
}

metricbeat/module/elasticsearch/elasticsearch.go

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -288,13 +288,21 @@ func GetLicense(http *helper.HTTP, resetURI string) (*License, error) {
288288
}
289289

290290
// GetClusterState returns cluster state information.
291-
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (mapstr.M, error) {
291+
func GetClusterState(http *helper.HTTP, resetURI string, metrics []string, filterPaths []string) (mapstr.M, error) {
292+
queryParams := []string{"local=true"}
292293
clusterStateURI := "_cluster/state"
293294
if len(metrics) > 0 {
294295
clusterStateURI += "/" + strings.Join(metrics, ",")
295296
}
296297

297-
content, err := fetchPath(http, resetURI, clusterStateURI, "local=true")
298+
if len(filterPaths) > 0 {
299+
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
300+
queryParams = append(queryParams, filterPathQueryParam)
301+
}
302+
303+
queryString := strings.Join(queryParams, "&")
304+
305+
content, err := fetchPath(http, resetURI, clusterStateURI, queryString)
298306
if err != nil {
299307
return nil, err
300308
}
@@ -304,6 +312,28 @@ func GetClusterState(http *helper.HTTP, resetURI string, metrics []string) (maps
304312
return clusterState, err
305313
}
306314

315+
func GetIndexSettings(http *helper.HTTP, resetURI string, indexPattern string, filterPaths []string) (mapstr.M, error) {
316+
317+
queryParams := []string{"local=true", "expand_wildcards=hidden,all"}
318+
indicesSettingsURI := indexPattern + "/_settings"
319+
320+
if len(filterPaths) > 0 {
321+
filterPathQueryParam := "filter_path=" + strings.Join(filterPaths, ",")
322+
queryParams = append(queryParams, filterPathQueryParam)
323+
}
324+
325+
queryString := strings.Join(queryParams, "&")
326+
327+
content, err := fetchPath(http, resetURI, indicesSettingsURI, queryString)
328+
if err != nil {
329+
return nil, err
330+
}
331+
332+
var indicesSettings map[string]interface{}
333+
err = json.Unmarshal(content, &indicesSettings)
334+
return indicesSettings, err
335+
}
336+
307337
// GetClusterSettingsWithDefaults returns cluster settings.
308338
func GetClusterSettingsWithDefaults(http *helper.HTTP, resetURI string, filterPaths []string) (mapstr.M, error) {
309339
return GetClusterSettings(http, resetURI, true, filterPaths)

metricbeat/module/elasticsearch/fields.go

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

metricbeat/module/elasticsearch/index/_meta/data.json

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -144,6 +144,9 @@
144144
}
145145
},
146146
"status": "green",
147+
"tier_preference": "data_content",
148+
"creation_date": 1731657995821,
149+
"version": "8505000",
147150
"hidden": true,
148151
"shards": {
149152
"total": 1,

metricbeat/module/elasticsearch/index/_meta/fields.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,12 @@
1717
type: keyword
1818
- name: status
1919
type: keyword
20+
- name: tier_preference
21+
type: keyword
22+
- name: creation_date
23+
type: date
24+
- name: version
25+
type: keyword
2026
- name: name
2127
type: keyword
2228
description: >

metricbeat/module/elasticsearch/index/data.go

Lines changed: 97 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ package index
2020
import (
2121
"encoding/json"
2222
"fmt"
23+
"strconv"
2324

2425
"github.com/joeshaw/multierror"
2526

@@ -40,9 +41,12 @@ type Index struct {
4041
Primaries primaries `json:"primaries"`
4142
Total total `json:"total"`
4243

43-
Index string `json:"index"`
44-
Status string `json:"status"`
45-
Shards shardStats `json:"shards"`
44+
Index string `json:"index"`
45+
Status string `json:"status"`
46+
TierPreference string `json:"tier_preference"`
47+
CreationDate int `json:"creation_date"`
48+
Version string `json:"version"`
49+
Shards shardStats `json:"shards"`
4650
}
4751

4852
type primaries struct {
@@ -180,11 +184,19 @@ type bulkStats struct {
180184

181185
func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.Info, content []byte, isXpack bool) error {
182186
clusterStateMetrics := []string{"routing_table"}
183-
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics)
187+
clusterStateFilterPaths := []string{"routing_table"}
188+
clusterState, err := elasticsearch.GetClusterState(httpClient, httpClient.GetURI(), clusterStateMetrics, clusterStateFilterPaths)
184189
if err != nil {
185190
return fmt.Errorf("failure retrieving cluster state from Elasticsearch: %w", err)
186191
}
187192

193+
indicesSettingsPattern := "*,.*"
194+
indicesSettingsFilterPaths := []string{"*.settings.index.creation_date", "*.settings.index.**._tier_preference", "*.settings.index.version.created"}
195+
indicesSettings, err := elasticsearch.GetIndexSettings(httpClient, httpClient.GetURI(), indicesSettingsPattern, indicesSettingsFilterPaths)
196+
if err != nil {
197+
return fmt.Errorf("failure retrieving index settings from Elasticsearch: %w", err)
198+
}
199+
188200
var indicesStats stats
189201
if err := parseAPIResponse(content, &indicesStats); err != nil {
190202
return fmt.Errorf("failure parsing Indices Stats Elasticsearch API response: %w", err)
@@ -204,6 +216,12 @@ func eventsMapping(r mb.ReporterV2, httpClient *helper.HTTP, info elasticsearch.
204216
continue
205217
}
206218

219+
err = addIndexSettings(&idx, indicesSettings)
220+
if err != nil {
221+
errs = append(errs, fmt.Errorf("failure adding index settings: %w", err))
222+
continue
223+
}
224+
207225
event.ModuleFields.Put("cluster.id", info.ClusterID)
208226
event.ModuleFields.Put("cluster.name", info.ClusterName)
209227

@@ -271,6 +289,63 @@ func addClusterStateFields(idx *Index, clusterState mapstr.M) error {
271289
return nil
272290
}
273291

292+
func addIndexSettings(idx *Index, indicesSettings mapstr.M) error {
293+
294+
// Recover the index settings for our specific index
295+
indexSettingsValue, err := indicesSettings.GetValue(idx.Index)
296+
if err != nil {
297+
return fmt.Errorf("failed to get index settings for index %s: %w", idx.Index, err)
298+
}
299+
300+
indexSettings, ok := indexSettingsValue.(map[string]interface{})
301+
if !ok {
302+
return fmt.Errorf("index settings is not a map for index: %s", idx.Index)
303+
}
304+
305+
indexCreationDate, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.creation_date")
306+
if err != nil {
307+
return fmt.Errorf("failed to get index creation date: %w", err)
308+
}
309+
310+
idx.CreationDate, err = strconv.Atoi(indexCreationDate)
311+
if err != nil {
312+
return fmt.Errorf("failed to convert index creation date to int: %w", err)
313+
}
314+
315+
indexTierPreference, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.routing.allocation.require._tier_preference")
316+
if err != nil {
317+
indexTierPreference, err = getIndexSettingForIndex(indexSettings, idx.Index, "index.routing.allocation.include._tier_preference")
318+
if err != nil {
319+
return fmt.Errorf("failed to get index tier preference: %w", err)
320+
}
321+
}
322+
323+
idx.TierPreference = indexTierPreference
324+
325+
indexVersion, err := getIndexSettingForIndex(indexSettings, idx.Index, "index.version.created")
326+
if err != nil {
327+
return fmt.Errorf("failed to get index version: %w", err)
328+
}
329+
330+
idx.Version = indexVersion
331+
332+
return nil
333+
}
334+
335+
func getIndexSettingForIndex(indexSettings mapstr.M, index, settingKey string) (string, error) {
336+
fieldKey := "settings." + settingKey
337+
value, err := indexSettings.GetValue(fieldKey)
338+
if err != nil {
339+
return "", fmt.Errorf("'"+fieldKey+"': %w", err)
340+
}
341+
342+
setting, ok := value.(string)
343+
if !ok {
344+
return "", elastic.MakeErrorForMissingField(fieldKey, elastic.Elasticsearch)
345+
}
346+
return setting, nil
347+
}
348+
274349
func getClusterStateMetricForIndex(clusterState mapstr.M, index, metricKey string) (mapstr.M, error) {
275350
fieldKey := metricKey + ".indices." + index
276351
value, err := clusterState.GetValue(fieldKey)
@@ -308,8 +383,15 @@ func getIndexStatus(shards map[string]interface{}) (string, error) {
308383

309384
shard := mapstr.M(s)
310385

311-
isPrimary := shard["primary"].(bool)
312-
state := shard["state"].(string)
386+
isPrimary, ok := shard["primary"].(bool)
387+
if !ok {
388+
return "", fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
389+
}
390+
391+
state, ok := shard["state"].(string)
392+
if !ok {
393+
return "", fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
394+
}
313395

314396
if isPrimary {
315397
areAllPrimariesStarted = areAllPrimariesStarted && (state == "STARTED")
@@ -357,8 +439,15 @@ func getIndexShardStats(shards mapstr.M) (*shardStats, error) {
357439

358440
shard := mapstr.M(s)
359441

360-
isPrimary := shard["primary"].(bool)
361-
state := shard["state"].(string)
442+
isPrimary, ok := shard["primary"].(bool)
443+
if !ok {
444+
return nil, fmt.Errorf("%v.shards[%v].primary is not a boolean", indexName, shardIdx)
445+
}
446+
447+
state, ok := shard["state"].(string)
448+
if !ok {
449+
return nil, fmt.Errorf("%v.shards[%v].state is not a string", indexName, shardIdx)
450+
}
362451

363452
if isPrimary {
364453
primaries++

metricbeat/tests/system/test_base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def test_index_management(self):
5959
assert len(es.cat.templates(name='metricbeat-*', h='name')) > 0
6060

6161
@unittest.skipUnless(INTEGRATION_TESTS, "integration test")
62-
@pytest.mark.timeout(8*60, func_only=True)
62+
@pytest.mark.timeout(8 * 60, func_only=True)
6363
def test_dashboards(self):
6464
"""
6565
Test that the dashboards can be loaded with `setup --dashboards`

0 commit comments

Comments
 (0)