3030import org .elasticsearch .index .seqno .SequenceNumbersService ;
3131import org .elasticsearch .index .store .Store ;
3232import org .elasticsearch .index .translog .Translog ;
33- import org .junit .Before ;
3433
3534import java .io .IOException ;
3635import java .nio .file .Path ;
3736import java .util .ArrayList ;
3837import java .util .List ;
39- import java .util .concurrent .atomic .AtomicInteger ;
4038import java .util .concurrent .atomic .AtomicLong ;
39+ import java .util .function .LongSupplier ;
4140
4241import static org .hamcrest .Matchers .equalTo ;
4342import static org .hamcrest .Matchers .greaterThanOrEqualTo ;
4443import static org .hamcrest .Matchers .hasSize ;
4544
4645public class KeepUntilGlobalCheckpointDeletionPolicyTests extends EngineTestCase {
47- final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .UNASSIGNED_SEQ_NO );
48-
49- @ Before
50- public void resetCounters () throws Exception {
51- globalCheckpoint .set (SequenceNumbers .UNASSIGNED_SEQ_NO );
52- }
5346
5447 public void testUnassignedGlobalCheckpoint () throws IOException {
55- Path indexPath = createTempDir ();
56- globalCheckpoint .set (SequenceNumbers .UNASSIGNED_SEQ_NO );
48+ final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .UNASSIGNED_SEQ_NO );
49+ final Path indexPath = createTempDir ();
50+
5751 try (Store store = createStore ()) {
5852 int initDocs = scaledRandomIntBetween (10 , 1000 );
5953 int initCommits = 1 ;
60- try (InternalEngine engine = newEngine (store , indexPath )) {
54+ try (InternalEngine engine = newEngine (store , indexPath , globalCheckpoint :: get )) {
6155 for (int i = 0 ; i < initDocs ; i ++) {
6256 addDoc (engine , Integer .toString (i ));
6357 if (frequently ()) {
@@ -71,7 +65,7 @@ public void testUnassignedGlobalCheckpoint() throws IOException {
7165 engine .flush (true , true );
7266 }
7367 assertThat (DirectoryReader .listCommits (store .directory ()), hasSize (initCommits + 1 ));
74- try (InternalEngine engine = newEngine (store , indexPath )) {
68+ try (InternalEngine engine = newEngine (store , indexPath , globalCheckpoint :: get )) {
7569 engine .refresh ("test" );
7670 assertThat ("Unassigned global checkpoint reserves all commits" , DirectoryReader .listCommits (store .directory ()),
7771 hasSize (initCommits + 1 ));
@@ -98,10 +92,12 @@ public void testUnassignedGlobalCheckpoint() throws IOException {
9892 }
9993
10094 public void testKeepUpGlobalCheckpoint () throws Exception {
101- Path indexPath = createTempDir ();
95+ final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .UNASSIGNED_SEQ_NO );
96+ final Path indexPath = createTempDir ();
97+
10298 try (Store store = createStore ()) {
10399 int initDocs = scaledRandomIntBetween (10 , 1000 );
104- try (InternalEngine engine = newEngine (store , indexPath )) {
100+ try (InternalEngine engine = newEngine (store , indexPath , globalCheckpoint :: get )) {
105101 for (int i = 0 ; i < initDocs ; i ++) {
106102 addDoc (engine , Integer .toString (i ));
107103 globalCheckpoint .set (engine .seqNoService ().getLocalCheckpoint ());
@@ -112,7 +108,7 @@ public void testKeepUpGlobalCheckpoint() throws Exception {
112108 engine .flush (true , true );
113109 }
114110 assertThat (DirectoryReader .listCommits (store .directory ()), hasSize (1 ));
115- try (InternalEngine engine = newEngine (store , indexPath )) {
111+ try (InternalEngine engine = newEngine (store , indexPath , globalCheckpoint :: get )) {
116112 assertThat ("OnInit deletes unreferenced commits" , DirectoryReader .listCommits (store .directory ()), hasSize (1 ));
117113 int moreDocs = scaledRandomIntBetween (1 , 100 );
118114 for (int i = 0 ; i < moreDocs ; i ++) {
@@ -128,10 +124,12 @@ public void testKeepUpGlobalCheckpoint() throws Exception {
128124 }
129125
130126 public void testLaggingGlobalCheckpoint () throws Exception {
131- Path indexPath = createTempDir ();
127+ final AtomicLong globalCheckpoint = new AtomicLong (SequenceNumbers .UNASSIGNED_SEQ_NO );
128+ final Path indexPath = createTempDir ();
129+
132130 try (Store store = createStore ()) {
133131 int initDocs = scaledRandomIntBetween (100 , 1000 );
134- try (InternalEngine engine = newEngine (store , indexPath )) {
132+ try (InternalEngine engine = newEngine (store , indexPath , globalCheckpoint :: get )) {
135133 for (int i = 0 ; i < initDocs ; i ++) {
136134 addDoc (engine , Integer .toString (i ));
137135 if (frequently ()) {
@@ -144,15 +142,16 @@ public void testLaggingGlobalCheckpoint() throws Exception {
144142 engine .rollTranslogGeneration ();
145143 }
146144 try (Translog .Snapshot snapshot = engine .getTranslog ().newSnapshot ()) {
147- assertThat ((long ) snapshot .totalOperations (), greaterThanOrEqualTo (requiredOperations (i + 1 )));
145+ assertThat ("Should keep translog operations up to the global checkpoint" ,
146+ (long ) snapshot .totalOperations (), greaterThanOrEqualTo (i + 1 - Math .max (0 , globalCheckpoint .get ())));
148147 }
149148 }
150149 engine .flush (true , true );
151150 }
152- assertThat ("Reserved commits should be 1" , reservedCommits (), hasSize (1 ));
151+ assertThat ("Reserved commits should be 1" , reservedCommits (globalCheckpoint . get () ), hasSize (1 ));
153152
154- try (InternalEngine engine = newEngine (store , indexPath )) {
155- assertThat ("Reserved commits should always be 1" , reservedCommits (), hasSize (1 ));
153+ try (InternalEngine engine = newEngine (store , indexPath , globalCheckpoint :: get )) {
154+ assertThat ("Reserved commits should always be 1" , reservedCommits (globalCheckpoint . get () ), hasSize (1 ));
156155 int moreDocs = scaledRandomIntBetween (1 , 100 );
157156 for (int i = 0 ; i < moreDocs ; i ++) {
158157 addDoc (engine , Integer .toString (initDocs + i ));
@@ -161,28 +160,26 @@ public void testLaggingGlobalCheckpoint() throws Exception {
161160 }
162161 if (frequently ()) {
163162 engine .flush (true , true );
164- assertThat ("Reserved commits should be 1" , reservedCommits (), hasSize (1 ));
163+ assertThat ("Reserved commits should be 1" , reservedCommits (globalCheckpoint . get () ), hasSize (1 ));
165164 }
166165 if (rarely ()) {
167166 engine .rollTranslogGeneration ();
168167 }
169168 try (Translog .Snapshot snapshot = engine .getTranslog ().newSnapshot ()) {
170- assertThat ((long ) snapshot .totalOperations (), greaterThanOrEqualTo (requiredOperations (initDocs + i + 1 )));
169+ long requiredOps = initDocs + i + 1 - Math .max (0 , globalCheckpoint .get ());
170+ assertThat ("Should keep translog operations up to the global checkpoint" ,
171+ (long ) snapshot .totalOperations (), greaterThanOrEqualTo (requiredOps ));
171172 }
172173 }
173174 }
174175 }
175176 }
176177
177- long requiredOperations (int processedOps ) {
178- return processedOps - Math .max (0 , globalCheckpoint .get ());
179- }
180-
181- List <IndexCommit > reservedCommits () throws IOException {
178+ List <IndexCommit > reservedCommits (long currentGlobalCheckpoint ) throws IOException {
182179 List <IndexCommit > reservedCommits = new ArrayList <>();
183180 List <IndexCommit > existingCommits = DirectoryReader .listCommits (store .directory ());
184181 for (IndexCommit commit : existingCommits ) {
185- if (Long .parseLong (commit .getUserData ().get (SequenceNumbers .LOCAL_CHECKPOINT_KEY )) <= globalCheckpoint . get () ) {
182+ if (Long .parseLong (commit .getUserData ().get (SequenceNumbers .LOCAL_CHECKPOINT_KEY )) <= currentGlobalCheckpoint ) {
186183 reservedCommits .add (commit );
187184 }
188185 }
@@ -196,7 +193,7 @@ void addDoc(Engine engine, String id) throws IOException {
196193 engine .index (indexForDoc (doc ));
197194 }
198195
199- InternalEngine newEngine (Store store , Path indexPath ) throws IOException {
196+ InternalEngine newEngine (Store store , Path indexPath , LongSupplier globalCheckpointSupplier ) throws IOException {
200197 return createEngine (defaultSettings , store , indexPath , newMergePolicy (), null ,
201198 (config , seqNoStats ) -> new SequenceNumbersService (
202199 config .getShardId (),
@@ -207,7 +204,7 @@ InternalEngine newEngine(Store store, Path indexPath) throws IOException {
207204 seqNoStats .getGlobalCheckpoint ()) {
208205 @ Override
209206 public long getGlobalCheckpoint () {
210- return globalCheckpoint . get ();
207+ return globalCheckpointSupplier . getAsLong ();
211208 }
212209 }
213210 );
0 commit comments