Skip to content

Commit 0a008ed

Browse files
authored
Preserve thread context when calling getNextPart() (#109519)
The listener supplied to `getNextPart()` may be completed in a non-default thread context, and this trips assertions about thread context pollution in the transport layer, so this commit adds the missing protection against that. Also, the listener supplied to `getNextPart()` may be completed immediately on the transport thread, which could lead to a stack overflow, so this commit adds another `execute()` call to unconditionally fork a fresh task. Relates #104851.
1 parent c983b3f commit 0a008ed

2 files changed

Lines changed: 70 additions & 45 deletions

File tree

modules/transport-netty4/src/internalClusterTest/java/org/elasticsearch/http/netty4/Netty4ChunkedContinuationsIT.java

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -310,7 +310,7 @@ public void onFailure(Exception exception) {
310310
safeSleep(scaledRandomIntBetween(10, 500)); // make it more likely the request started executing
311311
}
312312
cancellable.cancel();
313-
} // closing the request tracker ensures that everything is released, including all response chunks and the overall response
313+
} // closing the resource tracker ensures that everything is released, including all response chunks and the overall response
314314
}
315315

316316
private static Releasable withResourceTracker() {
@@ -525,6 +525,7 @@ public ActionRequestValidationException validate() {
525525
public static class Response extends ActionResponse {
526526
private final Executor executor;
527527
volatile boolean computingContinuation;
528+
boolean recursive = false;
528529

529530
public Response(Executor executor) {
530531
this.executor = executor;
@@ -551,11 +552,17 @@ public boolean isLastPart() {
551552

552553
@Override
553554
public void getNextPart(ActionListener<ChunkedRestResponseBodyPart> listener) {
554-
computingContinuation = true;
555-
executor.execute(ActionRunnable.supply(listener, () -> {
556-
computingContinuation = false;
557-
return getResponseBodyPart();
558-
}));
555+
assertFalse(recursive);
556+
recursive = true;
557+
try {
558+
computingContinuation = true;
559+
executor.execute(ActionRunnable.supply(listener, () -> {
560+
computingContinuation = false;
561+
return getResponseBodyPart();
562+
}));
563+
} finally {
564+
recursive = false;
565+
}
559566
}
560567

561568
@Override
@@ -585,7 +592,10 @@ public TransportInfiniteContinuationsAction(ActionFilters actionFilters, Transpo
585592
@Override
586593
protected void doExecute(Task task, Request request, ActionListener<Response> listener) {
587594
executor.execute(
588-
ActionRunnable.supply(ActionTestUtils.assertNoFailureListener(listener::onResponse), () -> new Response(executor))
595+
ActionRunnable.supply(
596+
ActionTestUtils.assertNoFailureListener(listener::onResponse),
597+
() -> new Response(randomFrom(executor, EsExecutors.DIRECT_EXECUTOR_SERVICE))
598+
)
589599
);
590600
}
591601
}

modules/transport-netty4/src/main/java/org/elasticsearch/http/netty4/Netty4HttpPipeliningHandler.java

Lines changed: 53 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.apache.logging.log4j.Logger;
3030
import org.elasticsearch.ExceptionsHelper;
3131
import org.elasticsearch.action.ActionListener;
32+
import org.elasticsearch.action.support.ContextPreservingActionListener;
3233
import org.elasticsearch.common.Strings;
3334
import org.elasticsearch.common.bytes.ReleasableBytesReference;
3435
import org.elasticsearch.common.network.ThreadWatchdog;
@@ -271,45 +272,59 @@ private void finishChunkedWrite() {
271272
writeSequence++;
272273
finishingWrite.combiner().finish(finishingWrite.onDone());
273274
} else {
275+
final var threadContext = serverTransport.getThreadPool().getThreadContext();
276+
assert Transports.assertDefaultThreadContext(threadContext);
274277
final var channel = finishingWrite.onDone().channel();
275-
ActionListener.run(ActionListener.assertOnce(new ActionListener<>() {
276-
@Override
277-
public void onResponse(ChunkedRestResponseBodyPart continuation) {
278-
channel.writeAndFlush(
279-
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
280-
finishingWrite.onDone() // pass the terminal listener/promise along the line
281-
);
282-
checkShutdown();
283-
}
284-
285-
@Override
286-
public void onFailure(Exception e) {
287-
logger.error(
288-
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
289-
e
290-
);
291-
channel.close().addListener(ignored -> {
292-
finishingWrite.combiner().add(channel.newFailedFuture(e));
293-
finishingWrite.combiner().finish(finishingWrite.onDone());
294-
});
295-
checkShutdown();
296-
}
297-
298-
private void checkShutdown() {
299-
if (channel.eventLoop().isShuttingDown()) {
300-
// The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the
301-
// preceding activity made it onto its queue before shutdown or whether it will just vanish without a trace, so
302-
// to avoid a leak we must double-check that the final listener is completed once the event loop is terminated.
303-
// Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an ImmediateEventExecutor
304-
// which means this completion is not subject to the same issue, it still works even if the event loop has already
305-
// terminated.
306-
channel.eventLoop()
307-
.terminationFuture()
308-
.addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException()));
309-
}
310-
}
311-
312-
}), finishingWriteBodyPart::getNextPart);
278+
ActionListener.run(
279+
new ContextPreservingActionListener<>(
280+
threadContext.newRestorableContext(false),
281+
ActionListener.assertOnce(new ActionListener<>() {
282+
@Override
283+
public void onResponse(ChunkedRestResponseBodyPart continuation) {
284+
// always fork a fresh task to avoid stack overflow
285+
assert Transports.assertDefaultThreadContext(threadContext);
286+
channel.eventLoop()
287+
.execute(
288+
() -> channel.writeAndFlush(
289+
new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()),
290+
finishingWrite.onDone() // pass the terminal listener/promise along the line
291+
)
292+
);
293+
checkShutdown();
294+
}
295+
296+
@Override
297+
public void onFailure(Exception e) {
298+
assert Transports.assertDefaultThreadContext(threadContext);
299+
logger.error(
300+
Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel),
301+
e
302+
);
303+
channel.close().addListener(ignored -> {
304+
finishingWrite.combiner().add(channel.newFailedFuture(e));
305+
finishingWrite.combiner().finish(finishingWrite.onDone());
306+
});
307+
checkShutdown();
308+
}
309+
310+
private void checkShutdown() {
311+
if (channel.eventLoop().isShuttingDown()) {
312+
// The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know
313+
// if the preceding activity made it onto its queue before shutdown or whether it will just vanish without a
314+
// trace, so to avoid a leak we must double-check that the final listener is completed once the event loop
315+
// is terminated. Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an
316+
// ImmediateEventExecutor which means this completion is not subject to the same issue, it still works even
317+
// if the event loop has already terminated.
318+
channel.eventLoop()
319+
.terminationFuture()
320+
.addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException()));
321+
}
322+
}
323+
324+
})
325+
),
326+
finishingWriteBodyPart::getNextPart
327+
);
313328
}
314329
}
315330

0 commit comments

Comments
 (0)