
Parallelism limitation of 100 VirtualThread in Flux #3857

@expe-elenigen

Description


Expected Behavior

With Java 21 virtual threads, I expected to be able to run a huge number of tasks in parallel, since virtual threads are much lighter to manage. I ran a Flux on a Schedulers.boundedElastic() scheduler with virtual threads enabled via -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true.

The test is simple: each task is a Thread.sleep of 1 sec. With 100 tasks it works as expected: I can see virtual threads loomBoundedElastic-1 through loomBoundedElastic-100 executing, and the total elapsed time is ~1 sec. Doubling to 200 tasks, I would still expect a total of ~1 sec.
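The expectation can be sketched with plain JDK 21 virtual threads, without Reactor. This is a minimal illustration only; timeSleeps is a hypothetical helper, not part of the reported code:

```kotlin
import java.util.concurrent.Executors

// Run `n` sleeps of `sleepMillis` each on virtual threads; return elapsed millis.
fun timeSleeps(n: Int, sleepMillis: Long): Long {
    val start = System.currentTimeMillis()
    Executors.newVirtualThreadPerTaskExecutor().use { pool ->
        repeat(n) { pool.submit { Thread.sleep(sleepMillis) } }
    } // use {} calls close(), which waits for all submitted tasks to finish
    return System.currentTimeMillis() - start
}

fun main() {
    // With no thread cap, 200 concurrent sleeps finish in roughly one sleep period.
    println("200 tasks took ${timeSleeps(200, 1_000)} ms")
}
```

With a bare virtual-thread-per-task executor, the elapsed time stays near one sleep period regardless of the task count, which is the behavior I expected from the boundedElastic scheduler in virtual-thread mode.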

Actual Behavior

With 200 tasks, I see a different pattern: the first 100 virtual threads complete in the same ~1 sec, but the remaining 100 execute one at a time over ~100 sec, for a total of 101542 ms.
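The observed total is consistent with the second hundred tasks running strictly serially: ~1 sec for the first concurrent batch plus ~100 × 1 sec for the serial remainder. A quick sanity check of that arithmetic:

```kotlin
fun main() {
    val firstBatchMs = 1_000L             // 100 tasks sleeping concurrently
    val serialRemainderMs = 100 * 1_000L  // 100 tasks, one after another
    // ~101_000 ms expected, versus the 101_542 ms observed in the logs
    println("Expected total: ${firstBatchMs + serialRemainderMs} ms")
}
```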

Steps to Reproduce

Here's the code:

package com.my.test

import mu.KotlinLogging
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

class Main

private val log = KotlinLogging.logger {}

const val ONE_SEC = 1_000L
const val TOTAL = 200

fun main(args: Array<String>) {
    val steps = (1..TOTAL).toList()
    // Virtual threads enabled at launch via
    // -Dreactor.schedulers.defaultBoundedElasticOnVirtualThreads=true
    val scheduler = Schedulers.boundedElastic()
    val startTime = System.currentTimeMillis()
    log.info { "=== START ===" }
    Flux
        .fromIterable(steps)
        .flatMap { step ->
            Mono
                .fromCallable {
                    Thread.sleep(ONE_SEC) // each task blocks for one second
                }.log()
                .subscribeOn(scheduler)
        }.collectList()
        .block()
    log.info { "=== END ===" }
    log.info { "Time taken: ${System.currentTimeMillis() - startTime} ms" }
}
Logs TOTAL = 200
10:47:10.905 [main] INFO com.my.test.Main -- === START ===
10:47:10.957 [loomBoundedElastic-6] INFO reactor.Mono.Callable.6 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-4] INFO reactor.Mono.Callable.4 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-11] INFO reactor.Mono.Callable.11 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-9] INFO reactor.Mono.Callable.9 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.957 [loomBoundedElastic-2] INFO reactor.Mono.Callable.2 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-11] INFO reactor.Mono.Callable.11 -- | request(32)
10:47:10.957 [loomBoundedElastic-1] INFO reactor.Mono.Callable.1 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:10.959 [loomBoundedElastic-1] INFO reactor.Mono.Callable.1 -- | request(32)
10:47:10.960 [loomBoundedElastic-6] INFO reactor.Mono.Callable.6 -- | request(32)
10:47:10.957 [loomBoundedElastic-7] INFO reactor.Mono.Callable.7 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
...
10:47:11.969 [loomBoundedElastic-73] INFO reactor.Mono.Callable.73 -- | onComplete()
10:47:12.975 [loomBoundedElastic-101] INFO reactor.Mono.Callable.101 -- | onNext(kotlin.Unit)
10:47:12.976 [loomBoundedElastic-101] INFO reactor.Mono.Callable.101 -- | onComplete()
10:47:12.976 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:12.977 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | request(32)
10:47:13.978 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onNext(kotlin.Unit)
10:47:13.979 [loomBoundedElastic-102] INFO reactor.Mono.Callable.102 -- | onComplete()
10:47:13.980 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:13.980 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | request(32)
10:47:14.985 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onNext(kotlin.Unit)
10:47:14.986 [loomBoundedElastic-103] INFO reactor.Mono.Callable.103 -- | onComplete()
10:47:14.986 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:14.987 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | request(32)
10:47:15.991 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onNext(kotlin.Unit)
10:47:15.992 [loomBoundedElastic-104] INFO reactor.Mono.Callable.104 -- | onComplete()
10:47:15.992 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:15.993 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | request(32)
10:47:16.995 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onNext(kotlin.Unit)
10:47:16.995 [loomBoundedElastic-105] INFO reactor.Mono.Callable.105 -- | onComplete()
10:47:16.996 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:47:16.996 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | request(32)
10:47:17.998 [loomBoundedElastic-106] INFO reactor.Mono.Callable.106 -- | onNext(kotlin.Unit)
...
10:48:51.440 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | request(32)
10:48:52.443 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | onNext(kotlin.Unit)
10:48:52.443 [loomBoundedElastic-200] INFO reactor.Mono.Callable.200 -- | onComplete()
10:48:52.445 [main] INFO com.my.test.Main -- === END ===
10:48:52.448 [main] INFO com.my.test.Main -- Time taken: 101542 ms
Logs TOTAL = 100
10:59:05.849 [main] INFO com.my.test.Main -- === START ===
10:59:05.905 [loomBoundedElastic-7] INFO reactor.Mono.Callable.7 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:59:05.905 [loomBoundedElastic-2] INFO reactor.Mono.Callable.2 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
10:59:05.905 [loomBoundedElastic-5] INFO reactor.Mono.Callable.5 -- | onSubscribe([Fuseable] MonoCallable.MonoCallableSubscription)
...
10:59:06.918 [loomBoundedElastic-30] INFO reactor.Mono.Callable.30 -- | onComplete()
10:59:06.919 [loomBoundedElastic-63] INFO reactor.Mono.Callable.63 -- | onComplete()
10:59:06.919 [loomBoundedElastic-65] INFO reactor.Mono.Callable.65 -- | onComplete()
10:59:06.919 [loomBoundedElastic-72] INFO reactor.Mono.Callable.72 -- | onComplete()
10:59:06.919 [loomBoundedElastic-70] INFO reactor.Mono.Callable.70 -- | onComplete()
10:59:06.919 [loomBoundedElastic-86] INFO reactor.Mono.Callable.86 -- | onComplete()
10:59:06.920 [main] INFO com.my.test.Main -- === END ===
10:59:06.922 [main] INFO com.my.test.Main -- Time taken: 1073 ms

Possible Solution

I didn't see a limitation of 100 virtual threads documented anywhere, and I'm also curious whether the threads are supposed to be reused or are simply disposable. Moreover, even if there is a cap of 100 threads, once the first batch is processed the second 100 tasks should complete within ~1 sec, for a total of ~2 sec.
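One possibility worth checking against the Reactor documentation: Schedulers.boundedElastic() caps its worker threads, by default, at 10 × the number of available CPU cores (tunable with -Dreactor.schedulers.defaultBoundedElasticSize, or by constructing a custom scheduler via Schedulers.newBoundedElastic(threadCap, queuedTaskCap, name)). On a 10-core machine that default works out to exactly 100, which would match the observed ceiling. A minimal sketch of that arithmetic, with no Reactor dependency:

```kotlin
// Assumed default cap for Schedulers.boundedElastic():
// 10 x the number of available CPU cores.
fun defaultBoundedElasticCap(
    cores: Int = Runtime.getRuntime().availableProcessors()
): Int = 10 * cores

fun main() {
    // On a 10-core machine this prints 100 -- the ceiling seen in the logs.
    println("Assumed boundedElastic cap: ${defaultBoundedElasticCap()}")
}
```

If that cap is indeed the cause here, it would explain why exactly 100 tasks ran concurrently, though not why the overflow tasks ran one at a time rather than in a second ~1 sec batch.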

Your Environment

MacBook Pro M1

  • Reactor version(s) used: 3.6.0
  • Other relevant libraries versions (eg. netty, ...): N/A
  • JVM version (java -version): OpenJDK Runtime Environment Temurin-21.0.2+13 (build 21.0.2+13-LTS)
  • OS and version (eg uname -a): 23.5.0 Darwin Kernel Version 23.5.0
