2727
2828import com .google .common .util .concurrent .testing .TestingExecutors ;
2929import io .grpc .SynchronizationContext .ScheduledHandle ;
30+ import java .time .Duration ;
3031import java .util .concurrent .BlockingQueue ;
3132import java .util .concurrent .CountDownLatch ;
3233import java .util .concurrent .LinkedBlockingQueue ;
5253 */
5354@ RunWith (JUnit4 .class )
5455public class SynchronizationContextTest {
56+
5557 private final BlockingQueue <Throwable > uncaughtErrors = new LinkedBlockingQueue <>();
5658 private final SynchronizationContext syncContext = new SynchronizationContext (
5759 new Thread .UncaughtExceptionHandler () {
@@ -72,8 +74,9 @@ public void uncaughtException(Thread t, Throwable e) {
7274
7375 @ Mock
7476 private Runnable task3 ;
75-
76- @ After public void tearDown () {
77+
78+ @ After
79+ public void tearDown () {
7780 assertThat (uncaughtErrors ).isEmpty ();
7881 }
7982
@@ -105,36 +108,36 @@ public void multiThread() throws Exception {
105108 final AtomicReference <Thread > task2Thread = new AtomicReference <>();
106109
107110 doAnswer (new Answer <Void >() {
108- @ Override
109- public Void answer (InvocationOnMock invocation ) {
110- task1Thread .set (Thread .currentThread ());
111- task1Running .countDown ();
112- try {
113- assertTrue (task1Proceed .await (5 , TimeUnit .SECONDS ));
114- } catch (InterruptedException e ) {
115- throw new RuntimeException (e );
116- }
117- return null ;
111+ @ Override
112+ public Void answer (InvocationOnMock invocation ) {
113+ task1Thread .set (Thread .currentThread ());
114+ task1Running .countDown ();
115+ try {
116+ assertTrue (task1Proceed .await (5 , TimeUnit .SECONDS ));
117+ } catch (InterruptedException e ) {
118+ throw new RuntimeException (e );
118119 }
119- }).when (task1 ).run ();
120+ return null ;
121+ }
122+ }).when (task1 ).run ();
120123
121124 doAnswer (new Answer <Void >() {
122- @ Override
123- public Void answer (InvocationOnMock invocation ) {
124- task2Thread .set (Thread .currentThread ());
125- return null ;
126- }
127- }).when (task2 ).run ();
125+ @ Override
126+ public Void answer (InvocationOnMock invocation ) {
127+ task2Thread .set (Thread .currentThread ());
128+ return null ;
129+ }
130+ }).when (task2 ).run ();
128131
129132 Thread sideThread = new Thread () {
130- @ Override
131- public void run () {
132- syncContext .executeLater (task1 );
133- task1Added .countDown ();
134- syncContext .drain ();
135- sideThreadDone .countDown ();
136- }
137- };
133+ @ Override
134+ public void run () {
135+ syncContext .executeLater (task1 );
136+ task1Added .countDown ();
137+ syncContext .drain ();
138+ sideThreadDone .countDown ();
139+ }
140+ };
138141 sideThread .start ();
139142
140143 assertTrue (task1Added .await (5 , TimeUnit .SECONDS ));
@@ -162,26 +165,26 @@ public void throwIfNotInThisSynchronizationContext() throws Exception {
162165 final CountDownLatch task1Proceed = new CountDownLatch (1 );
163166
164167 doAnswer (new Answer <Void >() {
165- @ Override
166- public Void answer (InvocationOnMock invocation ) {
167- task1Running .countDown ();
168- syncContext .throwIfNotInThisSynchronizationContext ();
169- try {
170- assertTrue (task1Proceed .await (5 , TimeUnit .SECONDS ));
171- } catch (InterruptedException e ) {
172- throw new RuntimeException (e );
173- }
174- taskSuccess .set (true );
175- return null ;
168+ @ Override
169+ public Void answer (InvocationOnMock invocation ) {
170+ task1Running .countDown ();
171+ syncContext .throwIfNotInThisSynchronizationContext ();
172+ try {
173+ assertTrue (task1Proceed .await (5 , TimeUnit .SECONDS ));
174+ } catch (InterruptedException e ) {
175+ throw new RuntimeException (e );
176176 }
177- }).when (task1 ).run ();
177+ taskSuccess .set (true );
178+ return null ;
179+ }
180+ }).when (task1 ).run ();
178181
179182 Thread sideThread = new Thread () {
180- @ Override
181- public void run () {
182- syncContext .execute (task1 );
183- }
184- };
183+ @ Override
184+ public void run () {
185+ syncContext .execute (task1 );
186+ }
187+ };
185188 sideThread .start ();
186189
187190 assertThat (task1Running .await (5 , TimeUnit .SECONDS )).isTrue ();
@@ -215,11 +218,11 @@ public void taskThrows() {
215218 InOrder inOrder = inOrder (task1 , task2 , task3 );
216219 final RuntimeException e = new RuntimeException ("Simulated" );
217220 doAnswer (new Answer <Void >() {
218- @ Override
219- public Void answer (InvocationOnMock invocation ) {
220- throw e ;
221- }
222- }).when (task2 ).run ();
221+ @ Override
222+ public Void answer (InvocationOnMock invocation ) {
223+ throw e ;
224+ }
225+ }).when (task2 ).run ();
223226 syncContext .executeLater (task1 );
224227 syncContext .executeLater (task2 );
225228 syncContext .executeLater (task3 );
@@ -246,6 +249,24 @@ public void schedule() {
246249 verify (task1 ).run ();
247250 }
248251
252+ @ Test
253+ public void testScheduleWithFixedDelay () {
254+ MockScheduledExecutorService executorService = new MockScheduledExecutorService ();
255+
256+ ScheduledHandle handle =
257+ syncContext .scheduleWithFixedDelay (task1 , Duration .ofNanos (110 ), Duration .ofNanos (110 ),
258+ TimeUnit .NANOSECONDS , executorService );
259+
260+ assertThat (executorService .delay )
261+ .isEqualTo (executorService .unit .convert (110 , TimeUnit .NANOSECONDS ));
262+ assertThat (handle .isPending ()).isTrue ();
263+ verify (task1 , never ()).run ();
264+
265+ executorService .command .run ();
266+ assertThat (handle .isPending ()).isFalse ();
267+ verify (task1 ).run ();
268+ }
269+
249270 @ Test
250271 public void scheduleDueImmediately () {
251272 MockScheduledExecutorService executorService = new MockScheduledExecutorService ();
@@ -288,28 +309,28 @@ public void scheduledHandle_cancelRacesWithTimerExpiration() throws Exception {
288309 final CountDownLatch sideThreadDone = new CountDownLatch (1 );
289310
290311 doAnswer (new Answer <Void >() {
291- @ Override
292- public Void answer (InvocationOnMock invocation ) {
293- task1Running .countDown ();
294- try {
295- ScheduledHandle task2Handle ;
296- assertThat (task2Handle = task2HandleQueue .poll (5 , TimeUnit .SECONDS )).isNotNull ();
297- task2Handle .cancel ();
298- } catch (InterruptedException e ) {
299- throw new RuntimeException (e );
300- }
301- task1Done .set (true );
302- return null ;
312+ @ Override
313+ public Void answer (InvocationOnMock invocation ) {
314+ task1Running .countDown ();
315+ try {
316+ ScheduledHandle task2Handle ;
317+ assertThat (task2Handle = task2HandleQueue .poll (5 , TimeUnit .SECONDS )).isNotNull ();
318+ task2Handle .cancel ();
319+ } catch (InterruptedException e ) {
320+ throw new RuntimeException (e );
303321 }
304- }).when (task1 ).run ();
322+ task1Done .set (true );
323+ return null ;
324+ }
325+ }).when (task1 ).run ();
305326
306327 Thread sideThread = new Thread () {
307- @ Override
308- public void run () {
309- syncContext .execute (task1 );
310- sideThreadDone .countDown ();
311- }
312- };
328+ @ Override
329+ public void run () {
330+ syncContext .execute (task1 );
331+ sideThreadDone .countDown ();
332+ }
333+ };
313334
314335 ScheduledHandle handle = syncContext .schedule (task2 , 10 , TimeUnit .NANOSECONDS , executorService );
315336 // This will execute and block in task1
@@ -340,22 +361,33 @@ public void run() {
340361 }
341362
342363 static class MockScheduledExecutorService extends ForwardingScheduledExecutorService {
364+
343365 private ScheduledExecutorService delegate = TestingExecutors .noOpScheduledExecutor ();
344366
345367 Runnable command ;
346368 long delay ;
347369 TimeUnit unit ;
348370 ScheduledFuture <?> future ;
349371
350- @ Override public ScheduledExecutorService delegate () {
372+ @ Override
373+ public ScheduledExecutorService delegate () {
351374 return delegate ;
352375 }
353376
354- @ Override public ScheduledFuture <?> schedule (Runnable command , long delay , TimeUnit unit ) {
377+ @ Override
378+ public ScheduledFuture <?> schedule (Runnable command , long delay , TimeUnit unit ) {
355379 this .command = command ;
356380 this .delay = delay ;
357381 this .unit = unit ;
358382 return future = super .schedule (command , delay , unit );
359383 }
384+ @ Override
385+ public ScheduledFuture <?> scheduleWithFixedDelay (Runnable command , long intialDelay , long delay ,
386+ TimeUnit unit ) {
387+ this .command = command ;
388+ this .delay = delay ;
389+ this .unit = unit ;
390+ return future = super .scheduleWithFixedDelay (command , intialDelay , delay , unit );
391+ }
360392 }
361393}
0 commit comments