Handle InternalSendException inline for non-forking handlers#114375
Conversation
|
Pinging @elastic/es-distributed (Team:Distributed) |
|
Thanks for the really helpful pointer. I'll repurpose this PR to follow your suggestion. Please note that the issue may not be entirely fixed with the new approach. As said in the PR description, there is another failture path in |
…hen handling exception
| public void testInternalSendExceptionWithNonForkingResponseHandlerCanCompleteFutureWaiterFromGenericThread() throws Exception { | ||
| try (var nodeA = new TestNode("node-A")) { | ||
| final var future = new PlainActionFuture<TransportResponse.Empty>(); | ||
| final var latch = new CountDownLatch(1); | ||
| nodeA.transportService.getThreadPool().generic().execute(() -> { | ||
| assertEquals("simulated exception in sendRequest", getSendRequestException(future, IOException.class).getMessage()); | ||
| latch.countDown(); | ||
| }); | ||
| nodeA.transportService.sendRequest( | ||
| nodeA.getThrowingConnection(), | ||
| TestNode.randomActionName(random()), | ||
| new EmptyRequest(), | ||
| TransportRequestOptions.EMPTY, | ||
| new ActionListenerResponseHandler<>(future.delegateResponse((l, e) -> { | ||
| assertThat(Thread.currentThread().getName(), startsWith("TEST-")); | ||
| l.onFailure(e); | ||
| }), unusedReader(), EsExecutors.DIRECT_EXECUTOR_SERVICE) | ||
| ); | ||
| assertBusy(() -> assertTrue(future.isDone())); | ||
| } |
There was a problem hiding this comment.
This is a more direct low-level test for the assertCompleteAllowed issue.
|
Hi @ywangd, I've created a changelog YAML for you. |
|
I updated the PR to avoid forking in |
DaveCTurner
left a comment
There was a problem hiding this comment.
Change LGTM but I left some comments on the tests. FWIW I'd have written this test as follows:
public void testInternalSendExceptionWithNonForkingResponseHandlerCanCompleteFutureWaiterFromGenericThread() {
try (var nodeA = new TestNode("node-A")) {
final var testThread = Thread.currentThread();
assertEquals(
"simulated exception in sendRequest",
safeAwaitAndUnwrapFailure(
IOException.class,
TransportResponse.Empty.class,
l -> nodeA.transportService.sendRequest(
nodeA.getThrowingConnection(),
TestNode.randomActionName(random()),
new EmptyRequest(),
TransportRequestOptions.EMPTY,
new ActionListenerResponseHandler<>(
ActionListener.runBefore(l, () -> assertSame(testThread, Thread.currentThread())),
unusedReader(),
EsExecutors.DIRECT_EXECUTOR_SERVICE
)
)
).getMessage()
);
}
}| final var latch = new CountDownLatch(1); | ||
| nodeA.transportService.getThreadPool().generic().execute(() -> { | ||
| assertEquals("simulated exception in sendRequest", getSendRequestException(future, IOException.class).getMessage()); | ||
| latch.countDown(); | ||
| }); |
There was a problem hiding this comment.
Not sure what this bit of the test is doing - is it needed?
There was a problem hiding this comment.
The original reason that leads to this change is that PlainActionFuture#assertCompleteAllowedfails inPlainActionFuture#onFailurewhen the waiter is with ageneric` thread. So I wanted to have this test to simulate that situation, i.e. waiting for future with a generic thread.
I do recognize that asserting the completing thread is currentThread instead of generic already proves the change is effective. But I added the above intending to preserve some context on how this issue is identified. If you think that's unnecessary, I can definitely rewrite the test as you suggested. Please let me know.
There was a problem hiding this comment.
Sorry I just noticed the latch is indeed useless. Removed in ffaefbd
There was a problem hiding this comment.
(this is an overall response right, not just related to the few lines at the top of this thread?)
Yeah I worry that this is a very indirect way to test that property, and it'd still be a problem if we called sendRequest on a (different) generic thread from the one that's waiting on the future. I'd rather we just focussed on the actual threading behaviour here in the context of the TransportService tests.
There was a problem hiding this comment.
Yeah I worry that this is a very indirect way to test that property
I agree. It's in addition to the same thread assertion.
You are right that assertion can still be tripped if we send the request with the generic thread pool. This is already another problem that can happen in stress test (with a slightly different code path).
I pushed e90005d to rewrite the test as you suggested.
server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java
Outdated
Show resolved
Hide resolved
server/src/test/java/org/elasticsearch/transport/TransportServiceLifecycleTests.java
Outdated
Show resolved
Hide resolved
|
@elasticmachine update branch |
|
FYI, I plan to backport this PR since it's labelled as |
💚 Backport successful
|
…#114375) When TransportService fails to send a transport action, it can complete the listener's `onFailure` with the `generic` executor. If the listener is a `PlainActionFuture` and also waits to be completed with a `generic` thread, it will trip the `assertCompleteAllowed` assertion. https://github.com/elastic/elasticsearch/blob/fb482f863d5430702b19bd3dd23e9d8652f12ddd/server/src/main/java/org/elasticsearch/transport/TransportService.java#L1062-L1064 With this PR, we no longer fork to the generic thread pool and instead just handle the exeption inline with the current thread. The expectation is that the downstream handler should take care potential stack overflow issues. This is similar to what is done in elastic#109236
#114493) When TransportService fails to send a transport action, it can complete the listener's `onFailure` with the `generic` executor. If the listener is a `PlainActionFuture` and also waits to be completed with a `generic` thread, it will trip the `assertCompleteAllowed` assertion. https://github.com/elastic/elasticsearch/blob/fb482f863d5430702b19bd3dd23e9d8652f12ddd/server/src/main/java/org/elasticsearch/transport/TransportService.java#L1062-L1064 With this PR, we no longer fork to the generic thread pool and instead just handle the exeption inline with the current thread. The expectation is that the downstream handler should take care potential stack overflow issues. This is similar to what is done in #109236
…#114375) When TransportService fails to send a transport action, it can complete the listener's `onFailure` with the `generic` executor. If the listener is a `PlainActionFuture` and also waits to be completed with a `generic` thread, it will trip the `assertCompleteAllowed` assertion. https://github.com/elastic/elasticsearch/blob/fb482f863d5430702b19bd3dd23e9d8652f12ddd/server/src/main/java/org/elasticsearch/transport/TransportService.java#L1062-L1064 With this PR, we no longer fork to the generic thread pool and instead just handle the exeption inline with the current thread. The expectation is that the downstream handler should take care potential stack overflow issues. This is similar to what is done in elastic#109236
…#114375) When TransportService fails to send a transport action, it can complete the listener's `onFailure` with the `generic` executor. If the listener is a `PlainActionFuture` and also waits to be completed with a `generic` thread, it will trip the `assertCompleteAllowed` assertion. https://github.com/elastic/elasticsearch/blob/fb482f863d5430702b19bd3dd23e9d8652f12ddd/server/src/main/java/org/elasticsearch/transport/TransportService.java#L1062-L1064 With this PR, we no longer fork to the generic thread pool and instead just handle the exeption inline with the current thread. The expectation is that the downstream handler should take care potential stack overflow issues. This is similar to what is done in elastic#109236
…tic#2966) We do not pass down the executor for the GetVBCCChunk action so that we can retain the bytes on the transport thread. The response is then fulfilled with the dedicated fill_vbcc executor. There is no such concern on the failure path since no bytes are available. With this PR, we fork to the same thread pool before completing the listener exceptionally. This helps avoid occassional PlainActionFture#assertCompleteAllowed failures. Relates: elastic#114375 Relates: elastic#2933
There are a few edge cases where closing a node can causes test failures: Closing the handling node when looking up master node name. Closing the coordinating node when a search is ongoing. This can lead to leaking search context in MockSearchService on the data nodes. Closing the data node when a search is ongoing. This can lead to leaking resource on the coordinating node. This PR fixes 1 by avoiding lookup since the master node does not change and is already known. It fixes 2 by always uses master node as the coordinating node. It fixes 3 by avoid restarting search node. With these changes in place (along with elastic#2790, elastic#2966, elastic#2983, elastic#112748, elastic#114375) the test is stable enough (running in a loop for 40+ hours without failure) to be unmuted. Resolves: elastic#2327 Resolves:
…tic#2966) We do not pass down the executor for the GetVBCCChunk action so that we can retain the bytes on the transport thread. The response is then fulfilled with the dedicated fill_vbcc executor. There is no such concern on the failure path since no bytes are available. With this PR, we fork to the same thread pool before completing the listener exceptionally. This helps avoid occassional PlainActionFture#assertCompleteAllowed failures. Relates: elastic#114375 Relates: elastic#2933
There are a few edge cases where closing a node can causes test failures: Closing the handling node when looking up master node name. Closing the coordinating node when a search is ongoing. This can lead to leaking search context in MockSearchService on the data nodes. Closing the data node when a search is ongoing. This can lead to leaking resource on the coordinating node. This PR fixes 1 by avoiding lookup since the master node does not change and is already known. It fixes 2 by always uses master node as the coordinating node. It fixes 3 by avoid restarting search node. With these changes in place (along with elastic#2790, elastic#2966, elastic#2983, elastic#112748, elastic#114375) the test is stable enough (running in a loop for 40+ hours without failure) to be unmuted. Resolves: elastic#2327 Resolves:
When TransportService fails to send a transport action, it can complete the listener's
onFailurewith thegenericexecutor. If the listener is aPlainActionFutureand also waits to be completed with agenericthread, it will trip theassertCompleteAllowedassertion.elasticsearch/server/src/main/java/org/elasticsearch/transport/TransportService.java
Lines 1062 to 1064 in fb482f8
With this PR, we no longer fork to the generic thread pool and instead just handle the exeption inline with the current thread. The expectation is that the downstream handler should take care potential stack overflow issues. This is similar to what is done in #109236