Skip to content

Commit cf9a1da

Browse files
[Monitoring] Fix inaccuracies in logstash pipeline listing metrics (#55868)
* Change how we fetch pipeline listing metrics to match what other charts show * Fix tests * Fix tests Co-authored-by: Elastic Machine <elasticmachine@users.noreply.github.com>
1 parent a3c6bd9 commit cf9a1da

9 files changed

Lines changed: 151 additions & 311 deletions

File tree

x-pack/legacy/plugins/monitoring/server/lib/cluster/get_clusters_from_request.js

Lines changed: 24 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ import { checkCcrEnabled } from '../elasticsearch/ccr';
3434
import { getStandaloneClusterDefinition, hasStandaloneClusters } from '../standalone_clusters';
3535
import { getLogTypes } from '../logs';
3636
import { isInCodePath } from './is_in_code_path';
37+
import { getLogstashPipelineIds } from '../logstash/get_pipeline_ids';
3738

3839
/**
3940
* Get all clusters or the cluster associated with {@code clusterUuid} when it is defined.
@@ -53,6 +54,8 @@ export async function getClustersFromRequest(
5354
filebeatIndexPattern,
5455
} = indexPatterns;
5556

57+
const config = req.server.config();
58+
const size = config.get('xpack.monitoring.max_bucket_size');
5659
const isStandaloneCluster = clusterUuid === STANDALONE_CLUSTER_CLUSTER_UUID;
5760

5861
let clusters = [];
@@ -158,25 +161,27 @@ export async function getClustersFromRequest(
158161
});
159162

160163
// add logstash data
161-
const logstashes = isInCodePath(codePaths, [CODE_PATH_LOGSTASH])
162-
? await getLogstashForClusters(req, lsIndexPattern, clusters)
163-
: [];
164-
165-
const clusterPipelineNodesCount = isInCodePath(codePaths, [CODE_PATH_LOGSTASH])
166-
? await getPipelines(req, lsIndexPattern, null, ['logstash_cluster_pipeline_nodes_count'])
167-
: [];
168-
169-
// add the logstash data to each cluster
170-
logstashes.forEach(logstash => {
171-
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });
172-
173-
// withhold LS overview stats until pipeline metrics have at least one full bucket
174-
if (logstash.clusterUuid === req.params.clusterUuid && clusterPipelineNodesCount.length === 0) {
175-
logstash.stats = {};
176-
}
177-
178-
set(clusters[clusterIndex], 'logstash', logstash.stats);
179-
});
164+
if (isInCodePath(codePaths, [CODE_PATH_LOGSTASH])) {
165+
const logstashes = await getLogstashForClusters(req, lsIndexPattern, clusters);
166+
const pipelines = await getLogstashPipelineIds(req, lsIndexPattern, { clusterUuid }, size);
167+
const clusterPipelineNodesCount = await getPipelines(req, lsIndexPattern, pipelines, [
168+
'logstash_cluster_pipeline_nodes_count',
169+
]);
170+
// add the logstash data to each cluster
171+
logstashes.forEach(logstash => {
172+
const clusterIndex = findIndex(clusters, { cluster_uuid: logstash.clusterUuid });
173+
174+
// withhold LS overview stats until pipeline metrics have at least one full bucket
175+
if (
176+
logstash.clusterUuid === req.params.clusterUuid &&
177+
clusterPipelineNodesCount.length === 0
178+
) {
179+
logstash.stats = {};
180+
}
181+
182+
set(clusters[clusterIndex], 'logstash', logstash.stats);
183+
});
184+
}
180185

181186
// add beats data
182187
const beatsByCluster = isInCodePath(codePaths, [CODE_PATH_BEATS])
@@ -199,7 +204,6 @@ export async function getClustersFromRequest(
199204
// check ccr configuration
200205
const isCcrEnabled = await checkCcrEnabled(req, esIndexPattern);
201206

202-
const config = req.server.config();
203207
const kibanaUuid = config.get('server.uuid');
204208

205209
return getClustersSummary(req.server, clusters, kibanaUuid, isCcrEnabled);

x-pack/legacy/plugins/monitoring/server/lib/logstash/__tests__/get_pipelines.js

Lines changed: 17 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,15 @@
55
*/
66

77
import expect from '@kbn/expect';
8-
import { handleGetPipelinesResponse, processPipelinesAPIResponse } from '../get_pipelines';
8+
import { processPipelinesAPIResponse } from '../get_pipelines';
99

1010
describe('processPipelinesAPIResponse', () => {
1111
let response;
1212
beforeEach(() => {
1313
response = {
1414
pipelines: [
1515
{
16+
id: 1,
1617
metrics: {
1718
throughput_for_cluster: {
1819
data: [
@@ -22,8 +23,8 @@ describe('processPipelinesAPIResponse', () => {
2223
},
2324
nodes_count_for_cluster: {
2425
data: [
25-
[1513721903, 3],
26-
[1513722162, 2],
26+
[1513721903, { 1: 5 }],
27+
[1513722162, { 1: 10 }],
2728
],
2829
},
2930
},
@@ -32,96 +33,27 @@ describe('processPipelinesAPIResponse', () => {
3233
};
3334
});
3435

35-
it('normalizes the metric keys', () => {
36-
processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
37-
processedResponse => {
38-
expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
39-
response.pipelines[0].metrics.throughput_for_cluster
40-
);
41-
expect(processedResponse.pipelines[0].metrics.nodesCount).to.eql(
42-
response.pipelines[0].metrics.nodes_count_for_cluster
43-
);
44-
}
36+
it('normalizes the metric keys', async () => {
37+
const processedResponse = await processPipelinesAPIResponse(
38+
response,
39+
'throughput_for_cluster',
40+
'nodes_count_for_cluster'
41+
);
42+
expect(processedResponse.pipelines[0].metrics.throughput).to.eql(
43+
response.pipelines[0].metrics.throughput_for_cluster
4544
);
45+
expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][0]).to.eql(1513721903);
46+
expect(processedResponse.pipelines[0].metrics.nodesCount.data[0][1]).to.eql(5);
47+
expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][0]).to.eql(1513722162);
48+
expect(processedResponse.pipelines[0].metrics.nodesCount.data[1][1]).to.eql(10);
4649
});
4750

4851
it('computes the latest metrics', () => {
4952
processPipelinesAPIResponse(response, 'throughput_for_cluster', 'nodes_count_for_cluster').then(
5053
processedResponse => {
5154
expect(processedResponse.pipelines[0].latestThroughput).to.eql(23);
52-
expect(processedResponse.pipelines[0].latestNodesCount).to.eql(2);
55+
expect(processedResponse.pipelines[0].latestNodesCount).to.eql(10);
5356
}
5457
);
5558
});
5659
});
57-
58-
describe('get_pipelines', () => {
59-
let fetchPipelinesWithMetricsResult;
60-
61-
describe('fetchPipelinesWithMetrics result contains no pipelines', () => {
62-
beforeEach(() => {
63-
fetchPipelinesWithMetricsResult = {
64-
logstash_pipeline_throughput: [
65-
{
66-
data: [],
67-
},
68-
],
69-
logstash_pipeline_nodes_count: [
70-
{
71-
data: [],
72-
},
73-
],
74-
};
75-
});
76-
77-
it('returns an empty array', () => {
78-
const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult);
79-
expect(result).to.eql([]);
80-
});
81-
});
82-
83-
describe('fetchPipelinesWithMetrics result contains pipelines', () => {
84-
beforeEach(() => {
85-
fetchPipelinesWithMetricsResult = {
86-
logstash_pipeline_throughput: [
87-
{
88-
data: [[1513123151000, { apache_logs: 231, logstash_tweets: 34 }]],
89-
},
90-
],
91-
logstash_pipeline_nodes_count: [
92-
{
93-
data: [[1513123151000, { apache_logs: 3, logstash_tweets: 1 }]],
94-
},
95-
],
96-
};
97-
});
98-
99-
it('returns the correct structure for a non-empty response', () => {
100-
const result = handleGetPipelinesResponse(fetchPipelinesWithMetricsResult);
101-
expect(result).to.eql([
102-
{
103-
id: 'apache_logs',
104-
metrics: {
105-
logstash_pipeline_throughput: {
106-
data: [[1513123151000, 231]],
107-
},
108-
logstash_pipeline_nodes_count: {
109-
data: [[1513123151000, 3]],
110-
},
111-
},
112-
},
113-
{
114-
id: 'logstash_tweets',
115-
metrics: {
116-
logstash_pipeline_throughput: {
117-
data: [[1513123151000, 34]],
118-
},
119-
logstash_pipeline_nodes_count: {
120-
data: [[1513123151000, 1]],
121-
},
122-
},
123-
},
124-
]);
125-
});
126-
});
127-
});

x-pack/legacy/plugins/monitoring/server/lib/logstash/get_paginated_pipelines.js

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import { get } from 'lodash';
88
import { filter } from '../pagination/filter';
99
import { getLogstashPipelineIds } from './get_pipeline_ids';
10-
import { handleGetPipelinesResponse } from './get_pipelines';
1110
import { sortPipelines } from './sort_pipelines';
1211
import { paginate } from '../pagination/paginate';
1312
import { getMetrics } from '../details/get_metrics';
@@ -51,19 +50,33 @@ export async function getPaginatedPipelines(
5150
// the necessary sort - we only need the last bucket of data so we
5251
// fetch the last two buckets of data (to ensure we have a single full bucekt),
5352
// then return the value from that last bucket
54-
const metricSeriesData = await getMetrics(
55-
req,
56-
lsIndexPattern,
57-
metricSet,
58-
[],
59-
{ pageOfPipelines: pipelines },
60-
2
61-
);
62-
const pipelineAggregationsData = handleGetPipelinesResponse(
63-
metricSeriesData,
64-
pipelines.map(p => p.id)
53+
const metricSeriesData = Object.values(
54+
await Promise.all(
55+
pipelines.map(pipeline => {
56+
return new Promise(async resolve => {
57+
const data = await getMetrics(
58+
req,
59+
lsIndexPattern,
60+
metricSet,
61+
[],
62+
{
63+
pipeline,
64+
},
65+
2
66+
);
67+
68+
resolve({
69+
id: pipeline.id,
70+
metrics: Object.keys(data).reduce((accum, metricName) => {
71+
accum[metricName] = data[metricName][0];
72+
return accum;
73+
}, {}),
74+
});
75+
});
76+
})
77+
)
6578
);
66-
for (const pipelineAggregationData of pipelineAggregationsData) {
79+
for (const pipelineAggregationData of metricSeriesData) {
6780
for (const pipeline of pipelines) {
6881
if (pipelineAggregationData.id === pipeline.id) {
6982
for (const metric of metricSet) {

x-pack/legacy/plugins/monitoring/server/lib/logstash/get_pipeline_ids.js

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
* you may not use this file except in compliance with the Elastic License.
55
*/
66
import moment from 'moment';
7-
import { get, uniq } from 'lodash';
7+
import { get } from 'lodash';
88
import { createQuery } from '../create_query';
99
import { LogstashMetric } from '../metrics';
1010

@@ -26,7 +26,7 @@ export async function getLogstashPipelineIds(
2626
index: logstashIndexPattern,
2727
size: 0,
2828
ignoreUnavailable: true,
29-
filterPath: ['aggregations.nested_context.composite_data.buckets'],
29+
filterPath: ['aggregations.nest.id.buckets'],
3030
body: {
3131
query: createQuery({
3232
start,
@@ -36,37 +36,28 @@ export async function getLogstashPipelineIds(
3636
filters,
3737
}),
3838
aggs: {
39-
nested_context: {
39+
nest: {
4040
nested: {
4141
path: 'logstash_stats.pipelines',
4242
},
4343
aggs: {
44-
composite_data: {
45-
composite: {
44+
id: {
45+
terms: {
46+
field: 'logstash_stats.pipelines.id',
4647
size,
47-
sources: [
48-
{
49-
id: {
50-
terms: {
51-
field: 'logstash_stats.pipelines.id',
52-
},
53-
},
54-
},
55-
{
56-
hash: {
57-
terms: {
58-
field: 'logstash_stats.pipelines.hash',
59-
},
60-
},
61-
},
62-
{
63-
ephemeral_id: {
48+
},
49+
aggs: {
50+
unnest: {
51+
reverse_nested: {},
52+
aggs: {
53+
nodes: {
6454
terms: {
65-
field: 'logstash_stats.pipelines.ephemeral_id',
55+
field: 'logstash_stats.logstash.uuid',
56+
size,
6657
},
6758
},
6859
},
69-
],
60+
},
7061
},
7162
},
7263
},
@@ -77,8 +68,8 @@ export async function getLogstashPipelineIds(
7768

7869
const { callWithRequest } = req.server.plugins.elasticsearch.getCluster('monitoring');
7970
const response = await callWithRequest(req, 'search', params);
80-
const data = get(response, 'aggregations.nested_context.composite_data.buckets', []).map(
81-
bucket => bucket.key
82-
);
83-
return uniq(data, item => item.id);
71+
return get(response, 'aggregations.nest.id.buckets', []).map(bucket => ({
72+
id: bucket.key,
73+
nodeIds: get(bucket, 'unnest.nodes.buckets', []).map(item => item.key),
74+
}));
8475
}

0 commit comments

Comments
 (0)