-
Notifications
You must be signed in to change notification settings - Fork 1.2k
Closed as not planned
Closed as not planned
Copy link
Labels
status/need-designThis needs more in depth design workThis needs more in depth design worktype/enhancementA general enhancementA general enhancement
Description
expandDeep uses unbounded queue internally which results in OutOfMemoryException when subscriber is not fast enough to keep up with publisher
Expected Behavior
not sure
Actual Behavior
OutOfMemoryException thrown
Steps to Reproduce
@Test
void reproCase() {
val genBytes = { Array(1024 * 1024 * 100) { 0 } }
val i = AtomicInteger(0)
Flux. just(genBytes()).expandDeep({
if (i.incrementAndGet() > 100)
Flux.empty()
else
Flux.just(genBytes())
}, 1)
.delayElements(Duration.ofMinutes(5))
.subscribe {
println("byte array processed")
}
}Possible Solution
add variety of expand and expandDeep methods with bounded queue in order to limit memory usage
Your Environment
- Reactor version(s) used: 3.4.27
- Other relevant libraries versions (eg.
netty, ...): - JVM version (
java -version): 11 - OS and version (eg
uname -a): Windows 10
Reactions are currently unavailable
Metadata
Metadata
Assignees
Labels
status/need-designThis needs more in depth design workThis needs more in depth design worktype/enhancementA general enhancementA general enhancement