Skip to content

Add Ingest Pipeline Telemetry device#1416

Merged
b-deam merged 12 commits intoelastic:masterfrom
b-deam:ip-telemetry-device
Jan 27, 2022
Merged

Add Ingest Pipeline Telemetry device#1416
b-deam merged 12 commits intoelastic:masterfrom
b-deam:ip-telemetry-device

Conversation

@b-deam
Copy link
Copy Markdown
Member

@b-deam b-deam commented Jan 14, 2022

With this commit we had a Ingest Pipeline stats telemetry device. This
telemetry device utilises the node stats API, but rather than sampling
these statistics throughout the benchmark (i.e. node stats telemetry
device) we collect an output at the beginning and end, and report only the
delta.

The metrics store contains metrics at the cluster, node, pipeline, and processor levels under rally-metrics*, and summarised cluster level results under rally-results*.

TODO:

  • Add documentation
Sample output
$ esrally race --track-path=rally/tracks/ingest-pipelines --telemetry ingest-pipeline-stats --kill-running-processes

    ____        ____
   / __ \____ _/ / /_  __
  / /_/ / __ `/ / / / / /
 / _, _/ /_/ / / / /_/ /
/_/ |_|\__,_/_/_/\__, /
                /____/

[INFO] Race id is [4121d7ec-aee4-405e-a596-84dfa03a89e8]
[INFO] Preparing for race ...
[INFO] Racing on track [ingest-pipelines], challenge [bulk-to-pipeline-with-1001-failures] and car ['defaults'] with version [8.0.0-SNAPSHOT].

Running put-pipeline                                                           [100% done]
Running bulk                                                                   [100% done]

------------------------------------------------------
    _______             __   _____
   / ____(_)___  ____ _/ /  / ___/_________  ________
  / /_  / / __ \/ __ `/ /   \__ \/ ___/ __ \/ ___/ _ \
 / __/ / / / / / /_/ / /   ___/ / /__/ /_/ / /  /  __/
/_/   /_/_/ /_/\__,_/_/   /____/\___/\____/_/   \___/
------------------------------------------------------
            
|                                                         Metric |   Task |       Value |   Unit |
|---------------------------------------------------------------:|-------:|------------:|-------:|
|                     Cumulative indexing time of primary shards |        |   0.0370333 |    min |
|             Min cumulative indexing time across primary shards |        |   0.0370333 |    min |
|          Median cumulative indexing time across primary shards |        |   0.0370333 |    min |
|             Max cumulative indexing time across primary shards |        |   0.0370333 |    min |
|            Cumulative indexing throttle time of primary shards |        |           0 |    min |
|    Min cumulative indexing throttle time across primary shards |        |           0 |    min |
| Median cumulative indexing throttle time across primary shards |        |           0 |    min |
|    Max cumulative indexing throttle time across primary shards |        |           0 |    min |
|                        Cumulative merge time of primary shards |        |           0 |    min |
|                       Cumulative merge count of primary shards |        |           0 |        |
|                Min cumulative merge time across primary shards |        |           0 |    min |
|             Median cumulative merge time across primary shards |        |           0 |    min |
|                Max cumulative merge time across primary shards |        |           0 |    min |
|               Cumulative merge throttle time of primary shards |        |           0 |    min |
|       Min cumulative merge throttle time across primary shards |        |           0 |    min |
|    Median cumulative merge throttle time across primary shards |        |           0 |    min |
|       Max cumulative merge throttle time across primary shards |        |           0 |    min |
|                      Cumulative refresh time of primary shards |        |   0.0272833 |    min |
|                     Cumulative refresh count of primary shards |        |          14 |        |
|              Min cumulative refresh time across primary shards |        |   0.0272833 |    min |
|           Median cumulative refresh time across primary shards |        |   0.0272833 |    min |
|              Max cumulative refresh time across primary shards |        |   0.0272833 |    min |
|                        Cumulative flush time of primary shards |        |   0.0305667 |    min |
|                       Cumulative flush count of primary shards |        |           3 |        |
|                Min cumulative flush time across primary shards |        |   0.0305667 |    min |
|             Median cumulative flush time across primary shards |        |   0.0305667 |    min |
|                Max cumulative flush time across primary shards |        |   0.0305667 |    min |
|                                        Total Young Gen GC time |        |       0.061 |      s |
|                                       Total Young Gen GC count |        |           1 |        |
|                                          Total Old Gen GC time |        |           0 |      s |
|                                         Total Old Gen GC count |        |           0 |        |
|                                                     Store size |        |   0.0395972 |     GB |
|                                                  Translog size |        | 5.12227e-08 |     GB |
|                                         Heap used for segments |        |           0 |     MB |
|                                       Heap used for doc values |        |           0 |     MB |
|                                            Heap used for terms |        |           0 |     MB |
|                                            Heap used for norms |        |           0 |     MB |
|                                           Heap used for points |        |           0 |     MB |
|                                    Heap used for stored fields |        |           0 |     MB |
|                                                  Segment count |        |           7 |        |
+|                                    Total Ingest Pipeline count |        |        1001 |        |
+|                                     Total Ingest Pipeline time |        |          80 |     ms |
+|                                   Total Ingest Pipeline failed |        |        1001 |        |
|                                                 Min Throughput |   bulk |     3034.45 | docs/s |
|                                                Mean Throughput |   bulk |     3034.45 | docs/s |
|                                              Median Throughput |   bulk |     3034.45 | docs/s |
|                                                 Max Throughput |   bulk |     3034.45 | docs/s |
|                                        50th percentile latency |   bulk |     15.2215 |     ms |
|                                        90th percentile latency |   bulk |     38.0702 |     ms |
|                                       100th percentile latency |   bulk |     55.3849 |     ms |
|                                   50th percentile service time |   bulk |     15.2215 |     ms |
|                                   90th percentile service time |   bulk |     38.0702 |     ms |
|                                  100th percentile service time |   bulk |     55.3849 |     ms |
|                                                     error rate |   bulk |         100 |      % |

[WARNING] Error rate is 100.0 for operation 'bulk'. Please check the logs.

---------------------------------
[INFO] SUCCESS (took 525 seconds)
---------------------------------
Sample comparison
$ esrally compare --baseline  4121d7ec-aee4-405e-a596-84dfa03a89e8 --contender c6548660-67b7-4263-9591-4c16c04756bb

    ____        ____
   / __ \____ _/ / /_  __
  / /_/ / __ `/ / / / / /
 / _, _/ /_/ / / / /_/ /
/_/ |_|\__,_/_/_/\__, /
                /____/


Comparing baseline
  Race ID: 4121d7ec-aee4-405e-a596-84dfa03a89e8
  Race timestamp: 2022-01-14 05:26:20
  Challenge: bulk-to-pipeline-with-1001-failures
  Car: defaults

with contender
  Race ID: c6548660-67b7-4263-9591-4c16c04756bb
  Race timestamp: 2022-01-14 05:36:28
  Challenge: bulk-to-pipeline-with-1001-failures
  Car: defaults

------------------------------------------------------
    _______             __   _____
   / ____(_)___  ____ _/ /  / ___/_________  ________
  / /_  / / __ \/ __ `/ /   \__ \/ ___/ __ \/ ___/ _ \
 / __/ / / / / / /_/ / /   ___/ / /__/ /_/ / /  /  __/
/_/   /_/_/ /_/\__,_/_/   /____/\___/\____/_/   \___/
------------------------------------------------------
            
|                                                        Metric |   Task |    Baseline |   Contender |     Diff |   Unit |   Diff % |
|--------------------------------------------------------------:|-------:|------------:|------------:|---------:|-------:|---------:|
|                    Cumulative indexing time of primary shards |        |   0.0370333 |   0.0254167 | -0.01162 |    min |  -31.37% |
|             Min cumulative indexing time across primary shard |        |   0.0370333 |   0.0254167 | -0.01162 |    min |  -31.37% |
|          Median cumulative indexing time across primary shard |        |   0.0370333 |   0.0254167 | -0.01162 |    min |  -31.37% |
|             Max cumulative indexing time across primary shard |        |   0.0370333 |   0.0254167 | -0.01162 |    min |  -31.37% |
|           Cumulative indexing throttle time of primary shards |        |           0 |           0 |        0 |    min |    0.00% |
|    Min cumulative indexing throttle time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
| Median cumulative indexing throttle time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
|    Max cumulative indexing throttle time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
|                       Cumulative merge time of primary shards |        |           0 |           0 |        0 |    min |    0.00% |
|                      Cumulative merge count of primary shards |        |           0 |           0 |        0 |        |    0.00% |
|                Min cumulative merge time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
|             Median cumulative merge time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
|                Max cumulative merge time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
|              Cumulative merge throttle time of primary shards |        |           0 |           0 |        0 |    min |    0.00% |
|       Min cumulative merge throttle time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
|    Median cumulative merge throttle time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
|       Max cumulative merge throttle time across primary shard |        |           0 |           0 |        0 |    min |    0.00% |
|                     Cumulative refresh time of primary shards |        |   0.0272833 |     0.02825 |  0.00097 |    min |   +3.54% |
|                    Cumulative refresh count of primary shards |        |          14 |          13 |       -1 |        |   -7.14% |
|              Min cumulative refresh time across primary shard |        |   0.0272833 |     0.02825 |  0.00097 |    min |   +3.54% |
|           Median cumulative refresh time across primary shard |        |   0.0272833 |     0.02825 |  0.00097 |    min |   +3.54% |
|              Max cumulative refresh time across primary shard |        |   0.0272833 |     0.02825 |  0.00097 |    min |   +3.54% |
|                       Cumulative flush time of primary shards |        |   0.0305667 |      0.0214 | -0.00917 |    min |  -29.99% |
|                      Cumulative flush count of primary shards |        |           3 |           3 |        0 |        |    0.00% |
|                Min cumulative flush time across primary shard |        |   0.0305667 |      0.0214 | -0.00917 |    min |  -29.99% |
|             Median cumulative flush time across primary shard |        |   0.0305667 |      0.0214 | -0.00917 |    min |  -29.99% |
|                Max cumulative flush time across primary shard |        |   0.0305667 |      0.0214 | -0.00917 |    min |  -29.99% |
|                                       Total Young Gen GC time |        |       0.061 |       0.022 |   -0.039 |      s |  -63.93% |
|                                      Total Young Gen GC count |        |           1 |           1 |        0 |        |    0.00% |
|                                         Total Old Gen GC time |        |           0 |           0 |        0 |      s |    0.00% |
|                                        Total Old Gen GC count |        |           0 |           0 |        0 |        |    0.00% |
|                                                    Store size |        |   0.0395972 |   0.0395972 |        0 |     GB |    0.00% |
|                                                 Translog size |        | 5.12227e-08 | 5.12227e-08 |        0 |     GB |    0.00% |
|                                        Heap used for segments |        |           0 |           0 |        0 |     MB |    0.00% |
|                                      Heap used for doc values |        |           0 |           0 |        0 |     MB |    0.00% |
|                                           Heap used for terms |        |           0 |           0 |        0 |     MB |    0.00% |
|                                           Heap used for norms |        |           0 |           0 |        0 |     MB |    0.00% |
|                                          Heap used for points |        |           0 |           0 |        0 |     MB |    0.00% |
|                                   Heap used for stored fields |        |           0 |           0 |        0 |     MB |    0.00% |
|                                                 Segment count |        |           7 |           7 |        0 |        |    0.00% |
|                                   Total Ingest Pipeline count |        |        1001 |        1001 |        0 |        |    0.00% |
+|                                    Total Ingest Pipeline time |        |          80 |          30 |      -50 |     ms |  -62.50% |
|                                  Total Ingest Pipeline failed |        |        1001 |        1001 |        0 |        |    0.00% |
|                                                Min Throughput |   bulk |     3034.45 |      6486.2 |  3451.75 | docs/s | +113.75% |
|                                               Mean Throughput |   bulk |     3034.45 |      6486.2 |  3451.75 | docs/s | +113.75% |
|                                             Median Throughput |   bulk |     3034.45 |      6486.2 |  3451.75 | docs/s | +113.75% |
|                                                Max Throughput |   bulk |     3034.45 |      6486.2 |  3451.75 | docs/s | +113.75% |
|                                       50th percentile latency |   bulk |     15.2215 |     5.89986 | -9.32163 |     ms |  -61.24% |
|                                       90th percentile latency |   bulk |     38.0702 |     16.4493 | -21.6209 |     ms |  -56.79% |
|                                      100th percentile latency |   bulk |     55.3849 |     26.5535 | -28.8315 |     ms |  -52.06% |
|                                  50th percentile service time |   bulk |     15.2215 |     5.89986 | -9.32163 |     ms |  -61.24% |
|                                  90th percentile service time |   bulk |     38.0702 |     16.4493 | -21.6209 |     ms |  -56.79% |
|                                 100th percentile service time |   bulk |     55.3849 |     26.5535 | -28.8315 |     ms |  -52.06% |
|                                                    error rate |   bulk |         100 |         100 |        0 |      % |    0.00% |


-------------------------------
[INFO] SUCCESS (took 0 seconds)
-------------------------------
Sample track

ingest-pipelines.zip

Closes #1365

@b-deam b-deam added enhancement Improves the status quo :Metrics How metrics are stored, calculated or aggregated :Usability Makes Rally easier to use :Telemetry Telemetry Devices that gather additional metrics highlight A substantial improvement that is worth mentioning separately in release notes labels Jan 14, 2022
@b-deam b-deam self-assigned this Jan 14, 2022
Copy link
Copy Markdown
Contributor

@DJRickyB DJRickyB left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it so far! I tried it against the ingest-heavy Logging solution track, and poked around the rally-metrics-* data to get a better sense of how the dimensions were being persisted in the metric data and it looks good.

I also tried out running the benchmark multiple times against a persistent cluster and was happy to confirm the final stat is differential as you intended, great work. Looking forward to the documentation, please re-request my review at that point

Comment thread esrally/telemetry.py Outdated
Comment thread esrally/telemetry.py Outdated
Comment thread esrally/telemetry.py Outdated
Comment thread esrally/telemetry.py Outdated
Comment thread tests/telemetry_test.py Outdated
Comment thread tests/telemetry_test.py Outdated
Comment thread tests/telemetry_test.py Outdated
Comment thread tests/telemetry_test.py Outdated
Comment thread esrally/telemetry.py Outdated
Comment thread esrally/telemetry.py Outdated
@b-deam
Copy link
Copy Markdown
Member Author

b-deam commented Jan 17, 2022

Thanks @DJRickyB - I addressed the comments, added docs, and also added cluster metadata to each of the outputs (node, pipeline, processor levels).

@b-deam b-deam requested a review from DJRickyB January 17, 2022 00:29
Copy link
Copy Markdown
Member

@pquentin pquentin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two small nits I've noticed on my first read

Comment thread esrally/reporter.py Outdated
Comment thread esrally/reporter.py Outdated
Comment thread esrally/telemetry.py
self.metrics_store.put_doc(doc, level=MetaInfoScope.cluster, meta_data=data_stream_metadata)


class IngestPipelineStats(InternalTelemetryDevice):
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed with @pquentin - it probably makes most sense for this to be enabled by default given there's 0 risk of results skew with the current approach of collecting before and after a benchmark.

@pquentin pquentin self-requested a review January 18, 2022 05:53
Copy link
Copy Markdown
Member

@pquentin pquentin left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for iterating. I tested it, it works great, thanks! I think we can merge after this last round.

Comment thread docs/telemetry.rst Outdated
Comment thread docs/telemetry.rst
Comment thread esrally/reporter.py Outdated
Comment thread esrally/telemetry.py Outdated
Comment thread esrally/telemetry.py Outdated
Comment thread esrally/telemetry.py
@b-deam b-deam requested a review from pquentin January 19, 2022 01:42
Comment thread esrally/reporter.py Outdated
def _report_ingest_pipeline_stats(self, stats):
return self._join(
self._line("Total Ingest Pipeline count", "", stats.ingest_pipeline_cluster_count, ""),
self._line("Total Ingest Pipeline time", "", stats.ingest_pipeline_cluster_time, "ms", convert.ms_to_seconds),
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
self._line("Total Ingest Pipeline time", "", stats.ingest_pipeline_cluster_time, "ms", convert.ms_to_seconds),
self._line("Total Ingest Pipeline time", "", stats.ingest_pipeline_cluster_time, "s", convert.ms_to_seconds),

Comment thread esrally/telemetry.py
summaries = {}
for cluster_name in self.specified_cluster_names:
try:
ingest_stats = self.clients[cluster_name].nodes.stats(metric="ingest")
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just an FYI @sethmlarson - ingest is not documented as a valid param in the Python client docs:

metric – Limit the information returned to the specified metrics Valid choices: _all, breaker, fs, http, indices, jvm, os, process, thread_pool, transport, discovery, indexing_pressure

I can submit a PR to fix this, unless there's a specific reason it is undocumented.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@b-deam
Copy link
Copy Markdown
Member Author

b-deam commented Jan 20, 2022

Funny, after all this testing I didn't actually catch that the current logic flow will never report metrics for pipelines created as part of the track. Because they don't exist at the beginning, we end up skipping them.. e.g.:

$ cat ~/.rally/logs/rally.log | grep -i ingest
2022-01-20 04:21:39,814 ActorAddr-(T|:36035)/PID:29921 esrally.telemetry INFO Gathering Ingest Pipeline stats at benchmark start
2022-01-20 05:18:35,694 ActorAddr-(T|:36035)/PID:29921 esrally.telemetry INFO Gathering Ingest Pipeline stats at benchmark end
2022-01-20 05:18:35,702 ActorAddr-(T|:36035)/PID:29921 esrally.telemetry WARNING Cannot determine Ingest Pipeline stats for logs-k8-app (pipeline was not defined at the of the benchmark).
2022-01-20 05:18:35,703 ActorAddr-(T|:36035)/PID:29921 esrally.telemetry WARNING Cannot determine Ingest Pipeline stats for logs-system.syslog-1.6.4 (pipeline was not defined at the of the benchmark).
2022-01-20 05:18:35,703 ActorAddr-(T|:36035)/PID:29921 esrally.telemetry WARNING Cannot determine Ingest Pipeline stats for logs-redis.slowlog-1.1.0 (pipeline was not defined at the of the benchmark).
[...]

Luckily this should be a relatively simple fix - I managed to get a quick local test working with:

$ git diff
diff --git a/esrally/telemetry.py b/esrally/telemetry.py
index 1f19bcd..05c6357 100644
--- a/esrally/telemetry.py
+++ b/esrally/telemetry.py
@@ -1450,21 +1450,12 @@ class IngestPipelineStats(InternalTelemetryDevice):
                     )
                     continue
                 for summary_name, stats in summaries.items():
-                    if summary_name not in self.start_stats[cluster_name][node_name]:
-                        self.logger.warning("'%s' is not an expected field for Ingest Pipeline stats (skipping)", summary_name)
-                        continue
                     if summary_name == "total":
                         # The top level "total" contains stats for the node as a whole,
                         # each node will have exactly one top level "total" key
                         self._record_node_level_pipeline_stats(stats, cluster_name, node_name)
                     elif summary_name == "pipelines":
                         for pipeline_name, pipeline in stats.items():
-                            if pipeline_name not in self.start_stats[cluster_name][node_name]["pipelines"]:
-                                self.logger.warning(
-                                    "Cannot determine Ingest Pipeline stats for %s (pipeline was not defined at the of the benchmark).",
-                                    pipeline_name,
-                                )
-                                continue
                             self._record_pipeline_level_processor_stats(pipeline, pipeline_name, cluster_name, node_name)
 
             self._record_cluster_level_pipeline_stats(cluster_name)
@@ -1500,15 +1491,13 @@ class IngestPipelineStats(InternalTelemetryDevice):
 
     def _record_pipeline_level_processor_stats(self, pipeline, pipeline_name, cluster_name, node_name):
         for processor_name, processor_stats in pipeline.items():
-            start_stats_processors = self.start_stats[cluster_name][node_name]["pipelines"][pipeline_name]
+            try:
+                start_stats_processors = self.start_stats[cluster_name][node_name]["pipelines"][pipeline_name]
+            except KeyError:
+                # Pipeline was likely created by the track itself
+                start_stats_processors = {}
+                start_stats_processors[processor_name] = {}
 
-            if processor_name not in start_stats_processors:
-                self.logger.warning(
-                    "Cannot determine Ingest Pipeline stats in %s for %s (processor was not defined at the start of the benchmark).",
-                    pipeline_name,
-                    processor_name,
-                )
-                continue
             # We have an individual processor obj, which contains the stats for each individual processor
             if processor_name != "total":

And to test (using the attached track):

$ curl -XDELETE '127.0.0.1:39200/_ingest/pipeline/*' && esrally race --track-path=~/perf/rally/tracks/ingest-pipelines --kill-running-processes

IMO this should also be an additional unit test.

@pquentin pquentin dismissed their stale review January 26, 2022 12:47

I added code to the PR and my review blocks merges

@pquentin
Copy link
Copy Markdown
Member

Thanks Brad, I integrated your change and added a unit test, as discussed offline. This finally fixed things on esbench where the first run does not have data. Since we have two dimensions here (pipeline/processor), it's a nice candidate for a Lens treemap. Here's an example:

image

We can clearly see which pipelines and which processors are slow!

@b-deam
Copy link
Copy Markdown
Member Author

b-deam commented Jan 26, 2022

@pquentin - nice! Thank you for your help.

Are we happy to merge this?

@pquentin
Copy link
Copy Markdown
Member

@b-deam If you're happy, I'm happy :D

@b-deam b-deam merged commit 8025076 into elastic:master Jan 27, 2022
b-deam added a commit that referenced this pull request Feb 3, 2022
#1416 introduced the Ingest Pipeline stats telemetry device,
but did not update our summary report docs to include the new metrics.

This commit adds the missing documentation.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement Improves the status quo highlight A substantial improvement that is worth mentioning separately in release notes :Metrics How metrics are stored, calculated or aggregated :Telemetry Telemetry Devices that gather additional metrics :Usability Makes Rally easier to use

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Measure overhead of ingest pipelines

4 participants