|
29 | 29 | import org.apache.logging.log4j.Logger; |
30 | 30 | import org.elasticsearch.ExceptionsHelper; |
31 | 31 | import org.elasticsearch.action.ActionListener; |
| 32 | +import org.elasticsearch.action.support.ContextPreservingActionListener; |
32 | 33 | import org.elasticsearch.common.Strings; |
33 | 34 | import org.elasticsearch.common.bytes.ReleasableBytesReference; |
34 | 35 | import org.elasticsearch.common.network.ThreadWatchdog; |
@@ -271,45 +272,59 @@ private void finishChunkedWrite() { |
271 | 272 | writeSequence++; |
272 | 273 | finishingWrite.combiner().finish(finishingWrite.onDone()); |
273 | 274 | } else { |
| 275 | + final var threadContext = serverTransport.getThreadPool().getThreadContext(); |
| 276 | + assert Transports.assertDefaultThreadContext(threadContext); |
274 | 277 | final var channel = finishingWrite.onDone().channel(); |
275 | | - ActionListener.run(ActionListener.assertOnce(new ActionListener<>() { |
276 | | - @Override |
277 | | - public void onResponse(ChunkedRestResponseBodyPart continuation) { |
278 | | - channel.writeAndFlush( |
279 | | - new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()), |
280 | | - finishingWrite.onDone() // pass the terminal listener/promise along the line |
281 | | - ); |
282 | | - checkShutdown(); |
283 | | - } |
284 | | - |
285 | | - @Override |
286 | | - public void onFailure(Exception e) { |
287 | | - logger.error( |
288 | | - Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), |
289 | | - e |
290 | | - ); |
291 | | - channel.close().addListener(ignored -> { |
292 | | - finishingWrite.combiner().add(channel.newFailedFuture(e)); |
293 | | - finishingWrite.combiner().finish(finishingWrite.onDone()); |
294 | | - }); |
295 | | - checkShutdown(); |
296 | | - } |
297 | | - |
298 | | - private void checkShutdown() { |
299 | | - if (channel.eventLoop().isShuttingDown()) { |
300 | | - // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know if the |
301 | | - // preceding activity made it onto its queue before shutdown or whether it will just vanish without a trace, so |
302 | | - // to avoid a leak we must double-check that the final listener is completed once the event loop is terminated. |
303 | | - // Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an ImmediateEventExecutor |
304 | | - // which means this completion is not subject to the same issue, it still works even if the event loop has already |
305 | | - // terminated. |
306 | | - channel.eventLoop() |
307 | | - .terminationFuture() |
308 | | - .addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException())); |
309 | | - } |
310 | | - } |
311 | | - |
312 | | - }), finishingWriteBodyPart::getNextPart); |
| 278 | + ActionListener.run( |
| 279 | + new ContextPreservingActionListener<>( |
| 280 | + threadContext.newRestorableContext(false), |
| 281 | + ActionListener.assertOnce(new ActionListener<>() { |
| 282 | + @Override |
| 283 | + public void onResponse(ChunkedRestResponseBodyPart continuation) { |
| 284 | + // always fork a fresh task to avoid stack overflow |
| 285 | + assert Transports.assertDefaultThreadContext(threadContext); |
| 286 | + channel.eventLoop() |
| 287 | + .execute( |
| 288 | + () -> channel.writeAndFlush( |
| 289 | + new Netty4ChunkedHttpContinuation(writeSequence, continuation, finishingWrite.combiner()), |
| 290 | + finishingWrite.onDone() // pass the terminal listener/promise along the line |
| 291 | + ) |
| 292 | + ); |
| 293 | + checkShutdown(); |
| 294 | + } |
| 295 | + |
| 296 | + @Override |
| 297 | + public void onFailure(Exception e) { |
| 298 | + assert Transports.assertDefaultThreadContext(threadContext); |
| 299 | + logger.error( |
| 300 | + Strings.format("failed to get continuation of HTTP response body for [%s], closing connection", channel), |
| 301 | + e |
| 302 | + ); |
| 303 | + channel.close().addListener(ignored -> { |
| 304 | + finishingWrite.combiner().add(channel.newFailedFuture(e)); |
| 305 | + finishingWrite.combiner().finish(finishingWrite.onDone()); |
| 306 | + }); |
| 307 | + checkShutdown(); |
| 308 | + } |
| 309 | + |
| 310 | + private void checkShutdown() { |
| 311 | + if (channel.eventLoop().isShuttingDown()) { |
| 312 | + // The event loop is shutting down, and https://github.com/netty/netty/issues/8007 means that we cannot know |
| 313 | + // if the preceding activity made it onto its queue before shutdown or whether it will just vanish without a |
| 314 | + // trace, so to avoid a leak we must double-check that the final listener is completed once the event loop |
| 315 | + // is terminated. Note that the final listener came from Netty4Utils#safeWriteAndFlush so its executor is an |
| 316 | + // ImmediateEventExecutor which means this completion is not subject to the same issue, it still works even |
| 317 | + // if the event loop has already terminated. |
| 318 | + channel.eventLoop() |
| 319 | + .terminationFuture() |
| 320 | + .addListener(ignored -> finishingWrite.onDone().tryFailure(new ClosedChannelException())); |
| 321 | + } |
| 322 | + } |
| 323 | + |
| 324 | + }) |
| 325 | + ), |
| 326 | + finishingWriteBodyPart::getNextPart |
| 327 | + ); |
313 | 328 | } |
314 | 329 | } |
315 | 330 |
|
|
0 commit comments