Skip to content

ESQL: Promptly clean up CCS exchange sinks#143325

Merged
ebarlas merged 8 commits intoelastic:mainfrom
ebarlas:cleanup-cross-cluster-esql-exchange-sinks
Mar 3, 2026
Merged

ESQL: Promptly clean up CCS exchange sinks#143325
ebarlas merged 8 commits intoelastic:mainfrom
ebarlas:cleanup-cross-cluster-esql-exchange-sinks

Conversation

@ebarlas
Copy link
Copy Markdown
Contributor

@ebarlas ebarlas commented Feb 28, 2026

Problem

The cross-cluster exchange sink created by ClusterComputeHandler for transferring reduced results from a remote cluster back to the origin coordinator was not explicitly finished on normal completion. It relied solely on task cancellation or the InactiveSinksReaper for cleanup, unlike data node sinks which finish immediately via completion listeners.

Fix

Add a finishSinkHandler call to the exchange sink's completion listener in runComputeOnRemoteCluster, matching the pattern already used by DataNodeComputeHandler. Add an @After assertion to CrossClusterQueryIT to verify that all exchange sinks across all clusters are released within 5 seconds of each test.

This test pattern is already used in AbstractEsqlIntegTestCase.

The cross-cluster exchange sink created by ClusterComputeHandler for
transferring reduced results from a remote cluster back to the origin
coordinator was not explicitly finished on normal completion. It relied
solely on task cancellation or the InactiveSinksReaper
for cleanup, unlike data node sinks which finish immediately
via completion listeners.

Add a finishSinkHandler call to the exchange sink's completion listener
in runComputeOnRemoteCluster, matching the pattern already used by
DataNodeComputeHandler. Add an @after assertion to CrossClusterQueryIT
to verify that all exchange sinks across all clusters are released
within 5 seconds of each test.
@ebarlas ebarlas self-assigned this Feb 28, 2026
@ebarlas ebarlas added >bug v9.4.0 Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) :Analytics/ES|QL AKA ESQL labels Feb 28, 2026
@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

Hi @ebarlas, I've created a changelog YAML for you.

@idegtiarenko idegtiarenko requested a review from dnhatn March 2, 2026 13:10
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
exchangeSink.addCompletionListener(
ActionListener.runBefore(computeListener.acquireAvoid(), () -> exchangeService.finishSinkHandler(globalSessionId, null))
);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This makes sense, but I would like @dnhatn to confirm

Copy link
Copy Markdown
Member

@dnhatn dnhatn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left a comment. Thanks for fixing this @ebarlas

);
try (Releasable ignored = exchangeSource.addEmptySink()) {
exchangeSink.addCompletionListener(computeListener.acquireAvoid());
exchangeSink.addCompletionListener(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we revert this change and add this in 264 instead?

exchangeSink.addCompletionListener(ActionListener.running( () -> exchangeService.finishSinkHandler(globalSessionId, null));

@dnhatn dnhatn self-requested a review March 2, 2026 20:23
@ebarlas ebarlas added auto-backport Automatically create backport pull requests when merged v9.3.2 v8.19.13 v9.2.7 labels Mar 2, 2026
@ebarlas ebarlas marked this pull request as ready for review March 2, 2026 22:06
@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

Pinging @elastic/es-analytical-engine (Team:Analytics)

Copy link
Copy Markdown
Member

@dnhatn dnhatn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Thanks @ebarlas!

@ebarlas ebarlas merged commit 895a3b5 into elastic:main Mar 3, 2026
35 checks passed
ebarlas added a commit to ebarlas/elasticsearch that referenced this pull request Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was
not explicitly finished on normal completion. It relied on
task cancellation or the InactiveSinksReaper for cleanup,
unlike data node sinks which finish immediately via
completion listeners.

Add a finishSinkHandler call to the exchange sink's
completion listener in runComputeOnRemoteCluster, matching
the pattern used by DataNodeComputeHandler. Also add an
@after assertion to CrossClusterQueryIT to verify that all
exchange sinks across clusters are released within 5 seconds
of each test.
ebarlas added a commit to ebarlas/elasticsearch that referenced this pull request Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was
not explicitly finished on normal completion. It relied on
task cancellation or the InactiveSinksReaper for cleanup,
unlike data node sinks which finish immediately via
completion listeners.

Add a finishSinkHandler call to the exchange sink's
completion listener in runComputeOnRemoteCluster, matching
the pattern used by DataNodeComputeHandler. Also add an
@after assertion to CrossClusterQueryIT to verify that all
exchange sinks across clusters are released within 5 seconds
of each test.
ebarlas added a commit to ebarlas/elasticsearch that referenced this pull request Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was
not explicitly finished on normal completion. It relied on
task cancellation or the InactiveSinksReaper for cleanup,
unlike data node sinks which finish immediately via
completion listeners.

Add a finishSinkHandler call to the exchange sink's
completion listener in runComputeOnRemoteCluster, matching
the pattern used by DataNodeComputeHandler. Also add an
@after assertion to CrossClusterQueryIT to verify that all
exchange sinks across clusters are released within 5 seconds
of each test.
@elasticsearchmachine
Copy link
Copy Markdown
Collaborator

💚 Backport successful

Status Branch Result
9.3
8.19
9.2

elasticsearchmachine pushed a commit that referenced this pull request Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was
not explicitly finished on normal completion. It relied on
task cancellation or the InactiveSinksReaper for cleanup,
unlike data node sinks which finish immediately via
completion listeners.

Add a finishSinkHandler call to the exchange sink's
completion listener in runComputeOnRemoteCluster, matching
the pattern used by DataNodeComputeHandler. Also add an
@after assertion to CrossClusterQueryIT to verify that all
exchange sinks across clusters are released within 5 seconds
of each test.
elasticsearchmachine pushed a commit that referenced this pull request Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was
not explicitly finished on normal completion. It relied on
task cancellation or the InactiveSinksReaper for cleanup,
unlike data node sinks which finish immediately via
completion listeners.

Add a finishSinkHandler call to the exchange sink's
completion listener in runComputeOnRemoteCluster, matching
the pattern used by DataNodeComputeHandler. Also add an
@after assertion to CrossClusterQueryIT to verify that all
exchange sinks across clusters are released within 5 seconds
of each test.
elasticsearchmachine pushed a commit that referenced this pull request Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was
not explicitly finished on normal completion. It relied on
task cancellation or the InactiveSinksReaper for cleanup,
unlike data node sinks which finish immediately via
completion listeners.

Add a finishSinkHandler call to the exchange sink's
completion listener in runComputeOnRemoteCluster, matching
the pattern used by DataNodeComputeHandler. Also add an
@after assertion to CrossClusterQueryIT to verify that all
exchange sinks across clusters are released within 5 seconds
of each test.
szybia added a commit to szybia/elasticsearch that referenced this pull request Mar 3, 2026
…cations

* upstream/main: (56 commits)
  Mute org.elasticsearch.compute.lucene.read.ValueSourceReaderTypeConversionTests testLoadAll elastic#143471
  [DOCS] Fix ES|QL function and commands lists versioning metadata (elastic#143402)
  Fix MMROperatorTests (elastic#143453)
  Fix CSV-escaped quotes in generated docs examples (elastic#143449)
  Fix SQL client parsing of array header values (elastic#143408)
  ESQL: Add extended distribution tests and fault injection for external sources (elastic#143420)
  ESQL: Fix datasource test failures on Windows and FIPS (elastic#143417)
  Add circuit breaker for query construction to prevent OOM from automaton-based queries (elastic#142150)
  Cleanup SpecIT logging configuration (elastic#143365)
  ESQL: Prune unused regex extract nodes in optimizer (elastic#140982)
  Ensure supported locale outside of Entitlements check (elastic#143405)
  feat(es|ql): add dense_vector support in coalesce (elastic#142974)
  [Test] Unmute SnapshotStressTestsIT (elastic#143359)
  Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:lookup-join.LookupJoinWithCoalesceFilterOnRight} elastic#143443
  Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:lookup-join.MvJoinKeyOnTheLookupIndex} elastic#143442
  ESQL: Fix CCS exchange sink cleanup (elastic#143325)
  Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:lookup-join.MvJoinKeyOnTheLookupIndexAfterStats} elastic#143434
  Mute org.elasticsearch.xpack.esql.CsvIT test {csv-spec:lookup-join.MvJoinKeyFromRow} elastic#143432
  Mute org.elasticsearch.xpack.esql.qa.mixed.MixedClusterEsqlSpecIT test {csv-spec:k8s-timeseries.Datenanos_derivative_compared_to_rate} elastic#143431
  Mute org.elasticsearch.multiproject.test.CoreWithMultipleProjectsClientYamlTestSuiteIT test {yaml=search.retrievers/result-diversification/10_mmr_result_diversification_retriever/Test MMR result diversification single index float type} elastic#143430
  ...
tballison pushed a commit to tballison/elasticsearch that referenced this pull request Mar 3, 2026
The cross-cluster exchange sink in ClusterComputeHandler was
not explicitly finished on normal completion. It relied on
task cancellation or the InactiveSinksReaper for cleanup,
unlike data node sinks which finish immediately via
completion listeners.

Add a finishSinkHandler call to the exchange sink's
completion listener in runComputeOnRemoteCluster, matching
the pattern used by DataNodeComputeHandler. Also add an
@after assertion to CrossClusterQueryIT to verify that all
exchange sinks across clusters are released within 5 seconds
of each test.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Analytics/ES|QL AKA ESQL auto-backport Automatically create backport pull requests when merged >bug Team:Analytics Meta label for analytical engine team (ESQL/Aggs/Geo) v8.19.13 v9.2.7 v9.3.2 v9.4.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants