Batch async fetch shards data to reduce memory consumption. #81081
howardhuanghua wants to merge 25 commits into elastic:main from
Conversation
DaveCTurner left a comment:
This seems like the right sort of idea. I left some comments inline. I think we should do this for replica allocations too.
```java
 */
public interface Lister<NodesResponse extends BaseNodesResponse<NodeResponse>, NodeResponse extends BaseNodeResponse> {
    void list(ShardId shardId, @Nullable String customDataPath, DiscoveryNode[] nodes, ActionListener<NodesResponse> listener);
    void flush();
```
Rather than introducing this method to the lister (and the corresponding flag passed in to fetchData), could we have the allocator directly indicate the end of an allocation round, which triggers the flush?
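To illustrate the shape of the suggestion, here is a minimal sketch of a lister that buffers per-shard requests and only sends one batched request per node when the allocator signals the end of the round. All types here (`String` shard/node ids, the batch format) are simplified stand-ins for the real `ShardId`, `DiscoveryNode`, and transport request classes, not the actual implementation in this PR:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch only: stand-in types instead of the real Elasticsearch classes.
class BatchedLister {
    // Requests buffered during an allocation round, grouped by node.
    private final Map<String, List<String>> pendingShardsPerNode = new HashMap<>();
    // Records what was "sent" so the batching behaviour is observable.
    private final List<String> sentBatches = new ArrayList<>();

    // Called once per shard during the round (mirrors list()).
    void list(String shardId, String nodeId) {
        pendingShardsPerNode.computeIfAbsent(nodeId, n -> new ArrayList<>()).add(shardId);
    }

    // Called by the allocator at the end of the round: the suggested
    // alternative to passing a flush flag into fetchData().
    void flush() {
        pendingShardsPerNode.forEach((node, shards) ->
            sentBatches.add(node + ":" + String.join(",", shards)));
        pendingShardsPerNode.clear();
    }

    List<String> sentBatches() {
        return sentBatches;
    }
}
```

With this shape, ten shards on the same node produce one request instead of ten, and a second `flush()` with nothing pending sends nothing.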
```java
for (ShardId shardId : requestMap.keySet()) {
    ShardRequestInfo shardRequest = requestMap.get(shardId);
    shards.put(shardRequest.shardId(), shardRequest.getCustomDataPath());
    if (node.getVersion().before(Version.V_7_16_0)) {
```
The version in master is now 8.1.0; it's unlikely we'll backport this to an earlier version.
```java
};

client.executeLocally(
    TransportNodesListGatewayStartedShards.TYPE,
```
I'm undecided about re-using the same action type for both kinds of request here. I think it'd be cleaner to introduce a new one (and to name it something better than `internal:gateway/local/started_shards`) given how big a difference in behaviour we are making.
Hi @DaveCTurner, if we introduce a new action, then we need to refactor some logic in GatewayAllocator, such as the following structures; it seems that would be a big change for the high-level allocators. What do you think?
I'm not sure this is true, I think we could keep pretty much the same interface from the point of view of GatewayAllocator. It should be possible to implement a batching Lister which reworks the batched responses into a BaseNodesResponse<NodeGatewayStartedShards>.
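A rough sketch of that idea: the batching lister keeps the per-shard listeners that GatewayAllocator registered, and when one grouped node-level response arrives it fans the results back out to those listeners, so the allocator-facing interface is unchanged. The types below (`String` ids, `Consumer` listeners, a flat result map) are hypothetical simplifications of `BaseNodesResponse<NodeGatewayStartedShards>` and `ActionListener`:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;

// Sketch only: reworking a batched node response into per-shard callbacks.
class BatchedResponseFanOut {
    private final Map<String, Consumer<String>> perShardListeners = new HashMap<>();

    // GatewayAllocator registers a listener per shard, exactly as it does today.
    void register(String shardId, Consumer<String> listener) {
        perShardListeners.put(shardId, listener);
    }

    // One grouped node-level response arrives; split it back into the
    // per-shard responses the existing allocator code expects.
    void onGroupedResponse(Map<String, String> shardResults) {
        shardResults.forEach((shardId, result) -> {
            Consumer<String> listener = perShardListeners.remove(shardId);
            if (listener != null) {
                listener.accept(result);
            }
        });
    }
}
```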
```diff
- protected NodeGatewayStartedShards nodeOperation(NodeRequest request, Task task) {
+ protected NodeGroupedGatewayStartedShards nodeOperation(NodeRequest request, Task task) {
+     NodeGroupedGatewayStartedShards groupedStartedShards = new NodeGroupedGatewayStartedShards(clusterService.localNode());
+     for (Map.Entry<ShardId, String> entry : request.getShards().entrySet()) {
```
When sending these requests per-shard, we execute them in parallel across the `FETCH_SHARD_STARTED` threadpool. I think we should continue to parallelise them at the shard level like that.
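One way to keep that shard-level parallelism inside a batched node request: submit one task per shard in the batch to a worker pool and gather the results into a single grouped response. This is an illustrative sketch with a plain `ExecutorService` standing in for the `FETCH_SHARD_STARTED` threadpool, and `String` ids and results standing in for the real shard-state types:

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.Function;

// Sketch only: fan a batched node request out across a worker pool per shard,
// then gather the per-shard results into one grouped response.
class GroupedNodeOperation {
    static Map<String, String> process(List<String> shardIds,
                                       Function<String, String> loadShardState,
                                       ExecutorService fetchPool) {
        // Submit one task per shard so shards are still processed in parallel.
        Map<String, Future<String>> futures = new LinkedHashMap<>();
        for (String shardId : shardIds) {
            futures.put(shardId, fetchPool.submit(() -> loadShardState.apply(shardId)));
        }
        // Gather results into a single grouped response for the whole batch.
        Map<String, String> grouped = new LinkedHashMap<>();
        futures.forEach((shardId, future) -> {
            try {
                grouped.put(shardId, future.get()); // block until this shard's task finishes
            } catch (InterruptedException | ExecutionException e) {
                throw new RuntimeException(e);
            }
        });
        return grouped;
    }
}
```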
|
Thanks for the suggestion. I am going to complete the optimization.

Pinging @elastic/es-search (Team:Search)

Pinging @elastic/es-distributed (Team:Distributed)

Pinging @elastic/es-distributed-obsolete (Team:Distributed (Obsolete))

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)
This commit is going to fix #80694. Shard-level fetch requests are grouped node-to-shard, and the cached listeners are called after receiving the node-level fetch requests. Async shard fetch requests before/after optimization:
