Elasticsearch version (bin/elasticsearch --version): 7.9.0
JVM version (java -version): bundled 14.0.1
Description of the problem including expected versus actual behavior:
In an ongoing project, we are using continuous transforms with a date_histogram in the pivot to aggregate metrics from raw data over time. We also use ILM policies to roll over the target index and to remove old metrics data.
We noticed that, after the target index rolls over, the transform generates duplicate records for the same timestamp and the same values of the other pivot 'dimensions'. This behaviour is incorrect: no such duplicates should be generated.
Analysis of the logs and the source code shows that:
- The transform implementation lets date_histogram produce a bucket for the interval that intersects the right bound of the time range processed by the current checkpoint. Because source documents are filtered by that time range, the aggregates for this interval are incomplete. These 'incomplete' records are then inserted into the target index.
- The transform implementation also rounds the left bound of the processed time range down to the nearest date_histogram interval when computing aggregates. As a result, the 'incomplete' records produced by one checkpoint are overwritten by 'complete' ones generated by the next checkpoint, leading to multiple upserts per checkpoint.
- When an index rollover occurs between two checkpoints, these upserts become inserts into the newly created index (via the write alias), leading to data duplication.
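The mechanism above can be illustrated with a small toy model. This is a minimal sketch under simplifying assumptions, not the actual TransformIndexer logic: timestamps are integer seconds, the date_histogram uses a 1m fixed interval, each checkpoint processes [previous checkpoint, current checkpoint) with the left bound rounded down to a bucket start, the document ID is derived from the bucket key (so rewriting a key is an upsert, but only within the same physical index), and a rollover simply switches writes to a new, empty index. All function names are hypothetical.

```python
from collections import Counter

INTERVAL = 60  # assumed 1m fixed_interval, in seconds


def floor_to_interval(ts, interval=INTERVAL):
    """Round a timestamp down to the start of its date_histogram bucket."""
    return ts - ts % interval


def run_checkpoint(events, start, end, interval=INTERVAL):
    """Aggregate events into bucket key -> doc count for one checkpoint.

    The left bound is rounded down to a bucket boundary; the right bound is
    used as-is, so the last bucket may be only partially filled.
    """
    lo = floor_to_interval(start, interval)
    return Counter(floor_to_interval(ts, interval) for ts in events if lo <= ts < end)


def simulate(events, checkpoints, rollover_after=None):
    """Write each checkpoint's buckets to the current write index.

    Returns a list of physical indices (dicts of bucket key -> count).
    Writing an existing key overwrites it (upsert), but only inside the
    index the write alias currently points to.
    """
    indices = [{}]
    for i in range(1, len(checkpoints)):
        buckets = run_checkpoint(events, checkpoints[i - 1], checkpoints[i])
        indices[-1].update(buckets)  # upsert into the current write index
        if i == rollover_after:
            indices.append({})  # rollover: the write alias moves to a new index
    return indices


def duplicated_buckets(indices):
    """Bucket keys that exist in more than one physical index."""
    seen = Counter(key for index in indices for key in index)
    return sorted(key for key, n in seen.items() if n > 1)


# One event every 10 s for 3 minutes, checkpoints at 0 s, 90 s and 180 s.
events = list(range(0, 180, 10))
print(simulate(events, [0, 90, 180]))                    # [{0: 6, 60: 6, 120: 6}]
print(simulate(events, [0, 90, 180], rollover_after=1))  # [{0: 6, 60: 3}, {60: 6, 120: 6}]
```

Without a rollover, the partial bucket 60 (count 3) is silently overwritten by the complete one (count 6). With a rollover after the first checkpoint, both copies survive in different indices, and the copy in the newer index has the larger count.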
Steps to reproduce:
- Set up the Elasticsearch artifacts as described in setup_artifacts.txt.
- Extract the provided data_generator.zip and run the data generation script (requires Python 3 and elasticsearch-py):
  python3 data_generator.py
- Create the index patterns raw_data-* and agg_data-* in Kibana for easy visualization. Switch to the Discover view and select the agg_data-* index pattern.
- Wait for the index rollover and the subsequent transform execution / checkpoint. Notice a significant increase in the number of documents in one 1m interval near the rollover timestamp (the interval will appear shifted into the past because of the transform frequency and delay).
- Explore the 1m interval with the increased document count. Notice that nearly all {@timestamp, x, y} combinations occur twice in this interval, with different _index values. Notice also that the document created in the newer index always has a count greater than or equal to that of its duplicate in the older index, due to the upserts mentioned in the problem description.
- For more in-depth analysis, enable trace logging for org.elasticsearch.xpack.transform. Capture and examine the logs of transform execution around the index rollover. Notice how the generated queries first produce 'incomplete' records and then attempt to overwrite them with complete data during the next execution. Cross-examine the logs with the source code.
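The duplicate check from the step that explores the 1m interval can also be done programmatically instead of in Discover. A minimal sketch, assuming the hits come from the standard hits.hits array of a search against agg_data-* restricted to that interval, that the aggregated documents have the fields @timestamp, x, y and count, and that the rolled-over index names sort chronologically (e.g. a zero-padded -000001 suffix). The function name and the sample hits below are made up for illustration, not data from the reproduction.

```python
from collections import defaultdict


def find_cross_index_duplicates(hits):
    """Group search hits by (@timestamp, x, y) and keep keys found in >1 index.

    hits: the hits.hits array of a search response; each hit carries _index
    and _source. Returns {key: [(index, count), ...]} sorted by index name.
    """
    by_key = defaultdict(list)
    for hit in hits:
        src = hit["_source"]
        key = (src["@timestamp"], src["x"], src["y"])
        by_key[key].append((hit["_index"], src.get("count")))
    return {
        key: sorted(copies, key=lambda c: c[0])
        for key, copies in by_key.items()
        if len({index for index, _ in copies}) > 1
    }


# Made-up sample hits: one key duplicated across two indices, one key not.
hits = [
    {"_index": "agg_data-000001",
     "_source": {"@timestamp": "2020-09-01T10:01:00Z", "x": 1, "y": "a", "count": 3}},
    {"_index": "agg_data-000002",
     "_source": {"@timestamp": "2020-09-01T10:01:00Z", "x": 1, "y": "a", "count": 6}},
    {"_index": "agg_data-000002",
     "_source": {"@timestamp": "2020-09-01T10:02:00Z", "x": 1, "y": "a", "count": 6}},
]
for key, copies in find_cross_index_duplicates(hits).items():
    print(key, copies)
```

For each duplicated key, the last entry in the list belongs to the newest index, so the "newer count is greater than or equal to older count" observation can be checked by comparing the count values in order.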
Proposed solution:
My approach would be to eliminate 'incomplete' records from the process entirely. This should solve the duplication issue and also prevent the upserts that currently occur on every checkpoint. One way to achieve this is to ensure that the right bound of the time range processed within a checkpoint is rounded down to the nearest date_histogram interval.
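The effect of rounding the right bound down can be checked with a toy model. This is a minimal sketch, assuming integer-second timestamps, a 1m fixed interval, and checkpoints that process [previous checkpoint, current checkpoint) with the left bound rounded down as today; the function names are hypothetical. It counts how often each bucket key is written across checkpoints: once the right bounds are aligned, the left-bound rounding becomes a no-op and every bucket is written exactly once, so there is nothing left to upsert (and nothing to duplicate after a rollover).

```python
from collections import Counter

INTERVAL = 60  # assumed 1m fixed_interval, in seconds


def floor_to_interval(ts, interval=INTERVAL):
    """Round a timestamp down to the start of its date_histogram bucket."""
    return ts - ts % interval


def aligned_checkpoints(raw_checkpoints, interval=INTERVAL):
    """Round every checkpoint (right bound) down to a bucket boundary,
    dropping checkpoints that would not advance past the previous one."""
    aligned = []
    for cp in raw_checkpoints:
        cp = floor_to_interval(cp, interval)
        if not aligned or cp > aligned[-1]:
            aligned.append(cp)
    return aligned


def writes_per_bucket(events, checkpoints, interval=INTERVAL):
    """Count how many times each bucket key is written across checkpoints."""
    writes = Counter()
    for start, end in zip(checkpoints, checkpoints[1:]):
        lo = floor_to_interval(start, interval)  # left bound still rounded down
        keys = {floor_to_interval(ts, interval) for ts in events if lo <= ts < end}
        writes.update(keys)
    return writes


events = list(range(0, 180, 10))  # one event every 10 s for 3 minutes
raw = [0, 90, 180]
print(writes_per_bucket(events, raw))                       # bucket 60 is written twice
print(writes_per_bucket(events, aligned_checkpoints(raw)))  # every bucket is written once
```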
As an experiment, I have implemented a custom CheckpointProvider that produces checkpoints aligned to a given interval. I have modified TransformService so that this IntervalBasedCheckpointProvider is instantiated whenever a date_histogram is present in the pivot definition, using the histogram's interval. The current simplified implementation supports only fixed intervals.
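The core decision such an interval-aligned provider has to make can be sketched as a single function. This is not the IntervalBasedCheckpointProvider code, which is not included here; it is a hypothetical Python illustration assuming epoch-millisecond timestamps, a delay playing the role of the transform's sync time delay, and a fixed interval expressed in milliseconds. Calendar intervals such as 1M have variable length and would need calendar-aware rounding, which is why only fixed intervals are handled.

```python
from typing import Optional


def next_aligned_checkpoint(now_ms: int, delay_ms: int, interval_ms: int,
                            last_checkpoint_ms: Optional[int]) -> Optional[int]:
    """Return the upper bound of the next checkpoint, aligned to a fixed
    interval, or None if no new complete bucket is available yet.

    Documents newer than now_ms - delay_ms are not considered yet. Only fixed
    intervals (a constant number of milliseconds) are supported.
    """
    if interval_ms <= 0:
        raise ValueError("interval_ms must be positive")
    upper = now_ms - delay_ms
    upper -= upper % interval_ms  # round down to a bucket boundary
    if last_checkpoint_ms is not None and upper <= last_checkpoint_ms:
        return None  # no complete bucket since the last checkpoint
    return upper


# 1m interval, 5 s delay.
print(next_aligned_checkpoint(125_000, 5_000, 60_000, None))     # 120000
print(next_aligned_checkpoint(170_000, 5_000, 60_000, 120_000))  # None
print(next_aligned_checkpoint(190_000, 5_000, 60_000, 120_000))  # 180000
```

Returning None instead of an unaligned bound means a transform run that fires mid-interval simply waits until the current bucket is complete, rather than writing a partial aggregate that would later need to be overwritten.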
Initial results are encouraging: no duplicates are generated when running the reproduction scenario, and the approach also seems to work correctly for our project's metrics aggregation transforms. However, adding a new CheckpointProvider seems somewhat excessive, or not entirely correct. Maybe fine-grained modifications to the query-building logic of TransformIndexer and related classes would be a better idea?
Since my experience with the Elasticsearch source code is limited, any feedback or suggestions would be greatly appreciated.