Skip to content

DataBuffer Leak in Server Response #26232

@rstoyanchev

Description

@rstoyanchev

This is based on a report in Reactor Netty reactor/reactor-netty#1408 where the last leak record is in AbstractJackson2Encoder with no further records in Reactor Netty, which indicates it gets dropped before it gets there:

LEAK: ByteBuf.release() was not called before it's garbage-collected. See http://netty.io/wiki/reference-counted-objects.html for more information.
Recent access records: 
#1:
	io.netty.buffer.AdvancedLeakAwareByteBuf.writeBytes(AdvancedLeakAwareByteBuf.java:616)
	io.netty.buffer.ByteBufOutputStream.write(ByteBufOutputStream.java:68)
	com.fasterxml.jackson.core.json.UTF8JsonGenerator._flushBuffer(UTF8JsonGenerator.java:2085)
	com.fasterxml.jackson.core.json.UTF8JsonGenerator.flush(UTF8JsonGenerator.java:1097)
	com.fasterxml.jackson.databind.ObjectWriter.writeValue(ObjectWriter.java:915)
	org.springframework.http.codec.json.AbstractJackson2Encoder.encodeValue(AbstractJackson2Encoder.java:174)
	org.springframework.http.codec.json.AbstractJackson2Encoder.lambda$encode$0(AbstractJackson2Encoder.java:122)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:107)
	reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2070)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
	reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:1878)
	reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:1752)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
	reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
	reactor.core.publisher.MonoSwitchIfEmpty.subscribe(MonoSwitchIfEmpty.java:44)
	reactor.core.publisher.MonoFlatMap.subscribe(MonoFlatMap.java:60)
	reactor.core.publisher.MonoOnErrorResume.subscribe(MonoOnErrorResume.java:44)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
	reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
	reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
	reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:121)
	reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2070)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:162)
	reactor.core.publisher.MonoFlatMap$FlatMapInner.onSubscribe(MonoFlatMap.java:230)
	reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:90)
	reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
	reactor.core.publisher.MonoMapFuseable.subscribe(MonoMapFuseable.java:59)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150)
	reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
	reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144)
	reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
	reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241)
	reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1505)
	reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:171)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.checkTerminated(FluxFlatMap.java:794)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drainLoop(FluxFlatMap.java:560)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.drain(FluxFlatMap.java:540)
	reactor.core.publisher.FluxFlatMap$FlatMapMain.onComplete(FluxFlatMap.java:426)
	reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)
	reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)
	reactor.core.publisher.FluxMapSignal$FluxMapSignalSubscriber.onComplete(FluxMapSignal.java:213)
	reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
	reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
	reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
	reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
	reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
	reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
	reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:333)
	reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:382)
	reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522)
	reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
	io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
	io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
	io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:337)
	io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:359)
	io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:345)
	io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
	io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:796)
	io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
	io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
	io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
	java.lang.Thread.run(Thread.java:748)

The scenario is a simple write to the body of a server response:

@Component
public class Handler {

    private WebClient webClient;

    public Handler(WebClient.Builder webClientBuilder) {
        String url = UriComponentsBuilder.newInstance()
                .scheme("http")
                .host("127.0.0.1")
                .build()
                .toUriString();
        webClient = webClientBuilder
                .baseUrl(url)
                .clientConnector(new ReactorClientHttpConnector(HttpClient.create()))
                .build();
    }
    public Mono<ServerResponse> handle(ServerRequest serverRequest) {
        return query(serverRequest.path()).flatMap(jsonNode -> ServerResponse.ok().syncBody(jsonNode));
    }
    private Mono<JsonNode> query(String path) {
        return webClient.get()
                .uri(uriBuilder -> uriBuilder
                        .port(8080)
                        .path(path)
                        .build())
                .retrieve()
                .bodyToMono(JsonNode.class)
                .flatMap(Mono::just);
    }
}

The use of WebClient is probably not key. The main point is that the client (JMeter tests) is set to timeout immediately at 1 ms which sets up a race between writing to the response and the client going away.

The full sample https://github.com/yinjianfei/webflux-leak-test.

Metadata

Metadata

Assignees

Labels

in: webIssues in web modules (web, webmvc, webflux, websocket)status: backportedAn issue that has been backported to maintenance branchestype: bugA general bug

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions