Cluster version: 7.10.1.
Dedicated master nodes: 3.
Dedicated data nodes: 163.
Total primary shards: 80k, no replicas.
After a full cluster restart, all of the primaries are fetched from all of the data nodes concurrently:
```java
protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
    // explicitly type lister, some IDEs (Eclipse) are not able to correctly infer the function type
    Lister<BaseNodesResponse<NodeGatewayStartedShards>, NodeGatewayStartedShards> lister = this::listStartedShards;
    AsyncShardFetch<NodeGatewayStartedShards> fetch =
        asyncFetchStarted.computeIfAbsent(shard.shardId(),
            shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId,
                IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
                lister));
    AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState =
        fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));

    if (shardState.hasData()) {
        shardState.processAllocation(allocation);
    }
    return shardState;
}
```
The fetch result mainly contains a DiscoveryNode and a TransportAddress, taking around 1.7KB of heap per node entry. So a single shard's fetch result costs about 1.7KB * 163 data nodes ≈ 280KB, and 80k shards cost about 80000 * 280KB ≈ 21GB.
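A quick back-of-the-envelope check of the numbers above (the class and variable names here are illustrative, not from the Elasticsearch codebase):

```java
// Rough heap-cost estimate for the shard fetch results described above.
public class FetchHeapEstimate {
    public static void main(String[] args) {
        double perNodeEntryKb = 1.7;   // DiscoveryNode + TransportAddress per node response
        int dataNodes = 163;
        int shards = 80_000;

        double perShardKb = perNodeEntryKb * dataNodes;            // ~277KB per shard
        double totalGb = perShardKb * shards / (1024.0 * 1024);    // ~21GB for 80k shards

        System.out.printf("per shard: %.0fKB, total: %.1fGB%n", perShardKb, totalGb);
        // prints: per shard: 277KB, total: 21.1GB
    }
}
```

The cost scales as shards × data nodes, which is why a large cluster with many shards hits this wall even though each individual response entry is small.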
Such a large heap cost can blow up the current master node's JVM heap.
Even with a reasonable 50k shards in a cluster, the fetch results would still need roughly 13GB of heap, which is a huge memory cost.
Several ideas could help address this issue:
- A shard only belongs to a single node at a time, so after a cluster restart we should not need to send fetch requests to all of the data nodes. But in a freshly started cluster there is no routing table from which to extract the node a shard was previously allocated on. Could we persist the node id, just like inSyncAllocationIds in IndexMetadata? Then we could send a single fetch request to the node the shard used to be allocated on.
- BaseNodeResponse contains a DiscoveryNode and extends the abstract base class TransportMessage, which carries an address that duplicates an entry in DiscoveryNode. In the fetch case only the nodeId is required; the node attributes are not needed at all. Could we return only the nodeId in the shard fetch response instead of these heavy structures?
- Shard recovery is already throttled by node-level concurrency limits, so could we fetch the store info for a subset of the shards instead of fetching all of the shards together at the beginning?
- Send one request per node, and batch all of the shards' info into a single result.
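The last two ideas could be combined: one request per data node, whose response carries only a minimal per-shard entry instead of a full BaseNodesResponse with a DiscoveryNode per shard. A rough sketch of such a slim, batched response follows; all names here are hypothetical, not the actual Elasticsearch API:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical slim reply: one response per data node covering many shards,
// each entry holding only what allocation actually needs.
final class NodeShardsStartedResponse {
    // Minimal per-shard state; no DiscoveryNode, no node attributes.
    record ShardEntry(String allocationId, boolean primary) {}

    private final String nodeId;                       // sender id, ~tens of bytes
    private final Map<String, ShardEntry> shardStates; // keyed by shard id string

    NodeShardsStartedResponse(String nodeId) {
        this.nodeId = nodeId;
        this.shardStates = new HashMap<>();
    }

    void addShard(String shardId, String allocationId, boolean primary) {
        shardStates.put(shardId, new ShardEntry(allocationId, primary));
    }

    String nodeId() { return nodeId; }
    Map<String, ShardEntry> shardStates() { return shardStates; }
}

// The master would then merge one such response per node, instead of holding
// one per-node entry per shard as in the current fetch path.
class BatchedFetchDemo {
    public static void main(String[] args) {
        NodeShardsStartedResponse resp = new NodeShardsStartedResponse("node-1");
        resp.addShard("[logs][0]", "alloc-uuid-1", true);
        resp.addShard("[logs][1]", "alloc-uuid-2", true);
        System.out.println(resp.nodeId() + " reported " + resp.shardStates().size() + " shards");
    }
}
```

With this shape the master holds one small entry per (node, shard) pair keyed by a string id, rather than a full DiscoveryNode structure for every pair, so the memory cost no longer multiplies the 1.7KB entry by shards × nodes.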
(The fetchData snippet above is from elasticsearch/server/src/main/java/org/elasticsearch/gateway/GatewayAllocator.java, lines 228 to 243 at commit 4f22f43.)