Add Ingest Pipeline Telemetry device #1416
Conversation
DJRickyB
left a comment
I like it so far! I tried it against the ingest-heavy Logging solution track, and poked around the rally-metrics-* data to get a better sense of how the dimensions were being persisted in the metric data and it looks good.
I also tried out running the benchmark multiple times against a persistent cluster and was happy to confirm the final stat is differential as you intended - great work. Looking forward to the documentation; please re-request my review at that point.
Thanks @DJRickyB - I addressed the comments, added docs, and also added cluster metadata to each of the outputs (node, pipeline, processor levels).
pquentin
left a comment
Two small nits I've noticed on my first read
`self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=data_stream_metadata)`

`class IngestPipelineStats(InternalTelemetryDevice):`
As discussed with @pquentin - it probably makes most sense for this to be enabled by default given there's 0 risk of results skew with the current approach of collecting before and after a benchmark.
pquentin
left a comment
Thanks for iterating. I tested it, it works great, thanks! I think we can merge after this last round.
```python
    def _report_ingest_pipeline_stats(self, stats):
        return self._join(
            self._line("Total Ingest Pipeline count", "", stats.ingest_pipeline_cluster_count, ""),
            self._line("Total Ingest Pipeline time", "", stats.ingest_pipeline_cluster_time, "ms", convert.ms_to_seconds),
```

Suggested change:

```diff
-            self._line("Total Ingest Pipeline time", "", stats.ingest_pipeline_cluster_time, "ms", convert.ms_to_seconds),
+            self._line("Total Ingest Pipeline time", "", stats.ingest_pipeline_cluster_time, "s", convert.ms_to_seconds),
```
```python
        summaries = {}
        for cluster_name in self.specified_cluster_names:
            try:
                ingest_stats = self.clients[cluster_name].nodes.stats(metric="ingest")
```
Just an FYI @sethmlarson - `ingest` is not documented as a valid `metric` value in the Python client docs:

> metric – Limit the information returned to the specified metrics. Valid choices: _all, breaker, fs, http, indices, jvm, os, process, thread_pool, transport, discovery, indexing_pressure
I can submit a PR to fix this, unless there's a specific reason it is undocumented.
Thanks for reporting this, can you make the change here? https://github.com/elastic/elasticsearch/blob/master/rest-api-spec/src/main/resources/rest-api-spec/api/nodes.stats.json#L40
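For anyone following along, this is the standard nodes stats API restricted to the `ingest` section. A rough sketch of what the call above returns and which parts the telemetry device reads (the localhost URL is just a placeholder):

```python
# Sketch only: fetch the per-node "ingest" section that IngestPipelineStats
# parses. Requires the elasticsearch Python client and a reachable cluster.
from elasticsearch import Elasticsearch

client = Elasticsearch("http://localhost:9200")
ingest_stats = client.nodes.stats(metric="ingest")

for node_id, node in ingest_stats["nodes"].items():
    ingest = node.get("ingest", {})
    # "total" holds node-wide counters: count, time_in_millis, current, failed.
    print(node_id, ingest.get("total"))
    # "pipelines" is keyed by pipeline id, each with its own counters and
    # per-processor breakdowns.
    print(list(ingest.get("pipelines", {})))
```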
Funny, after all this testing I didn't actually catch that the current logic flow will never report metrics for pipelines created as part of the track. Because they don't exist at the beginning, we end up skipping them, e.g.:

Luckily this should be a relatively simple fix - I managed to get a quick local test working with:

```diff
$ git diff
diff --git a/esrally/telemetry.py b/esrally/telemetry.py
index 1f19bcd..05c6357 100644
--- a/esrally/telemetry.py
+++ b/esrally/telemetry.py
@@ -1450,21 +1450,12 @@ class IngestPipelineStats(InternalTelemetryDevice):
)
continue
for summary_name, stats in summaries.items():
- if summary_name not in self.start_stats[cluster_name][node_name]:
- self.logger.warning("'%s' is not an expected field for Ingest Pipeline stats (skipping)", summary_name)
- continue
if summary_name == "total":
# The top level "total" contains stats for the node as a whole,
# each node will have exactly one top level "total" key
self._record_node_level_pipeline_stats(stats, cluster_name, node_name)
elif summary_name == "pipelines":
for pipeline_name, pipeline in stats.items():
- if pipeline_name not in self.start_stats[cluster_name][node_name]["pipelines"]:
- self.logger.warning(
- "Cannot determine Ingest Pipeline stats for %s (pipeline was not defined at the of the benchmark).",
- pipeline_name,
- )
- continue
self._record_pipeline_level_processor_stats(pipeline, pipeline_name, cluster_name, node_name)
self._record_cluster_level_pipeline_stats(cluster_name)
@@ -1500,15 +1491,13 @@ class IngestPipelineStats(InternalTelemetryDevice):
def _record_pipeline_level_processor_stats(self, pipeline, pipeline_name, cluster_name, node_name):
for processor_name, processor_stats in pipeline.items():
- start_stats_processors = self.start_stats[cluster_name][node_name]["pipelines"][pipeline_name]
+ try:
+ start_stats_processors = self.start_stats[cluster_name][node_name]["pipelines"][pipeline_name]
+ except KeyError:
+ # Pipeline was likely created by the track itself
+ start_stats_processors = {}
+ start_stats_processors[processor_name] = {}
- if processor_name not in start_stats_processors:
- self.logger.warning(
- "Cannot determine Ingest Pipeline stats in %s for %s (processor was not defined at the start of the benchmark).",
- pipeline_name,
- processor_name,
- )
- continue
# We have an individual processor obj, which contains the stats for each individual processor
             if processor_name != "total":
```

And to test (using the attached track):

IMO this should also be an additional unit test.
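To make the intent of the diff above concrete, a unit test for this case could assert something like the following sketch (simplified, flat data shapes and made-up names, not esrally's internal structures):

```python
# Simplified illustration: when a pipeline only shows up in the end-of-run
# stats (because the track created it), treat its start stats as empty so the
# reported delta is simply the end value instead of being skipped.
def pipeline_delta(start_stats, end_stats, cluster, node, pipeline):
    end = end_stats[cluster][node]["pipelines"][pipeline]
    try:
        start = start_stats[cluster][node]["pipelines"][pipeline]
    except KeyError:
        # Pipeline was likely created by the track itself.
        start = {}
    return {key: value - start.get(key, 0) for key, value in end.items()}

start = {"default": {"node-0": {"pipelines": {}}}}
end = {"default": {"node-0": {"pipelines": {"track-pipeline": {"count": 1001, "time_in_millis": 80, "failed": 1001}}}}}

assert pipeline_delta(start, end, "default", "node-0", "track-pipeline") == {
    "count": 1001,
    "time_in_millis": 80,
    "failed": 1001,
}
```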
I added code to the PR and my review blocks merges
Thanks Brad, I integrated your change and added a unit test, as discussed offline.

This finally fixed things on esbench where the first run does not have data. Since we have two dimensions here (pipeline/processor), it's a nice candidate for a Lens treemap. Here's an example:

We can clearly see which pipelines and which processors are slow!
@pquentin - nice! Thank you for your help. Are we happy to merge this?
@b-deam If you're happy, I'm happy :D
#1416 introduced the Ingest Pipeline stats telemetry device, but did not update our summary report docs to include the new metrics. This commit adds the missing documentation.

With this commit we add an Ingest Pipeline stats telemetry device. This
telemetry device utilises the node stats API, but rather than sampling
these statistics throughout the benchmark (as the node-stats telemetry
device does), we collect stats at the beginning and end of the benchmark and
report only the delta.

The metrics store contains metrics at the cluster, node, pipeline, and processor levels under
`rally-metrics-*`, and summarised cluster-level results under `rally-results-*`.
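To illustrate the before/after approach described above, here is a simplified sketch (not the telemetry device's actual implementation; the client address and printing are placeholders):

```python
# Sketch of the collect-at-start / collect-at-end / report-the-delta idea,
# using the same nodes stats "ingest" section the telemetry device relies on.
from elasticsearch import Elasticsearch

def node_ingest_totals(client):
    """Return {node_id: node-wide ingest counters (count, time_in_millis, failed, ...)}."""
    stats = client.nodes.stats(metric="ingest")
    return {node_id: node.get("ingest", {}).get("total", {}) for node_id, node in stats["nodes"].items()}

client = Elasticsearch("http://localhost:9200")

start = node_ingest_totals(client)   # sampled once, when the benchmark starts
# ... the benchmark runs and documents flow through the ingest pipelines ...
end = node_ingest_totals(client)     # sampled once, when the benchmark ends

# Reporting only the delta keeps results meaningful on long-lived clusters.
for node_id, end_totals in end.items():
    start_totals = start.get(node_id, {})
    delta = {key: value - start_totals.get(key, 0) for key, value in end_totals.items()}
    print(node_id, delta)
```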
Sample output
```
$ esrally race --track-path=rally/tracks/ingest-pipelines --telemetry ingest-pipeline-stats --kill-running-processes

[INFO] Race id is [4121d7ec-aee4-405e-a596-84dfa03a89e8]
[INFO] Preparing for race ...
[INFO] Racing on track [ingest-pipelines], challenge [bulk-to-pipeline-with-1001-failures] and car ['defaults'] with version [8.0.0-SNAPSHOT].

Running put-pipeline    [100% done]
Running bulk            [100% done]

------------------------------------------------------

| Metric | Task | Value | Unit |
|---------------------------------------------------------------:|-------:|------------:|-------:|
| Cumulative indexing time of primary shards | | 0.0370333 | min |
| Min cumulative indexing time across primary shards | | 0.0370333 | min |
| Median cumulative indexing time across primary shards | | 0.0370333 | min |
| Max cumulative indexing time across primary shards | | 0.0370333 | min |
| Cumulative indexing throttle time of primary shards | | 0 | min |
| Min cumulative indexing throttle time across primary shards | | 0 | min |
| Median cumulative indexing throttle time across primary shards | | 0 | min |
| Max cumulative indexing throttle time across primary shards | | 0 | min |
| Cumulative merge time of primary shards | | 0 | min |
| Cumulative merge count of primary shards | | 0 | |
| Min cumulative merge time across primary shards | | 0 | min |
| Median cumulative merge time across primary shards | | 0 | min |
| Max cumulative merge time across primary shards | | 0 | min |
| Cumulative merge throttle time of primary shards | | 0 | min |
| Min cumulative merge throttle time across primary shards | | 0 | min |
| Median cumulative merge throttle time across primary shards | | 0 | min |
| Max cumulative merge throttle time across primary shards | | 0 | min |
| Cumulative refresh time of primary shards | | 0.0272833 | min |
| Cumulative refresh count of primary shards | | 14 | |
| Min cumulative refresh time across primary shards | | 0.0272833 | min |
| Median cumulative refresh time across primary shards | | 0.0272833 | min |
| Max cumulative refresh time across primary shards | | 0.0272833 | min |
| Cumulative flush time of primary shards | | 0.0305667 | min |
| Cumulative flush count of primary shards | | 3 | |
| Min cumulative flush time across primary shards | | 0.0305667 | min |
| Median cumulative flush time across primary shards | | 0.0305667 | min |
| Max cumulative flush time across primary shards | | 0.0305667 | min |
| Total Young Gen GC time | | 0.061 | s |
| Total Young Gen GC count | | 1 | |
| Total Old Gen GC time | | 0 | s |
| Total Old Gen GC count | | 0 | |
| Store size | | 0.0395972 | GB |
| Translog size | | 5.12227e-08 | GB |
| Heap used for segments | | 0 | MB |
| Heap used for doc values | | 0 | MB |
| Heap used for terms | | 0 | MB |
| Heap used for norms | | 0 | MB |
| Heap used for points | | 0 | MB |
| Heap used for stored fields | | 0 | MB |
| Segment count | | 7 | |
+| Total Ingest Pipeline count | | 1001 | |
+| Total Ingest Pipeline time | | 80 | ms |
+| Total Ingest Pipeline failed | | 1001 | |
| Min Throughput | bulk | 3034.45 | docs/s |
| Mean Throughput | bulk | 3034.45 | docs/s |
| Median Throughput | bulk | 3034.45 | docs/s |
| Max Throughput | bulk | 3034.45 | docs/s |
| 50th percentile latency | bulk | 15.2215 | ms |
| 90th percentile latency | bulk | 38.0702 | ms |
| 100th percentile latency | bulk | 55.3849 | ms |
| 50th percentile service time | bulk | 15.2215 | ms |
| 90th percentile service time | bulk | 38.0702 | ms |
| 100th percentile service time | bulk | 55.3849 | ms |
| error rate | bulk | 100 | % |

[WARNING] Error rate is 100.0 for operation 'bulk'. Please check the logs.

---------------------------------
[INFO] SUCCESS (took 525 seconds)
---------------------------------
```

Sample comparison
```
$ esrally compare --baseline 4121d7ec-aee4-405e-a596-84dfa03a89e8 --contender c6548660-67b7-4263-9591-4c16c04756bb

Comparing baseline
  Race ID: 4121d7ec-aee4-405e-a596-84dfa03a89e8
  Race timestamp: 2022-01-14 05:26:20
  Challenge: bulk-to-pipeline-with-1001-failures
  Car: defaults

with contender
  Race ID: c6548660-67b7-4263-9591-4c16c04756bb
  Race timestamp: 2022-01-14 05:36:28
  Challenge: bulk-to-pipeline-with-1001-failures
  Car: defaults

------------------------------------------------------

| Metric | Task | Baseline | Contender | Diff | Unit | Diff % |
|--------------------------------------------------------------:|-------:|------------:|------------:|---------:|-------:|---------:|
| Cumulative indexing time of primary shards | | 0.0370333 | 0.0254167 | -0.01162 | min | -31.37% |
| Min cumulative indexing time across primary shard | | 0.0370333 | 0.0254167 | -0.01162 | min | -31.37% |
| Median cumulative indexing time across primary shard | | 0.0370333 | 0.0254167 | -0.01162 | min | -31.37% |
| Max cumulative indexing time across primary shard | | 0.0370333 | 0.0254167 | -0.01162 | min | -31.37% |
| Cumulative indexing throttle time of primary shards | | 0 | 0 | 0 | min | 0.00% |
| Min cumulative indexing throttle time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Median cumulative indexing throttle time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Max cumulative indexing throttle time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Cumulative merge time of primary shards | | 0 | 0 | 0 | min | 0.00% |
| Cumulative merge count of primary shards | | 0 | 0 | 0 | | 0.00% |
| Min cumulative merge time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Median cumulative merge time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Max cumulative merge time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Cumulative merge throttle time of primary shards | | 0 | 0 | 0 | min | 0.00% |
| Min cumulative merge throttle time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Median cumulative merge throttle time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Max cumulative merge throttle time across primary shard | | 0 | 0 | 0 | min | 0.00% |
| Cumulative refresh time of primary shards | | 0.0272833 | 0.02825 | 0.00097 | min | +3.54% |
| Cumulative refresh count of primary shards | | 14 | 13 | -1 | | -7.14% |
| Min cumulative refresh time across primary shard | | 0.0272833 | 0.02825 | 0.00097 | min | +3.54% |
| Median cumulative refresh time across primary shard | | 0.0272833 | 0.02825 | 0.00097 | min | +3.54% |
| Max cumulative refresh time across primary shard | | 0.0272833 | 0.02825 | 0.00097 | min | +3.54% |
| Cumulative flush time of primary shards | | 0.0305667 | 0.0214 | -0.00917 | min | -29.99% |
| Cumulative flush count of primary shards | | 3 | 3 | 0 | | 0.00% |
| Min cumulative flush time across primary shard | | 0.0305667 | 0.0214 | -0.00917 | min | -29.99% |
| Median cumulative flush time across primary shard | | 0.0305667 | 0.0214 | -0.00917 | min | -29.99% |
| Max cumulative flush time across primary shard | | 0.0305667 | 0.0214 | -0.00917 | min | -29.99% |
| Total Young Gen GC time | | 0.061 | 0.022 | -0.039 | s | -63.93% |
| Total Young Gen GC count | | 1 | 1 | 0 | | 0.00% |
| Total Old Gen GC time | | 0 | 0 | 0 | s | 0.00% |
| Total Old Gen GC count | | 0 | 0 | 0 | | 0.00% |
| Store size | | 0.0395972 | 0.0395972 | 0 | GB | 0.00% |
| Translog size | | 5.12227e-08 | 5.12227e-08 | 0 | GB | 0.00% |
| Heap used for segments | | 0 | 0 | 0 | MB | 0.00% |
| Heap used for doc values | | 0 | 0 | 0 | MB | 0.00% |
| Heap used for terms | | 0 | 0 | 0 | MB | 0.00% |
| Heap used for norms | | 0 | 0 | 0 | MB | 0.00% |
| Heap used for points | | 0 | 0 | 0 | MB | 0.00% |
| Heap used for stored fields | | 0 | 0 | 0 | MB | 0.00% |
| Segment count | | 7 | 7 | 0 | | 0.00% |
| Total Ingest Pipeline count | | 1001 | 1001 | 0 | | 0.00% |
+| Total Ingest Pipeline time | | 80 | 30 | -50 | ms | -62.50% |
| Total Ingest Pipeline failed | | 1001 | 1001 | 0 | | 0.00% |
| Min Throughput | bulk | 3034.45 | 6486.2 | 3451.75 | docs/s | +113.75% |
| Mean Throughput | bulk | 3034.45 | 6486.2 | 3451.75 | docs/s | +113.75% |
| Median Throughput | bulk | 3034.45 | 6486.2 | 3451.75 | docs/s | +113.75% |
| Max Throughput | bulk | 3034.45 | 6486.2 | 3451.75 | docs/s | +113.75% |
| 50th percentile latency | bulk | 15.2215 | 5.89986 | -9.32163 | ms | -61.24% |
| 90th percentile latency | bulk | 38.0702 | 16.4493 | -21.6209 | ms | -56.79% |
| 100th percentile latency | bulk | 55.3849 | 26.5535 | -28.8315 | ms | -52.06% |
| 50th percentile service time | bulk | 15.2215 | 5.89986 | -9.32163 | ms | -61.24% |
| 90th percentile service time | bulk | 38.0702 | 16.4493 | -21.6209 | ms | -56.79% |
| 100th percentile service time | bulk | 55.3849 | 26.5535 | -28.8315 | ms | -52.06% |
| error rate | bulk | 100 | 100 | 0 | % | 0.00% |

-------------------------------
[INFO] SUCCESS (took 0 seconds)
-------------------------------
```

Sample track
ingest-pipelines.zip
Closes #1365