@@ -154,6 +154,7 @@ public class DataStreamLifecycleServiceTests extends ESTestCase {
154154 private List <TransportRequest > clientSeenRequests ;
155155 private DoExecuteDelegate clientDelegate ;
156156 private volatile CountDownLatch clientWaitLatch ;
157+ private volatile CountDownLatch invokerWaitLatch ;
157158 private ClusterService clusterService ;
158159 private final DataStreamGlobalRetentionSettings globalRetentionSettings = DataStreamGlobalRetentionSettings .create (
159160 ClusterSettings .createBuiltInClusterSettings ()
@@ -206,6 +207,7 @@ public void setupServices() {
206207 actions
207208 );
208209 clientWaitLatch = null ;
210+ invokerWaitLatch = null ;
209211 clientDelegate = null ;
210212 }
211213
@@ -288,6 +290,7 @@ public void testDLMRunsOnlyOnce() {
288290 ClusterState state = ClusterState .builder (ClusterName .DEFAULT ).putProjectMetadata (builder ).build ();
289291
290292 clientWaitLatch = new CountDownLatch (1 );
293+ invokerWaitLatch = new CountDownLatch (1 );
291294 AtomicBoolean runCompleted = new AtomicBoolean (false );
292295 // Should block because of the latch
293296 Thread t = new Thread (() -> {
@@ -296,11 +299,23 @@ public void testDLMRunsOnlyOnce() {
296299 });
297300 t .start ();
298301
302+ // So it's possible for the thread to be started above, but for the
303+ // actual `.run` invocation not to have been called by this point.
304+ // What we actually need to do is wait for some moment where we know
305+ // we're in the middle of the DLM service. In order to do that, we wait
306+ // for the "invokerWaitLatch" which is counted down inside of the fake
307+ // client. That way we know that the DLM service is running, but is
308+ // "paused" because of the `clientWaitLatch`.
309+ try {
310+ assertTrue ("expected the client to count the latch down, but it didn't" , invokerWaitLatch .await (10 , TimeUnit .SECONDS ));
311+ } catch (InterruptedException e ) {
312+ fail ("expected the client to have been invoked, but it never was" );
313+ }
299314 // Will return immediately because it's already running
300315 logger .info ("--> second 'run' invocation" );
301316 dataStreamLifecycleService .run (state );
302317
303- // Let the first invocation proceed
318+ // Let the first invocation proceed by decrementing clientWatchLatch.
304319 logger .info ("--> decrementing latch" );
305320 clientWaitLatch .countDown ();
306321 try {
@@ -309,6 +324,8 @@ public void testDLMRunsOnlyOnce() {
309324 } catch (InterruptedException e ) {
310325 throw new ElasticsearchException (e );
311326 }
327+ // Always check that we finished the initial `.run` call that we did
328+ // inside the thread.
312329 assertTrue (runCompleted .get ());
313330 }
314331
@@ -1874,6 +1891,9 @@ protected <Request extends ActionRequest, Response extends ActionResponse> void
18741891 if (clientDelegate != null ) {
18751892 clientDelegate .doExecute (action , request , listener );
18761893 }
1894+ if (invokerWaitLatch != null ) {
1895+ invokerWaitLatch .countDown ();
1896+ }
18771897 if (clientWaitLatch != null && clientWaitLatch .getCount () > 0 ) {
18781898 try {
18791899 logger .info ("--> blocking client invocation" );
0 commit comments