Skip to content

Commit 4768c43

Browse files
author
Dariusz Jędrzejczyk
authored
Add disposeGracefully method to Scheduler (#3089)
`Scheduler`s now can be disposed lazily using a new method, `disposeGracefully()`. It returns a `Mono<Void>`, which allows awaiting for the signal that all underlying resources have been properly shut down. In contrast to the existing `dispose()`, which calls `shutdownNow()` on underlying `ExecutorService`s, the lazy variant calls `shutdown()` and sends the termination signal once a monitoring `Thread` successfully observes a positive result from `awaitTermination()`. Fixes ##3068.
1 parent 18ce835 commit 4768c43

17 files changed

Lines changed: 1359 additions & 345 deletions

reactor-core/build.gradle

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,9 @@ task japicmp(type: JapicmpTask) {
162162
// TODO after a .0 release, bump the gradle.properties baseline
163163
// TODO after a .0 release, remove the reactor-core exclusions below if any
164164
classExcludes = [ ]
165-
methodExcludes = [ ]
165+
methodExcludes = [
166+
"reactor.core.scheduler.Scheduler#disposeGracefully()"
167+
]
166168
}
167169

168170
gradle.taskGraph.afterTask { task, state ->
Lines changed: 358 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,358 @@
1+
/*
2+
* Copyright (c) 2022 VMware Inc. or its affiliates, All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package reactor.core.scheduler;
18+
19+
import java.time.Duration;
20+
import java.util.concurrent.CountDownLatch;
21+
import java.util.concurrent.TimeUnit;
22+
23+
import org.openjdk.jcstress.annotations.Actor;
24+
import org.openjdk.jcstress.annotations.Arbiter;
25+
import org.openjdk.jcstress.annotations.Expect;
26+
import org.openjdk.jcstress.annotations.JCStressTest;
27+
import org.openjdk.jcstress.annotations.Outcome;
28+
import org.openjdk.jcstress.annotations.State;
29+
import org.openjdk.jcstress.infra.results.IIZ_Result;
30+
import org.openjdk.jcstress.infra.results.Z_Result;
31+
32+
public abstract class SchedulersStressTest {
33+
34+
private static void restart(Scheduler scheduler) {
35+
scheduler.disposeGracefully().block(Duration.ofMillis(100));
36+
scheduler.start();
37+
}
38+
39+
private static boolean canScheduleTask(Scheduler scheduler) {
40+
final CountDownLatch latch = new CountDownLatch(1);
41+
if (scheduler.isDisposed()) {
42+
return false;
43+
}
44+
scheduler.schedule(latch::countDown);
45+
boolean taskDone = false;
46+
try {
47+
taskDone = latch.await(100, TimeUnit.MILLISECONDS);
48+
} catch (InterruptedException ignored) {
49+
}
50+
return taskDone;
51+
}
52+
53+
@JCStressTest
54+
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
55+
@State
56+
public static class SingleSchedulerStartDisposeStressTest {
57+
58+
private final SingleScheduler scheduler = new SingleScheduler(Thread::new);
59+
60+
{
61+
scheduler.start();
62+
}
63+
64+
@Actor
65+
public void restart1() {
66+
restart(scheduler);
67+
}
68+
69+
@Actor
70+
public void restart2() {
71+
restart(scheduler);
72+
}
73+
74+
@Arbiter
75+
public void arbiter(Z_Result r) {
76+
// At this stage, at least one actor called scheduler.start(),
77+
// so we should be able to execute a task.
78+
r.r1 = canScheduleTask(scheduler);
79+
scheduler.dispose();
80+
}
81+
}
82+
83+
@JCStressTest
84+
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
85+
@State
86+
public static class ParallelSchedulerStartDisposeStressTest {
87+
88+
private final ParallelScheduler scheduler =
89+
new ParallelScheduler(4, Thread::new);
90+
91+
{
92+
scheduler.start();
93+
}
94+
95+
@Actor
96+
public void restart1() {
97+
restart(scheduler);
98+
}
99+
100+
@Actor
101+
public void restart2() {
102+
restart(scheduler);
103+
}
104+
105+
@Arbiter
106+
public void arbiter(Z_Result r) {
107+
// At this stage, at least one actor called scheduler.start(),
108+
// so we should be able to execute a task.
109+
r.r1 = canScheduleTask(scheduler);
110+
scheduler.dispose();
111+
}
112+
}
113+
114+
@JCStressTest
115+
@Outcome(id = {"true"}, expect = Expect.ACCEPTABLE, desc = "Task scheduled after racing restart")
116+
@State
117+
public static class BoundedElasticSchedulerStartDisposeStressTest {
118+
119+
private final BoundedElasticScheduler scheduler =
120+
new BoundedElasticScheduler(1, 1, Thread::new, 5);
121+
{
122+
scheduler.start();
123+
}
124+
125+
@Actor
126+
public void restart1() {
127+
restart(scheduler);
128+
}
129+
130+
@Actor
131+
public void restart2() {
132+
restart(scheduler);
133+
}
134+
135+
@Arbiter
136+
public void arbiter(Z_Result r) {
137+
// At this stage, at least one actor called scheduler.start(),
138+
// so we should be able to execute a task.
139+
r.r1 = canScheduleTask(scheduler);
140+
scheduler.dispose();
141+
}
142+
}
143+
144+
@JCStressTest
145+
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
146+
desc = "Scheduler in consistent state upon concurrent dispose and " +
147+
"eventually disposed.")
148+
@State
149+
public static class SingleSchedulerDisposeGracefullyStressTest {
150+
151+
private final SingleScheduler scheduler = new SingleScheduler(Thread::new);
152+
153+
{
154+
scheduler.start();
155+
}
156+
157+
@Actor
158+
public void disposeGracefully1(IIZ_Result r) {
159+
scheduler.disposeGracefully().subscribe();
160+
r.r1 = scheduler.state.initialResource.hashCode();
161+
}
162+
163+
@Actor
164+
public void disposeGracefully2(IIZ_Result r) {
165+
scheduler.disposeGracefully().subscribe();
166+
r.r2 = scheduler.state.initialResource.hashCode();
167+
}
168+
169+
@Arbiter
170+
public void arbiter(IIZ_Result r) {
171+
// Validate both disposals left the Scheduler in consistent state,
172+
// assuming the await process coordinates on the resources as identified
173+
// by r.r1 and r.r2, which should be equal.
174+
boolean consistentState = r.r1 == r.r2;
175+
r.r3 = consistentState && scheduler.isDisposed();
176+
}
177+
}
178+
179+
@JCStressTest
180+
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
181+
desc = "Scheduler in consistent state upon concurrent dispose and " +
182+
"eventually disposed.")
183+
@State
184+
public static class ParallelSchedulerDisposeGracefullyStressTest {
185+
186+
private final ParallelScheduler scheduler =
187+
new ParallelScheduler(10, Thread::new);
188+
189+
{
190+
scheduler.start();
191+
}
192+
193+
@Actor
194+
public void disposeGracefully1(IIZ_Result r) {
195+
scheduler.disposeGracefully().subscribe();
196+
r.r1 = scheduler.state.initialResource.hashCode();
197+
}
198+
199+
@Actor
200+
public void disposeGracefully2(IIZ_Result r) {
201+
scheduler.disposeGracefully().subscribe();
202+
r.r2 = scheduler.state.initialResource.hashCode();
203+
}
204+
205+
@Arbiter
206+
public void arbiter(IIZ_Result r) {
207+
// Validate both disposals left the Scheduler in consistent state,
208+
// assuming the await process coordinates on the resources as identified
209+
// by r.r1 and r.r2, which should be equal.
210+
boolean consistentState = r.r1 == r.r2;
211+
r.r3 = consistentState && scheduler.isDisposed();
212+
}
213+
}
214+
215+
@JCStressTest
216+
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
217+
desc = "Scheduler in consistent state upon concurrent dispose and " +
218+
"eventually disposed.")
219+
@State
220+
public static class BoundedElasticSchedulerDisposeGracefullyStressTest {
221+
222+
private final BoundedElasticScheduler scheduler =
223+
new BoundedElasticScheduler(4, 4, Thread::new, 5);
224+
225+
{
226+
scheduler.start();
227+
}
228+
229+
@Actor
230+
public void disposeGracefully1(IIZ_Result r) {
231+
scheduler.disposeGracefully().subscribe();
232+
r.r1 = scheduler.state.initialResource.hashCode();
233+
}
234+
235+
@Actor
236+
public void disposeGracefully2(IIZ_Result r) {
237+
scheduler.disposeGracefully().subscribe();
238+
r.r2 = scheduler.state.initialResource.hashCode();
239+
}
240+
241+
@Arbiter
242+
public void arbiter(IIZ_Result r) {
243+
// Validate both disposals left the Scheduler in consistent state,
244+
// assuming the await process coordinates on the resources as identified
245+
// by r.r1 and r.r2, which should be equal.
246+
boolean consistentState = r.r1 == r.r2;
247+
r.r3 = consistentState && scheduler.isDisposed();
248+
}
249+
}
250+
251+
@JCStressTest
252+
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
253+
desc = "Scheduler in consistent state upon concurrent dispose and " +
254+
"disposeGracefully, eventually disposed.")
255+
@State
256+
public static class SingleSchedulerDisposeGracefullyAndDisposeStressTest {
257+
258+
private final SingleScheduler scheduler = new SingleScheduler(Thread::new);
259+
260+
{
261+
scheduler.start();
262+
}
263+
264+
@Actor
265+
public void disposeGracefully(IIZ_Result r) {
266+
scheduler.disposeGracefully().subscribe();
267+
r.r1 = scheduler.state.initialResource.hashCode();
268+
}
269+
270+
@Actor
271+
public void dispose(IIZ_Result r) {
272+
scheduler.dispose();
273+
r.r2 = scheduler.state.initialResource.hashCode();
274+
}
275+
276+
@Arbiter
277+
public void arbiter(IIZ_Result r) {
278+
// Validate both disposals left the Scheduler in consistent state,
279+
// assuming the await process coordinates on the resources as identified
280+
// by r.r1 and r.r2, which should be equal.
281+
boolean consistentState = r.r1 == r.r2;
282+
r.r3 = consistentState && scheduler.isDisposed();
283+
}
284+
}
285+
286+
@JCStressTest
287+
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
288+
desc = "Scheduler in consistent state upon concurrent dispose and " +
289+
"disposeGracefully, eventually disposed.")
290+
@State
291+
public static class ParallelSchedulerDisposeGracefullyAndDisposeStressTest {
292+
293+
private final ParallelScheduler scheduler =
294+
new ParallelScheduler(10, Thread::new);
295+
296+
{
297+
scheduler.start();
298+
}
299+
300+
@Actor
301+
public void disposeGracefully(IIZ_Result r) {
302+
scheduler.disposeGracefully().subscribe();
303+
r.r1 = scheduler.state.initialResource.hashCode();
304+
}
305+
306+
@Actor
307+
public void dispose(IIZ_Result r) {
308+
scheduler.dispose();
309+
r.r2 = scheduler.state.initialResource.hashCode();
310+
}
311+
312+
@Arbiter
313+
public void arbiter(IIZ_Result r) {
314+
// Validate both disposals left the Scheduler in consistent state,
315+
// assuming the await process coordinates on the resources as identified
316+
// by r.r1 and r.r2, which should be equal.
317+
boolean consistentState = r.r1 == r.r2;
318+
r.r3 = consistentState && scheduler.isDisposed();
319+
}
320+
}
321+
322+
@JCStressTest
323+
@Outcome(id = {".*, true"}, expect = Expect.ACCEPTABLE,
324+
desc = "Scheduler in consistent state upon concurrent dispose and " +
325+
"disposeGracefully, eventually disposed.")
326+
@State
327+
public static class BoundedElasticSchedulerDisposeGracefullyAndDisposeStressTest {
328+
329+
330+
private final BoundedElasticScheduler scheduler =
331+
new BoundedElasticScheduler(4, 4, Thread::new, 5);
332+
333+
{
334+
scheduler.start();
335+
}
336+
337+
@Actor
338+
public void disposeGracefully(IIZ_Result r) {
339+
scheduler.disposeGracefully().subscribe();
340+
r.r1 = scheduler.state.initialResource.hashCode();
341+
}
342+
343+
@Actor
344+
public void dispose(IIZ_Result r) {
345+
scheduler.dispose();
346+
r.r2 = scheduler.state.initialResource.hashCode();
347+
}
348+
349+
@Arbiter
350+
public void arbiter(IIZ_Result r) {
351+
// Validate both disposals left the Scheduler in consistent state,
352+
// assuming the await process coordinates on the resources as identified
353+
// by r.r1 and r.r2, which should be equal.
354+
boolean consistentState = r.r1 == r.r2;
355+
r.r3 = consistentState && scheduler.isDisposed();
356+
}
357+
}
358+
}

0 commit comments

Comments
 (0)