ESQL: Promptly clean up CCS exchange sinks#143325
Merged
ebarlas merged 8 commits intoelastic:mainfrom Mar 3, 2026
Merged
Conversation
The cross-cluster exchange sink created by ClusterComputeHandler for transferring reduced results from a remote cluster back to the origin coordinator was not explicitly finished on normal completion. It relied solely on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners. Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern already used by DataNodeComputeHandler. Add an @after assertion to CrossClusterQueryIT to verify that all exchange sinks across all clusters are released within 5 seconds of each test.
Collaborator
|
Hi @ebarlas, I've created a changelog YAML for you. |
idegtiarenko
reviewed
Mar 2, 2026
| exchangeSink.addCompletionListener(computeListener.acquireAvoid()); | ||
| exchangeSink.addCompletionListener( | ||
| ActionListener.runBefore(computeListener.acquireAvoid(), () -> exchangeService.finishSinkHandler(globalSessionId, null)) | ||
| ); |
Contributor
There was a problem hiding this comment.
This makes sense, but I would like @dnhatn to confirm
dnhatn
reviewed
Mar 2, 2026
| ); | ||
| try (Releasable ignored = exchangeSource.addEmptySink()) { | ||
| exchangeSink.addCompletionListener(computeListener.acquireAvoid()); | ||
| exchangeSink.addCompletionListener( |
Member
There was a problem hiding this comment.
Can we revert this change and add this in 264 instead?
exchangeSink.addCompletionListener(ActionListener.running( () -> exchangeService.finishSinkHandler(globalSessionId, null));
Collaborator
|
Pinging @elastic/es-analytical-engine (Team:Analytics) |
dnhatn
approved these changes
Mar 2, 2026
ebarlas
added a commit
to ebarlas/elasticsearch
that referenced
this pull request
Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was not explicitly finished on normal completion. It relied on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners. Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern used by DataNodeComputeHandler. Also add an @after assertion to CrossClusterQueryIT to verify that all exchange sinks across clusters are released within 5 seconds of each test.
ebarlas
added a commit
to ebarlas/elasticsearch
that referenced
this pull request
Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was not explicitly finished on normal completion. It relied on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners. Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern used by DataNodeComputeHandler. Also add an @after assertion to CrossClusterQueryIT to verify that all exchange sinks across clusters are released within 5 seconds of each test.
ebarlas
added a commit
to ebarlas/elasticsearch
that referenced
this pull request
Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was not explicitly finished on normal completion. It relied on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners. Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern used by DataNodeComputeHandler. Also add an @after assertion to CrossClusterQueryIT to verify that all exchange sinks across clusters are released within 5 seconds of each test.
Collaborator
elasticsearchmachine
pushed a commit
that referenced
this pull request
Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was not explicitly finished on normal completion. It relied on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners. Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern used by DataNodeComputeHandler. Also add an @after assertion to CrossClusterQueryIT to verify that all exchange sinks across clusters are released within 5 seconds of each test.
elasticsearchmachine
pushed a commit
that referenced
this pull request
Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was not explicitly finished on normal completion. It relied on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners. Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern used by DataNodeComputeHandler. Also add an @after assertion to CrossClusterQueryIT to verify that all exchange sinks across clusters are released within 5 seconds of each test.
elasticsearchmachine
pushed a commit
that referenced
this pull request
Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was not explicitly finished on normal completion. It relied on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners. Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern used by DataNodeComputeHandler. Also add an @after assertion to CrossClusterQueryIT to verify that all exchange sinks across clusters are released within 5 seconds of each test.
szybia
added a commit
to szybia/elasticsearch
that referenced
this pull request
Mar 3, 2026
…cations * upstream/main: (56 commits) Mute org.elasticsearch.compute.lucene.read.ValueSourceReaderTypeConversionTests testLoadAll elastic#143471 [DOCS] Fix ES|QL function and commands lists versioning metadata (elastic#143402) Fix MMROperatorTests (elastic#143453) Fix CSV-escaped quotes in generated docs examples (elastic#143449) Fix SQL client parsing of array header values (elastic#143408) ESQL: Add extended distribution tests and fault injection for external sources (elastic#143420) ESQL: Fix datasource test failures on Windows and FIPS (elastic#143417) Add circuit breaker for query construction to prevent OOM from automaton-based queries (elastic#142150) Cleanup SpecIT logging configuration (elastic#143365) ESQL: Prune unused regex extract nodes in optimizer (elastic#140982) Ensure supported locale outside of Entitlements check (elastic#143405) feat(es|ql): add dense_vector support in coalesce (elastic#142974) [Test] Unmute SnapshotStressTestsIT (elastic#143359) Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:lookup-join.LookupJoinWithCoalesceFilterOnRight} elastic#143443 Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:lookup-join.MvJoinKeyOnTheLookupIndex} elastic#143442 ESQL: Fix CCS exchange sink cleanup (elastic#143325) Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:lookup-join.MvJoinKeyOnTheLookupIndexAfterStats} elastic#143434 Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:lookup-join.MvJoinKeyFromRow} elastic#143432 Mute org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT test {csv-spec:k8s-timeseries.Datenanos_derivative_compared_to_rate} elastic#143431 Mute org.elasticsearch.multiproject.test.CoreWithMultipleProjectsClientYamlTestSuiteIT test {yaml=search.retrievers/result-diversification/10_mmr_result_diversification_retriever/Test MMR result diversification single index float type} elastic#143430 ...
tballison
pushed a commit
to tballison/elasticsearch
that referenced
this pull request
Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was not explicitly finished on normal completion. It relied on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners. Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern used by DataNodeComputeHandler. Also add an @after assertion to CrossClusterQueryIT to verify that all exchange sinks across clusters are released within 5 seconds of each test.
This was referenced Mar 6, 2026
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Problem
The cross-cluster exchange sink created by
ClusterComputeHandlerfor transferring reduced results from a remote cluster back to the origin coordinator was not explicitly finished on normal completion. It relied solely on task cancellation or theInactiveSinksReaperfor cleanup, unlike data node sinks which finish immediately via completion listeners.Fix
Add a
finishSinkHandlercall to the exchange sink's completion listener inrunComputeOnRemoteCluster, matching the pattern already used byDataNodeComputeHandler. Add an@Afterassertion toCrossClusterQueryITto verify that all exchange sinks across all clusters are released within 5 seconds of each test.This test pattern is already used in
AbstractEsqlIntegTestCase.