Expected Behavior
With Java 21 VirtualThread, I was expecting to be able to run a huge number of virtual threads in parallel, since they are much lighter to handle. I tried to run a Flux configured with a schedule which is a Schedulers.boundedElastic() while enabling the virtual threads: -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true.
The test is simple, in a loop, I run simple Thread sleep of 1 sec and at first it worked, if I picked a value 100 virtual threads, I can see them from loomBoundedElastic-1 to loomBoundedElastic-100 are executed and the total time elapsed is ~1 sec but if I double this number to 200, I would still expect to see ~1 sec.
Actual Behavior
With 200, I see a different pattern where the first 100 virtual threads are executed with the same ~1 sec, but then the remaining 100 are executed in ~ 100 sec for a total of 101542 ms.
Steps to Reproduce
Here's the code:
package com.my.test
import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
class Main
private val log = KotlinLogging.logger {}
const val ONE_SEC = 1_000L
const val TOTAL = 200
fun main(args: Array<String>) {
val steps = (1..TOTAL).toList()
val scheduler = Schedulers.boundedElastic()
val startTime = System.currentTimeMillis()
log.info { "=== START ===" }
Flux
.fromIterable(steps)
.flatMap { step ->
Mono
.fromCallable {
Thread.sleep(ONE_SEC)
}.log()
.subscribeOn(scheduler)
}.collectList()
.block()
log.info { "=== END ===" }
log.info { "Time taken: ${System.currentTimeMillis() - startTime} ms" }
}
Logs TOTAL = 200
10:47:10.905 [main] INFO com.my.test.Main -- === START ===
10:47:10.957 [loomBoundedElastic-6] INFO reactor.Mono.Callable.6 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-4] INFO reactor.Mono.Callable.4 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-11] INFO reactor.Mono.Callable.11 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-9] INFO reactor.Mono.Callable.9 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-2] INFO reactor.Mono.Callable.2 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-11] INFO reactor.Mono.Callable.11 -- | request(32)
10:47:10.957 [loomBoundedElastic-1] INFO reactor.Mono.Callable.1 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-1] INFO reactor.Mono.Callable.1 -- | request(32)
10:47:10.960 [loomBoundedElastic-6] INFO reactor.Mono.Callable.6 -- | request(32)
10:47:10.957 [loomBoundedElastic-7] INFO reactor.Mono.Callable.7 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
...
10:47:11.969 [loomBoundedElastic-73] INFO reactor.Mono.Callable.73 -- | onComplete()
10:47:12.975 [loomBoundedElastic-101] INFO reactor.Mono.Callable.101 -- | onNext(kotlin.Unit)
10:47:12.976 [loomBoundedElastic-101] INFO reactor.Mono.Callable.101 -- | onComplete()
10:47:12.976 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:12.977 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | request(32)
10:47:13.978 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onNext(kotlin.Unit)
10:47:13.979 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onComplete()
10:47:13.980 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:13.980 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | request(32)
10:47:14.985 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onNext(kotlin.Unit)
10:47:14.986 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onComplete()
10:47:14.986 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:14.987 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | request(32)
10:47:15.991 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onNext(kotlin.Unit)
10:47:15.992 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onComplete()
10:47:15.992 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:15.993 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | request(32)
10:47:16.995 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onNext(kotlin.Unit)
10:47:16.995 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onComplete()
10:47:16.996 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:16.996 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | request(32)
10:47:17.998 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | onNext(kotlin.Unit)
...
10:48:51.440 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | request(32)
10:48:52.443 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | onNext(kotlin.Unit)
10:48:52.443 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | onComplete()
10:48:52.445 [main] INFO com.my.test.Main -- === END ===
10:48:52.448 [main] INFO com.my.test.Main -- Time taken: 101542 ms
Logs TOTAL = 100
10:59:05.849 [main] INFO com.my.test.Main -- === START ===
10:59:05.905 [loomBoundedElastic-7] INFO reactor.Mono.Callable.7 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:59:05.905 [loomBoundedElastic-2] INFO reactor.Mono.Callable.2 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:59:05.905 [loomBoundedElastic-5] INFO reactor.Mono.Callable.5 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
...
10:59:06.918 [loomBoundedElastic-30] INFO reactor.Mono.Callable.30 -- | onComplete()
10:59:06.919 [loomBoundedElastic-63] INFO reactor.Mono.Callable.63 -- | onComplete()
10:59:06.919 [loomBoundedElastic-65] INFO reactor.Mono.Callable.65 -- | onComplete()
10:59:06.919 [loomBoundedElastic-72] INFO reactor.Mono.Callable.72 -- | onComplete()
10:59:06.919 [loomBoundedElastic-70] INFO reactor.Mono.Callable.70 -- | onComplete()
10:59:06.919 [loomBoundedElastic-86] INFO reactor.Mono.Callable.86 -- | onComplete()
10:59:06.920 [main] INFO com.my.test.Main -- === END ===
10:59:06.922 [main] INFO com.my.test.Main -- Time taken: 1073 ms
Possible Solution
I didn't see anywhere mentioning a limitation of 100 virtual threads and I'm curious also if the threads are supposed to be reused or they are simply disposable? I was also thinking, if there's a cap of 100 threads, once the first batch is processed, the second 100 tasks should be done within 1 sec, so a total of ~2 sec.
Your Environment
MacBook Pro M1
- Reactor version(s) used: 3.6.0
- Other relevant libraries versions (eg.
netty, ...): N/A
- JVM version (
java -version): OpenJDK Runtime Environment Temurin-21.0.2+13 (build 21.0.2+13-LTS)
- OS and version (eg
uname -a): 23.5.0 Darwin Kernel Version 23.5.0
Expected Behavior
With Java 21 VirtualThread, I was expecting to be able to run a huge number of virtual threads in parallel, since they are much lighter to handle. I tried to run a Flux configured with a schedule which is a
Schedulers.boundedElastic()while enabling the virtual threads:-Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true.The test is simple, in a loop, I run simple Thread sleep of 1 sec and at first it worked, if I picked a value 100 virtual threads, I can see them from
loomBoundedElastic-1toloomBoundedElastic-100are executed and the total time elapsed is ~1 sec but if I double this number to 200, I would still expect to see ~1 sec.Actual Behavior
With 200, I see a different pattern where the first 100 virtual threads are executed with the same ~1 sec, but then the remaining 100 are executed in ~ 100 sec for a total of 101542 ms.
Steps to Reproduce
Here's the code:
Logs TOTAL = 200
Logs TOTAL = 100
Possible Solution
I didn't see anywhere mentioning a limitation of 100 virtual threads and I'm curious also if the threads are supposed to be reused or they are simply disposable? I was also thinking, if there's a cap of 100 threads, once the first batch is processed, the second 100 tasks should be done within 1 sec, so a total of ~2 sec.
Your Environment
MacBook Pro M1
netty, ...): N/Ajava -version): OpenJDK Runtime Environment Temurin-21.0.2+13 (build 21.0.2+13-LTS)uname -a): 23.5.0 Darwin Kernel Version 23.5.0