Description
For some time, I have been investigating a deadlock in my codebase that seems to occur once the default backbuffer has been filled. While trying to create a repro, I ran into a similar issue that shows the same symptoms with a minimal amount of code. I see no obvious cause, and therefore believe this might be an RxJava 2.x bug (reproducible so far on 2.2.0 and 2.2.5).
import io.reactivex.Flowable
import io.reactivex.Single
import io.reactivex.functions.BiFunction

fun main( args: Array<String> )
{
    // Override the default buffer size of 128 to 10 for easier testing.
    System.setProperty( "rx2.buffer-size", "10" )

    val numbers = Flowable
        .range( 0, Int.MAX_VALUE )
        // Commenting out '.doOnSubscribe' here prevents the deadlock.
        .doOnSubscribe { println( "Subscribed." ) }
        .share()

    val evenNumbers = numbers
        .filter { number -> number % 2 == 0 }
    val oddNumbers = numbers
        .filter { number -> number % 2 != 0 }

    // Emits the next odd number, or 0 if the stream completes first.
    val getNextOdd = oddNumbers.first( 0 )

    evenNumbers
        // Pair each even number with the next odd number from the shared stream.
        .concatMap { even ->
            Single.zip(
                Single.just( even ), getNextOdd,
                BiFunction { a: Int, b: Int -> Pair( a, b ) }
            ).toFlowable()
        }
        .takeWhile { it.first < 1000 }
        .doOnComplete { println( "Done." ) }
        .subscribe { println( it ) }

    // Keep the process alive until Enter is pressed.
    readLine()
}
Where this halts depends on the backbuffer size. With the buffer size of 10 set in the code example, the output stops after:
Subscribed.
(0, 1)
(2, 3)
(4, 5)
(6, 7)
(8, 9)
As noted in the code above, commenting out the doOnSubscribe line prevents the deadlock.
Is this a bug, or am I doing something inherently wrong? For example, I imagine things get complicated by the way I reuse the same stream in the 'zip', but my actual codebase needs this same use case. Regardless, I do not understand what doOnSubscribe would have to do with anything here.