1111import org .elasticsearch .action .admin .indices .create .CreateIndexResponse ;
1212import org .elasticsearch .client .Request ;
1313import org .elasticsearch .client .Response ;
14- import org .elasticsearch .client .ResponseException ;
1514import org .elasticsearch .cluster .metadata .IndexMetadata ;
1615import org .elasticsearch .common .settings .Settings ;
1716import org .elasticsearch .index .IndexFeatures ;
1817import org .elasticsearch .index .IndexMode ;
1918import org .elasticsearch .index .IndexSettings ;
20- import org .elasticsearch .index .IndexVersions ;
2119import org .elasticsearch .test .rest .ESRestTestCase ;
2220import org .elasticsearch .test .rest .ObjectPath ;
2321import org .hamcrest .Matchers ;
@@ -37,47 +35,58 @@ public class TSDBSyntheticIdUpgradeIT extends AbstractLogsdbRollingUpgradeTestCa
3735 public void testRollingUpgrade () throws IOException {
3836 int numNodes = getCluster ().getNumNodes ();
3937 boolean isServerless = isServerless ();
38+ boolean oldClusterHasSyntheticId = oldClusterHasFeature (IndexFeatures .TIME_SERIES_SYNTHETIC_ID );
4039
41- if (oldClusterHasFeature (IndexFeatures .TIME_SERIES_SYNTHETIC_ID )) {
42- // Should be able to create synthetic id index throughout the rolling upgrade
43- String index_0 = indexName (0 );
44- assertIndexCanBeCreated (index_0 );
45- assertCanAddDocuments (index_0 );
46- assertIndexRead (index_0 , isServerless , 1 );
47- clusterRollingUpgrade (i -> {
48- int nextIndexId = i + 1 ;
49- String indexName = indexName (nextIndexId );
50- assertIndexCanBeCreated (indexName );
51- for (int j = 0 ; j <= nextIndexId ; j ++) {
52- assertCanAddDocuments (indexName (j ));
53- assertIndexRead (indexName (j ), isServerless , nextIndexId + 1 - j );
54- }
55- });
56- } else {
57- // Cluster supports synthetic id index after all nodes have been upgraded, not before
58- assertNoWriteIndex (indexName (0 ));
59- clusterRollingUpgrade (i -> {
60- int nextIndexId = i + 1 ;
61- if (nextIndexId == numNodes ) {
62- // Last node has been upgraded, we should be able to write
63- String indexName = indexName (nextIndexId );
64- assertIndexCanBeCreated (indexName );
65- assertCanAddDocuments (indexName );
66- assertIndexRead (indexName (nextIndexId ), isServerless , 1 );
67- } else {
68- assertNoWriteIndex (indexName (nextIndexId ));
69- }
70- });
71- }
40+ // This test upgrade all nodes in the cluster one by one,
41+ // and create one new index in-between every upgrade,
42+ // before upgrades start and after upgrade has finished.
43+ // In every gap we write to all existing indices and test
44+ // that we read the expected number of documents from them.
45+ // The indices will use synthetic_id setting if the cluster
46+ // is expected to have support for it at the time of index
47+ // creation, otherwise it will not.
48+ // If old cluster has support, then cluster will have support
49+ // for it all through the upgrade.
50+ // If old cluster doesn't have support, then cluster will
51+ // only have support after the upgrade is finished, the
52+ // last iteration in clusterRollingUpgrade.
53+
54+ String indexZero = indexName (0 );
55+ assertIndexCanBeCreated (indexZero , oldClusterHasSyntheticId );
56+ assertCanAddDocuments (indexZero );
57+ assertIndexRead (indexZero , isServerless , 1 , oldClusterHasSyntheticId );
58+
59+ clusterRollingUpgrade (i -> {
60+ int nextIndexId = i + 1 ;
61+ String indexName = indexName (nextIndexId );
62+ boolean allNodesUpgraded = nextIndexId == numNodes ;
63+ assertIndexCanBeCreated (indexName , oldClusterHasSyntheticId || allNodesUpgraded );
64+
65+ // For all indices we have created so far
66+ for (int j = 0 ; j <= nextIndexId ; j ++) {
67+ assertCanAddDocuments (indexName (j ));
68+ int expectedNbrOfBatchesInIndex = nextIndexId + 1 - j ;
69+ boolean lastIndex = j == nextIndexId && allNodesUpgraded ;
70+ assertIndexRead (indexName (j ), isServerless , expectedNbrOfBatchesInIndex , oldClusterHasSyntheticId || lastIndex );
71+ }
72+ });
7273 }
7374
74- private static void assertIndexRead (String indexName , boolean isServerless , int nbrOfBatches ) throws IOException {
75+ private static void assertIndexRead (String indexName , boolean isServerless , int nbrOfBatches , boolean expectSyntheticId )
76+ throws IOException {
7577 assertTrue ("Expected index [" + indexName + "] to exist, but did not" , indexExists (indexName ));
7678 Map <String , Object > indexSettingsAsMap = getIndexSettingsAsMap (indexName );
77- assertThat (indexSettingsAsMap .get (IndexSettings .SYNTHETIC_ID .getKey ()), Matchers .equalTo ("true" ));
79+ assertThat (
80+ Boolean .parseBoolean ((String ) indexSettingsAsMap .get (IndexSettings .SYNTHETIC_ID .getKey ())),
81+ Matchers .equalTo (expectSyntheticId )
82+ );
7883 assertDocCount (client (), indexName , (long ) nbrOfBatches * DOC_COUNT );
7984 if (!isServerless ) {
80- assertThat (invertedIndexSize (indexName ), Matchers .equalTo (0 ));
85+ if (expectSyntheticId ) {
86+ assertThat (invertedIndexSize (indexName ), Matchers .equalTo (0 ));
87+ } else {
88+ assertThat (invertedIndexSize (indexName ), Matchers .greaterThan (0 ));
89+ }
8190 }
8291 }
8392
@@ -88,11 +97,12 @@ private static int invertedIndexSize(String indexName) throws IOException {
8897 return objectPath .evaluate (indexName + ".all_fields.inverted_index.total_in_bytes" );
8998 }
9099
91- private void assertIndexCanBeCreated (String indexName ) throws IOException {
100+ private void assertIndexCanBeCreated (String indexName , boolean useSyntheticId ) throws IOException {
92101 logClusterStateBeforeCreate (indexName );
102+ logger .info ("--> Create index {} with synthetic id {}" , indexName , useSyntheticId );
93103 CreateIndexResponse response = null ;
94104 try {
95- response = createSyntheticIdIndex (indexName );
105+ response = createSyntheticIdIndex (indexName , useSyntheticId );
96106 logger .info (
97107 "Create index [{}] response: acknowledged={}, shards_acknowledged={}" ,
98108 indexName ,
@@ -227,7 +237,7 @@ private record ShardSlot(int shardId, boolean primary) {}
227237 /**
228238 * Log master's pending cluster tasks (e.g. cluster state updates). A backlog can delay shard allocation.
229239 */
230- private void logPendingClusterTasks (String indexName , String when ) throws IOException {
240+ private void logPendingClusterTasks (String indexName , String when ) {
231241 try {
232242 Response response = client ().performRequest (new Request ("GET" , "_cluster/pending_tasks" ));
233243 logger .info ("Index [{}] pending cluster tasks {}: {}" , indexName , when , entityAsMap (response ));
@@ -256,29 +266,14 @@ private static void addDocument(StringJoiner joiner, Instant timestamp) {
256266 """ , timestamp , randomByte ()));
257267 }
258268
259- private static void assertNoWriteIndex (String indexName ) throws IOException {
260- String setting = IndexSettings .SYNTHETIC_ID .getKey ();
261- String unknownSetting = "unknown setting [" + setting + "]" ;
262- String versionTooLow = String .format (
263- Locale .ROOT ,
264- "The setting [%s] is only permitted for indexVersion [%s] or later. Current indexVersion:" ,
265- setting ,
266- IndexVersions .TIME_SERIES_USE_SYNTHETIC_ID_94
267- );
268-
269- ResponseException e = assertThrows (ResponseException .class , () -> createSyntheticIdIndex (indexName ));
270- String reason = ObjectPath .createFromResponse (e .getResponse ()).evaluate ("error.reason" );
271-
272- assertThat (reason , Matchers .either (Matchers .containsString (unknownSetting )).or (Matchers .containsString (versionTooLow )));
273- assertThat (e .getMessage (), Matchers .containsString ("illegal_argument_exception" ));
274- }
275-
276- private static CreateIndexResponse createSyntheticIdIndex (String indexName ) throws IOException {
277- Settings settings = Settings .builder ()
278- .put (IndexSettings .SYNTHETIC_ID .getKey (), true )
269+ private static CreateIndexResponse createSyntheticIdIndex (String indexName , boolean useSyntheticId ) throws IOException {
270+ Settings .Builder settingsBuilder = Settings .builder ()
279271 .put (IndexSettings .MODE .getKey (), IndexMode .TIME_SERIES )
280- .put (IndexMetadata .INDEX_ROUTING_PATH .getKey (), "hostname" )
281- .build ();
272+ .put (IndexMetadata .INDEX_ROUTING_PATH .getKey (), "hostname" );
273+ if (useSyntheticId ) {
274+ settingsBuilder .put (IndexSettings .SYNTHETIC_ID .getKey (), true );
275+ }
276+ Settings settings = settingsBuilder .build ();
282277 final var mapping = """
283278 {
284279 "properties": {
0 commit comments