Skip to content

Commit a8b3dd2

Browse files
committed
use globalcheckpoint as arg
1 parent 722c9d2 commit a8b3dd2

1 file changed

Lines changed: 28 additions & 31 deletions

File tree

core/src/test/java/org/elasticsearch/index/engine/KeepUntilGlobalCheckpointDeletionPolicyTests.java

Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -30,34 +30,28 @@
3030
import org.elasticsearch.index.seqno.SequenceNumbersService;
3131
import org.elasticsearch.index.store.Store;
3232
import org.elasticsearch.index.translog.Translog;
33-
import org.junit.Before;
3433

3534
import java.io.IOException;
3635
import java.nio.file.Path;
3736
import java.util.ArrayList;
3837
import java.util.List;
39-
import java.util.concurrent.atomic.AtomicInteger;
4038
import java.util.concurrent.atomic.AtomicLong;
39+
import java.util.function.LongSupplier;
4140

4241
import static org.hamcrest.Matchers.equalTo;
4342
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
4443
import static org.hamcrest.Matchers.hasSize;
4544

4645
public class KeepUntilGlobalCheckpointDeletionPolicyTests extends EngineTestCase {
47-
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
48-
49-
@Before
50-
public void resetCounters() throws Exception {
51-
globalCheckpoint.set(SequenceNumbers.UNASSIGNED_SEQ_NO);
52-
}
5346

5447
public void testUnassignedGlobalCheckpoint() throws IOException {
55-
Path indexPath = createTempDir();
56-
globalCheckpoint.set(SequenceNumbers.UNASSIGNED_SEQ_NO);
48+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
49+
final Path indexPath = createTempDir();
50+
5751
try (Store store = createStore()) {
5852
int initDocs = scaledRandomIntBetween(10, 1000);
5953
int initCommits = 1;
60-
try (InternalEngine engine = newEngine(store, indexPath)) {
54+
try (InternalEngine engine = newEngine(store, indexPath, globalCheckpoint::get)) {
6155
for (int i = 0; i < initDocs; i++) {
6256
addDoc(engine, Integer.toString(i));
6357
if (frequently()) {
@@ -71,7 +65,7 @@ public void testUnassignedGlobalCheckpoint() throws IOException {
7165
engine.flush(true, true);
7266
}
7367
assertThat(DirectoryReader.listCommits(store.directory()), hasSize(initCommits + 1));
74-
try (InternalEngine engine = newEngine(store, indexPath)) {
68+
try (InternalEngine engine = newEngine(store, indexPath, globalCheckpoint::get)) {
7569
engine.refresh("test");
7670
assertThat("Unassigned global checkpoint reserves all commits", DirectoryReader.listCommits(store.directory()),
7771
hasSize(initCommits + 1));
@@ -98,10 +92,12 @@ public void testUnassignedGlobalCheckpoint() throws IOException {
9892
}
9993

10094
public void testKeepUpGlobalCheckpoint() throws Exception {
101-
Path indexPath = createTempDir();
95+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
96+
final Path indexPath = createTempDir();
97+
10298
try (Store store = createStore()) {
10399
int initDocs = scaledRandomIntBetween(10, 1000);
104-
try (InternalEngine engine = newEngine(store, indexPath)) {
100+
try (InternalEngine engine = newEngine(store, indexPath, globalCheckpoint::get)) {
105101
for (int i = 0; i < initDocs; i++) {
106102
addDoc(engine, Integer.toString(i));
107103
globalCheckpoint.set(engine.seqNoService().getLocalCheckpoint());
@@ -112,7 +108,7 @@ public void testKeepUpGlobalCheckpoint() throws Exception {
112108
engine.flush(true, true);
113109
}
114110
assertThat(DirectoryReader.listCommits(store.directory()), hasSize(1));
115-
try (InternalEngine engine = newEngine(store, indexPath)) {
111+
try (InternalEngine engine = newEngine(store, indexPath, globalCheckpoint::get)) {
116112
assertThat("OnInit deletes unreferenced commits", DirectoryReader.listCommits(store.directory()), hasSize(1));
117113
int moreDocs = scaledRandomIntBetween(1, 100);
118114
for (int i = 0; i < moreDocs; i++) {
@@ -128,10 +124,12 @@ public void testKeepUpGlobalCheckpoint() throws Exception {
128124
}
129125

130126
public void testLaggingGlobalCheckpoint() throws Exception {
131-
Path indexPath = createTempDir();
127+
final AtomicLong globalCheckpoint = new AtomicLong(SequenceNumbers.UNASSIGNED_SEQ_NO);
128+
final Path indexPath = createTempDir();
129+
132130
try (Store store = createStore()) {
133131
int initDocs = scaledRandomIntBetween(100, 1000);
134-
try (InternalEngine engine = newEngine(store, indexPath)) {
132+
try (InternalEngine engine = newEngine(store, indexPath, globalCheckpoint::get)) {
135133
for (int i = 0; i < initDocs; i++) {
136134
addDoc(engine, Integer.toString(i));
137135
if (frequently()) {
@@ -144,15 +142,16 @@ public void testLaggingGlobalCheckpoint() throws Exception {
144142
engine.rollTranslogGeneration();
145143
}
146144
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
147-
assertThat((long) snapshot.totalOperations(), greaterThanOrEqualTo(requiredOperations(i + 1)));
145+
assertThat("Should keep translog operations up to the global checkpoint",
146+
(long) snapshot.totalOperations(), greaterThanOrEqualTo(i + 1 - Math.max(0, globalCheckpoint.get())));
148147
}
149148
}
150149
engine.flush(true, true);
151150
}
152-
assertThat("Reserved commits should be 1", reservedCommits(), hasSize(1));
151+
assertThat("Reserved commits should be 1", reservedCommits(globalCheckpoint.get()), hasSize(1));
153152

154-
try (InternalEngine engine = newEngine(store, indexPath)) {
155-
assertThat("Reserved commits should always be 1", reservedCommits(), hasSize(1));
153+
try (InternalEngine engine = newEngine(store, indexPath, globalCheckpoint::get)) {
154+
assertThat("Reserved commits should always be 1", reservedCommits(globalCheckpoint.get()), hasSize(1));
156155
int moreDocs = scaledRandomIntBetween(1, 100);
157156
for (int i = 0; i < moreDocs; i++) {
158157
addDoc(engine, Integer.toString(initDocs + i));
@@ -161,28 +160,26 @@ public void testLaggingGlobalCheckpoint() throws Exception {
161160
}
162161
if (frequently()) {
163162
engine.flush(true, true);
164-
assertThat("Reserved commits should be 1", reservedCommits(), hasSize(1));
163+
assertThat("Reserved commits should be 1", reservedCommits(globalCheckpoint.get()), hasSize(1));
165164
}
166165
if (rarely()) {
167166
engine.rollTranslogGeneration();
168167
}
169168
try (Translog.Snapshot snapshot = engine.getTranslog().newSnapshot()) {
170-
assertThat((long) snapshot.totalOperations(), greaterThanOrEqualTo(requiredOperations(initDocs + i + 1)));
169+
long requiredOps = initDocs + i + 1 - Math.max(0, globalCheckpoint.get());
170+
assertThat("Should keep translog operations up to the global checkpoint",
171+
(long) snapshot.totalOperations(), greaterThanOrEqualTo(requiredOps));
171172
}
172173
}
173174
}
174175
}
175176
}
176177

177-
long requiredOperations(int processedOps) {
178-
return processedOps - Math.max(0, globalCheckpoint.get());
179-
}
180-
181-
List<IndexCommit> reservedCommits() throws IOException {
178+
List<IndexCommit> reservedCommits(long currentGlobalCheckpoint) throws IOException {
182179
List<IndexCommit> reservedCommits = new ArrayList<>();
183180
List<IndexCommit> existingCommits = DirectoryReader.listCommits(store.directory());
184181
for (IndexCommit commit : existingCommits) {
185-
if (Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) <= globalCheckpoint.get()) {
182+
if (Long.parseLong(commit.getUserData().get(SequenceNumbers.LOCAL_CHECKPOINT_KEY)) <= currentGlobalCheckpoint) {
186183
reservedCommits.add(commit);
187184
}
188185
}
@@ -196,7 +193,7 @@ void addDoc(Engine engine, String id) throws IOException {
196193
engine.index(indexForDoc(doc));
197194
}
198195

199-
InternalEngine newEngine(Store store, Path indexPath) throws IOException {
196+
InternalEngine newEngine(Store store, Path indexPath, LongSupplier globalCheckpointSupplier) throws IOException {
200197
return createEngine(defaultSettings, store, indexPath, newMergePolicy(), null,
201198
(config, seqNoStats) -> new SequenceNumbersService(
202199
config.getShardId(),
@@ -207,7 +204,7 @@ InternalEngine newEngine(Store store, Path indexPath) throws IOException {
207204
seqNoStats.getGlobalCheckpoint()) {
208205
@Override
209206
public long getGlobalCheckpoint() {
210-
return globalCheckpoint.get();
207+
return globalCheckpointSupplier.getAsLong();
211208
}
212209
}
213210
);

0 commit comments

Comments
 (0)