Make TransportNodesAction finishHim Execute on Configured Executor #62753

Merged
original-brownbear merged 5 commits into elastic:master from original-brownbear:fix-mapping-monitoring-serialization
Sep 28, 2020

Conversation

@original-brownbear
Contributor

@original-brownbear original-brownbear commented Sep 22, 2020

Currently, `finishHim` can either execute on the specified executor
(in the less likely case that the local node's request is the last to arrive)
or on a transport thread.
In the case of e.g. `org.elasticsearch.action.admin.cluster.stats.TransportClusterStatsAction`,
this leads to an expensive execution that deserializes all mapping metadata in the cluster
running on the transport thread, destabilizing the cluster. For this transport
action, the per-node work was specifically moved to the `MANAGEMENT` thread to avoid the high cost of processing
the stats requests on the nodes during fan-out, but that did not cover the final execution
on the node that received the initial request. This PR adds the ability to optionally specify the executor for the final step of the
nodes request execution and uses it to work around the issue for the slow `TransportClusterStatsAction`.

Note: the specific problem that motivated this PR is essentially the same as #57937, where we likewise moved the execution off the transport thread and onto the management thread as a fix.
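The core idea of the change, dispatching the expensive final response construction to a configured executor instead of running it inline on whatever thread delivered the last per-node response, can be sketched in isolation. All names here (`FinishHimSketch`, `newResponse`, the "management" executor) are hypothetical stand-ins, not the actual Elasticsearch classes:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class FinishHimSketch {
    // hypothetical stand-in for the node's MANAGEMENT executor
    static final ExecutorService management = Executors.newSingleThreadExecutor(r -> {
        Thread t = new Thread(r, "management");
        t.setDaemon(true);
        return t;
    });

    // stand-in for the expensive newResponse step (e.g. deserializing all
    // mapping metadata); returns the name of the thread it ran on
    static String newResponse() {
        return Thread.currentThread().getName();
    }

    // instead of building the response inline on whatever thread delivered
    // the last per-node response (possibly a transport thread), dispatch the
    // final step to the configured executor and wait for it here
    static String finishOnConfiguredExecutor() {
        try {
            AtomicReference<String> ranOn = new AtomicReference<>();
            management.submit(() -> ranOn.set(newResponse())).get(5, TimeUnit.SECONDS);
            return ranOn.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        // the expensive step ran on the management thread, not the caller's
        if (!"management".equals(finishOnConfiguredExecutor())) throw new AssertionError();
        System.out.println("ok");
    }
}
```

In the real change the caller does not block on the result; the listener is completed from the dispatched task. The blocking `get` here is only so the sketch can observe which thread did the work.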

@original-brownbear original-brownbear added the >non-issue, :Distributed/Network (Http and internode communication implementations), v8.0.0, and v7.10.0 labels on Sep 22, 2020
@elasticmachine
Collaborator

Pinging @elastic/es-distributed (:Distributed/Network)

@elasticmachine elasticmachine added the Team:Distributed (Meta label for distributed team) label on Sep 22, 2020
@original-brownbear
Contributor Author

original-brownbear commented Sep 24, 2020

As discussed during the distrib sync: I updated the PR to only fork to another thread for the specific transport action that was causing trouble here; it should be good for review now :)

Contributor

@henningandersen henningandersen left a comment


LGTM.

Can we also add an assert to TransportClusterStatsAction.newResponse that it does not run on a transport thread (or always runs on a management thread)?

            return;
        }
-       listener.onResponse(finalResponse);
+       threadPool.executor(finalExecutor).execute(ActionRunnable.supply(listener, () -> newResponse(request, responses)));
Contributor


nit: this reintroduces double-firing the listener if `listener.onResponse` fails (it will now also invoke `onFailure`). I am OK with it, since coming up with a good solution for the pattern you use here is a separate effort, unless you can see an easy way around it.

Contributor Author


I knew you were going to point that out (really :)). The problem here is that we really need this safeguard, I think. Otherwise an exception in `listener.onResponse` (highly likely a bug in the transport code, but still) would lead to the request never getting a response back, leaking memory on the caller. We actually have the same issue when we run this on the same thread (the transport handler upstream brings the exception back around to the listener in that case).
-> no easy fix here for now, I'm afraid. But I do think we should look into adding more assertions for this kind of thing to `ActionRunnable`, like you did for `ActionListener.map`, and clean these things up more for sure.
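The double-notification hazard being discussed can be sketched in isolation. The `Listener` interface and `notifyCount` helper below are hypothetical stand-ins for illustration, not Elasticsearch's actual `ActionListener`:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleFireSketch {
    // minimal hypothetical listener, mirroring the onResponse/onFailure shape
    interface Listener<T> {
        void onResponse(T t);
        void onFailure(Exception e);
    }

    // returns how many times the listener was notified in total
    static int notifyCount(boolean onResponseThrows) {
        final AtomicInteger calls = new AtomicInteger();
        Listener<String> listener = new Listener<String>() {
            public void onResponse(String s) {
                calls.incrementAndGet();
                if (onResponseThrows) {
                    throw new RuntimeException("bug in the response handler");
                }
            }
            public void onFailure(Exception e) {
                calls.incrementAndGet();
            }
        };
        try {
            listener.onResponse("response");
        } catch (Exception e) {
            // safety net: make sure the caller always hears back ...
            listener.onFailure(e);
        }
        // ... but if onResponse itself threw, the listener was notified twice
        return calls.get();
    }

    public static void main(String[] args) {
        if (notifyCount(false) != 1) throw new AssertionError();
        if (notifyCount(true) != 2) throw new AssertionError();
        System.out.println("ok");
    }
}
```

This is the trade-off in the comment above: without the catch, a throwing `onResponse` means the request is never answered and memory leaks on the caller; with it, a throwing `onResponse` is notified twice.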

@original-brownbear
Contributor Author

Can we also add an assert to TransportClusterStatsAction.newResponse that it does not run on a transport thread (or always runs on a management thread)?

Done in 474d4cb; I went with the not-transport-thread assertion since we use that in multiple other places. It is also a little more flexible if we want to start using the transport action in `DeterministicTaskQueue` tests (I have some background experiments with that ongoing :)).
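The spirit of such an assertion can be sketched as follows. The helper name and thread names are hypothetical; the real Elasticsearch assertion utilities live elsewhere and only trip when the JVM runs with `-ea`:

```java
public class TransportThreadAssertSketch {
    // hypothetical helper: transport worker threads carry "transport_worker"
    // in their name, so reject those; only enforced when assertions are
    // enabled (-ea), matching the usual Java assert semantics
    static boolean assertNotTransportThread(String threadName) {
        assert !threadName.contains("transport_worker")
                : "expensive response construction must not run on a transport thread: " + threadName;
        return true;
    }

    public static void main(String[] args) {
        // a management thread passes the check
        if (!assertNotTransportThread("elasticsearch[node1][management][T#1]"))
            throw new AssertionError();
        System.out.println("ok");
    }
}
```

Asserting "not a transport thread" rather than "is a management thread" keeps the check valid when tests run everything on a single deterministic thread.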

@original-brownbear
Contributor Author

Thanks Henning!

@original-brownbear original-brownbear merged commit 82aa1d3 into elastic:master Sep 28, 2020
@original-brownbear original-brownbear deleted the fix-mapping-monitoring-serialization branch September 28, 2020 12:57
original-brownbear added a commit that referenced this pull request Sep 28, 2020
…62753) (#62955)

original-brownbear added a commit that referenced this pull request Oct 27, 2020
Follow-up to #62753, which moved this expensive method off of the transport threads.
Often, monitoring will hit this endpoint every few seconds while the metadata
changes at a much slower rate.
Given that in practice computing the stats might take up to a minute for large
cluster states, caching them on a best-effort basis seems like a reasonable improvement.
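The best-effort caching idea can be sketched as follows. All names here are hypothetical illustrations, not the actual Elasticsearch implementation; the key point is keying the cached result on the cluster state version:

```java
import java.util.concurrent.atomic.AtomicReference;

public class CachedStatsSketch {
    // hypothetical cached value keyed by cluster state version
    static final class Cached {
        final long version;
        final String stats;
        Cached(long version, String stats) { this.version = version; this.stats = stats; }
    }

    private final AtomicReference<Cached> cache = new AtomicReference<>();
    int computations = 0;

    // recompute the expensive stats only when the cluster state version has
    // changed; otherwise serve the cached copy (best-effort: a race between
    // two callers may recompute redundantly, which is acceptable here)
    String stats(long clusterStateVersion) {
        Cached c = cache.get();
        if (c != null && c.version == clusterStateVersion) {
            return c.stats;
        }
        computations++;
        String fresh = "stats@" + clusterStateVersion; // stand-in for the expensive work
        cache.set(new Cached(clusterStateVersion, fresh));
        return fresh;
    }

    public static void main(String[] args) {
        CachedStatsSketch s = new CachedStatsSketch();
        s.stats(1); s.stats(1); s.stats(1); // repeated monitoring polls, same state
        s.stats(2);                          // state changed -> recompute
        if (s.computations != 2) throw new AssertionError();
        System.out.println("ok");
    }
}
```

This matches the workload described above: monitoring polls every few seconds, while the metadata that feeds the stats changes far less often.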
original-brownbear added a commit that referenced this pull request Oct 27, 2020
…64195)

@original-brownbear original-brownbear restored the fix-mapping-monitoring-serialization branch December 6, 2020 19:02
@kkewwei
Contributor

kkewwei commented Oct 21, 2021

In our production environment, /_nodes/stats occasionally takes 30s+; we find there are too many tasks pending in the MANAGEMENT threadpool, which slows down the Cluster Stats API (which also uses the MANAGEMENT threadpool). The usage of the threadpool is as follows:

    "management" : {
      "threads" : 5,
      "queue" : 793,
      "active" : 5,
      "rejected" : 0,
      "largest" : 5,
      "completed" : 888436
    },

Why is the threadpool so busy? The reason is that the 5 active threads are all busy writing to disk:

"elasticsearch[node1][management][T#5]" #176 daemon prio=5 os_prio=0 tid=0x00007f7854003800 nid=0x7e2 runnable [0x00007f79ab8f8000]
   java.lang.Thread.State: RUNNABLE
        at sun.nio.ch.FileDispatcherImpl.force0(Native Method)
        at sun.nio.ch.FileDispatcherImpl.force(FileDispatcherImpl.java:76)
        at sun.nio.ch.FileChannelImpl.force(FileChannelImpl.java:388)
        at org.apache.lucene.util.IOUtils.fsync(IOUtils.java:471)
        at org.apache.lucene.store.FSDirectory.syncMetaData(FSDirectory.java:309)
        at org.elasticsearch.gateway.MetadataStateFormat.performStateDirectoriesFsync(MetadataStateFormat.java:172)
        at org.elasticsearch.gateway.MetadataStateFormat.write(MetadataStateFormat.java:246)
        at org.elasticsearch.gateway.MetadataStateFormat.writeAndCleanup(MetadataStateFormat.java:185)
        at org.elasticsearch.index.seqno.ReplicationTracker.persistRetentionLeases(ReplicationTracker.java:494)
        - locked <0x00007f7c4981d978> (a java.lang.Object)
        at org.elasticsearch.index.shard.IndexShard.persistRetentionLeases(IndexShard.java:2256)
        at org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction.lambda$shardOperationOnPrimary$0(RetentionLeaseBackgroundSyncAction.java:161)
        at org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction$$Lambda$5355/1679727689.get(Unknown Source)
        at org.elasticsearch.action.ActionListener.completeWith(ActionListener.java:325)
        at org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction.shardOperationOnPrimary(RetentionLeaseBackgroundSyncAction.java:157)
        at org.elasticsearch.index.seqno.RetentionLeaseBackgroundSyncAction.shardOperationOnPrimary(RetentionLeaseBackgroundSyncAction.java:64)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$PrimaryShardReference.perform(TransportReplicationAction.java:968)
        at org.elasticsearch.action.support.replication.ReplicationOperation.execute(ReplicationOperation.java:122)
        at org.elasticsearch.action.support.replication.TransportReplicationAction$AsyncPrimaryAction.runWithPrimaryShardReference(TransportReplicationAction.java:429)

The MANAGEMENT threadpool will be blocked as long as one disk is busy, which is common in production.

The API is relatively important:

  1. InternalClusterInfoService on the master uses it to poll node stats.
  2. We use the API to monitor the cluster.

Perhaps we should create a new threadpool to handle those tasks related to metadata?

@henningandersen
Contributor

@kkewwei I agree that this looks problematic. A few initial questions on your setup:

  1. How many shards do you have per node, and how many of those are actively receiving indexing?
  2. Is this against fast disks (SSDs) or slow ones?
  3. Are you using CCR to replicate data from this cluster?

I am hesitant to add a new pool just for retention lease sync, but it may be more suitable on some other thread pool, or we can look into optimizing the amount of fsync'ing needed here.

@kkewwei
Contributor

kkewwei commented Oct 27, 2021

  1. In the cluster, there are 75 data nodes, 3596 indices, and 45,383 shards; about 10% of the shards are actively receiving indexing.
  2. Slow ones; there are 9 disks in each data node.
  3. No, but the write threadpool is busy, and occasionally some disks have high IO utilization.
