Skip to content

Commit 86ec4d0

Browse files
committed
Sync translog after global checkpoint sync
This commit causes the translog to be synced after every global checkpoint sync and removes syncing of the global checkpoint from the indexing path.
1 parent 19d4db0 commit 86ec4d0

6 files changed

Lines changed: 140 additions & 38 deletions

File tree

core/src/main/java/org/elasticsearch/index/engine/InternalEngine.java

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -167,9 +167,7 @@ public InternalEngine(EngineConfig engineConfig) throws EngineException {
167167
engineConfig.getIndexSettings(),
168168
seqNoStats.getMaxSeqNo(),
169169
seqNoStats.getLocalCheckpoint(),
170-
seqNoStats.getGlobalCheckpoint(),
171-
this::onGlobalCheckpointUpdate
172-
);
170+
seqNoStats.getGlobalCheckpoint());
173171
indexWriter = writer;
174172
translog = openTranslog(engineConfig, writer);
175173
assert translog.getGeneration() != null;
@@ -360,19 +358,6 @@ private static SeqNoStats loadSeqNoStats(final EngineConfig engineConfig, final
360358
return new SeqNoStats(maxSeqNo, localCheckpoint, globalCheckpoint);
361359
}
362360

363-
/**
364-
* Sync the translog after the global checkpoint is updated.
365-
*/
366-
void onGlobalCheckpointUpdate() {
367-
try (ReleasableLock ignored = readLock.acquire()) {
368-
ensureOpen();
369-
translog.sync();
370-
} catch (final IOException e) {
371-
maybeFailEngine("on global checkpoint update", e);
372-
throw new EngineException(shardId, "failed on global checkpoint update", e);
373-
}
374-
}
375-
376361
private SearcherManager createSearcherManager() throws EngineException {
377362
boolean success = false;
378363
SearcherManager searcherManager = null;

core/src/main/java/org/elasticsearch/index/seqno/GlobalCheckpointSyncAction.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import org.elasticsearch.transport.TransportService;
4242

4343
import java.io.IOException;
44+
import java.io.UncheckedIOException;
45+
import java.io.UnsupportedEncodingException;
4446

4547
public class GlobalCheckpointSyncAction extends TransportReplicationAction<GlobalCheckpointSyncAction.PrimaryRequest,
4648
GlobalCheckpointSyncAction.ReplicaRequest, ReplicationResponse> {
@@ -68,6 +70,7 @@ protected PrimaryResult shardOperationOnPrimary(PrimaryRequest request) {
6870
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
6971
IndexShard indexShard = indexService.getShard(request.shardId().id());
7072
long checkpoint = indexShard.getGlobalCheckpoint();
73+
syncTranslog(indexShard);
7174
return new PrimaryResult(new ReplicaRequest(request, checkpoint), new ReplicationResponse());
7275
}
7376

@@ -76,9 +79,18 @@ protected ReplicaResult shardOperationOnReplica(ReplicaRequest request) {
7679
IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex());
7780
IndexShard indexShard = indexService.getShard(request.shardId().id());
7881
indexShard.updateGlobalCheckpointOnReplica(request.checkpoint);
82+
syncTranslog(indexShard);
7983
return new ReplicaResult();
8084
}
8185

86+
private void syncTranslog(final IndexShard indexShard) {
87+
try {
88+
indexShard.getTranslog().sync();
89+
} catch (final IOException e) {
90+
throw new UncheckedIOException("failed to sync translog after updating global checkpoint for shard " + indexShard.shardId(), e);
91+
}
92+
}
93+
8294
public void updateCheckpointForShard(ShardId shardId) {
8395
execute(new PrimaryRequest(shardId), new ActionListener<ReplicationResponse>() {
8496
@Override
@@ -135,4 +147,5 @@ public long getCheckpoint() {
135147
return checkpoint;
136148
}
137149
}
150+
138151
}

core/src/main/java/org/elasticsearch/index/seqno/SequenceNumbersService.java

Lines changed: 11 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,6 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
3838

3939
final LocalCheckpointService localCheckpointService;
4040
final GlobalCheckpointService globalCheckpointService;
41-
private final Runnable onGlobalCheckpointUpdate;
4241

4342
/**
4443
* Initialize the sequence number service. The {@code maxSeqNo} should be set to the last sequence number assigned by this shard, or
@@ -51,19 +50,16 @@ public class SequenceNumbersService extends AbstractIndexShardComponent {
5150
* @param maxSeqNo the last sequence number assigned by this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
5251
* @param localCheckpoint the last known local checkpoint for this shard, or {@link SequenceNumbersService#NO_OPS_PERFORMED}
5352
* @param globalCheckpoint the last known global checkpoint for this shard, or {@link SequenceNumbersService#UNASSIGNED_SEQ_NO}
54-
* @param onGlobalCheckpointUpdate invoked when the global checkpoint is updated
5553
*/
5654
public SequenceNumbersService(
5755
final ShardId shardId,
5856
final IndexSettings indexSettings,
5957
final long maxSeqNo,
6058
final long localCheckpoint,
61-
final long globalCheckpoint,
62-
final Runnable onGlobalCheckpointUpdate) {
59+
final long globalCheckpoint) {
6360
super(shardId, indexSettings);
6461
localCheckpointService = new LocalCheckpointService(shardId, indexSettings, maxSeqNo, localCheckpoint);
6562
globalCheckpointService = new GlobalCheckpointService(shardId, indexSettings, globalCheckpoint);
66-
this.onGlobalCheckpointUpdate = onGlobalCheckpointUpdate;
6763
}
6864

6965
/**
@@ -123,12 +119,21 @@ public long getGlobalCheckpoint() {
123119
return globalCheckpointService.getCheckpoint();
124120
}
125121

122+
/**
123+
* Scans through the currently known local checkpoints and updates the global checkpoint accordingly.
124+
*
125+
* @return {@code true} if the checkpoint has been updated or if it cannot be updated since one of the local checkpoints
126+
* of one of the active allocations is not known.
127+
*/
128+
public boolean updateGlobalCheckpointOnPrimary() {
129+
return globalCheckpointService.updateCheckpointOnPrimary();
130+
}
131+
126132
/**
127133
* Updates the global checkpoint on a replica shard (after it has been updated by the primary).
128134
*/
129135
public void updateGlobalCheckpointOnReplica(long checkpoint) {
130136
globalCheckpointService.updateCheckpointOnReplica(checkpoint);
131-
onGlobalCheckpointUpdate.run();
132137
}
133138

134139
/**
@@ -142,17 +147,4 @@ public void updateAllocationIdsFromMaster(Set<String> activeAllocationIds, Set<S
142147
globalCheckpointService.updateAllocationIdsFromMaster(activeAllocationIds, initializingAllocationIds);
143148
}
144149

145-
/**
146-
* Scans through the currently known local checkpoint and updates the global checkpoint accordingly.
147-
*
148-
* @return true if the checkpoint has been updated or if it can not be updated since one of the local checkpoints
149-
* of one of the active allocations is not known.
150-
*/
151-
public boolean updateGlobalCheckpointOnPrimary() {
152-
final boolean maybeUpdateGlobalCheckpoint = globalCheckpointService.updateCheckpointOnPrimary();
153-
if (maybeUpdateGlobalCheckpoint) {
154-
onGlobalCheckpointUpdate.run();
155-
}
156-
return maybeUpdateGlobalCheckpoint;
157-
}
158150
}

core/src/main/java/org/elasticsearch/index/translog/Translog.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -558,7 +558,7 @@ public boolean ensureSynced(Location location) throws IOException {
558558

559559
/**
560560
* Ensures that all locations in the given stream have been synced / written to the underlying storage.
561-
* This method allows for internal optimization to minimize the amout of fsync operations if multiple
561+
* This method allows for internal optimization to minimize the number of fsync operations if multiple
562562
* locations must be synced.
563563
*
564564
* @return Returns <code>true</code> iff this call caused an actual sync operation otherwise <code>false</code>

core/src/test/java/org/elasticsearch/index/engine/InternalEngineTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -588,8 +588,7 @@ public SequenceNumbersService seqNoService() {
588588
this.config().getIndexSettings(),
589589
maxSeqNo.get(),
590590
localCheckpoint.get(),
591-
globalCheckpoint.get(),
592-
() -> {});
591+
globalCheckpoint.get());
593592
}
594593
};
595594
CommitStats stats1 = engine.commitStats();
@@ -1712,6 +1711,7 @@ public void testSeqNoAndCheckpoints() throws IOException {
17121711
assertThat(
17131712
Long.parseLong(initialEngine.commitStats().getUserData().get(InternalEngine.LOCAL_CHECKPOINT_KEY)),
17141713
equalTo(localCheckpoint));
1714+
initialEngine.getTranslog().sync(); // to guarantee the global checkpoint is written to the translog checkpoint
17151715
assertThat(
17161716
initialEngine.getTranslog().getLastSyncedGlobalCheckpoint(),
17171717
equalTo(globalCheckpoint));
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.elasticsearch.index.seqno;
19+
20+
import org.apache.lucene.util.IOUtils;
21+
import org.elasticsearch.action.support.ActionFilters;
22+
import org.elasticsearch.cluster.action.shard.ShardStateAction;
23+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
24+
import org.elasticsearch.cluster.service.ClusterService;
25+
import org.elasticsearch.common.settings.Settings;
26+
import org.elasticsearch.index.Index;
27+
import org.elasticsearch.index.IndexService;
28+
import org.elasticsearch.index.shard.IndexShard;
29+
import org.elasticsearch.index.shard.ShardId;
30+
import org.elasticsearch.index.translog.Translog;
31+
import org.elasticsearch.indices.IndicesService;
32+
import org.elasticsearch.test.ESTestCase;
33+
import org.elasticsearch.test.transport.CapturingTransport;
34+
import org.elasticsearch.threadpool.TestThreadPool;
35+
import org.elasticsearch.threadpool.ThreadPool;
36+
import org.elasticsearch.transport.Transport;
37+
import org.elasticsearch.transport.TransportService;
38+
39+
import java.io.IOException;
40+
import java.io.UncheckedIOException;
41+
import java.util.Collections;
42+
import java.util.HashSet;
43+
44+
import static org.elasticsearch.mock.orig.Mockito.when;
45+
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
46+
import static org.mockito.Mockito.mock;
47+
import static org.mockito.Mockito.verify;
48+
49+
public class GlobalCheckpointSyncActionTests extends ESTestCase {
50+
51+
private ThreadPool threadPool;
52+
private Transport transport;
53+
private ClusterService clusterService;
54+
private TransportService transportService;
55+
private ShardStateAction shardStateAction;
56+
57+
public void setUp() throws Exception {
58+
super.setUp();
59+
threadPool = new TestThreadPool(getClass().getName());
60+
transport = new CapturingTransport();
61+
clusterService = createClusterService(threadPool);
62+
transportService = new TransportService(clusterService.getSettings(), transport, threadPool,
63+
TransportService.NOOP_TRANSPORT_INTERCEPTOR);
64+
transportService.start();
65+
transportService.acceptIncomingRequests();
66+
shardStateAction = new ShardStateAction(Settings.EMPTY, clusterService, transportService, null, null, threadPool);
67+
}
68+
69+
public void tearDown() throws Exception {
70+
try {
71+
IOUtils.close(transportService, clusterService, transport);
72+
} finally {
73+
terminate(threadPool);
74+
}
75+
super.tearDown();
76+
}
77+
78+
public void testTranslogSyncAfterGlobalCheckpointSync() throws IOException {
79+
final IndicesService indicesService = mock(IndicesService.class);
80+
81+
final Index index = new Index("index", "uuid");
82+
final IndexService indexService = mock(IndexService.class);
83+
when(indicesService.indexServiceSafe(index)).thenReturn(indexService);
84+
85+
final int id = randomIntBetween(0, 4);
86+
final IndexShard indexShard = mock(IndexShard.class);
87+
when(indexService.getShard(id)).thenReturn(indexShard);
88+
89+
final Translog translog = mock(Translog.class);
90+
when(indexShard.getTranslog()).thenReturn(translog);
91+
92+
final GlobalCheckpointSyncAction action = new GlobalCheckpointSyncAction(
93+
Settings.EMPTY,
94+
transportService,
95+
clusterService,
96+
indicesService,
97+
threadPool,
98+
shardStateAction,
99+
new ActionFilters(Collections.emptySet()),
100+
new IndexNameExpressionResolver(Settings.EMPTY));
101+
final ShardId shardId = new ShardId(index, id);
102+
final GlobalCheckpointSyncAction.PrimaryRequest primaryRequest = new GlobalCheckpointSyncAction.PrimaryRequest(shardId);
103+
if (randomBoolean()) {
104+
action.shardOperationOnPrimary(primaryRequest);
105+
} else {
106+
action.shardOperationOnReplica(new GlobalCheckpointSyncAction.ReplicaRequest(primaryRequest, randomPositiveLong()));
107+
}
108+
109+
verify(translog).sync();
110+
}
111+
112+
}

0 commit comments

Comments
 (0)