
Too many async shard fetch results cause a JVM heap explosion during cluster recovery #76218

@howardhuanghua

Description

Cluster version: 7.10.1.
Dedicated master nodes: 3.
Dedicated data nodes: 163.
Total primary shards: 80k, no replicas.

After a full cluster restart, shard metadata for all primaries is fetched from all of the data nodes concurrently:

protected AsyncShardFetch.FetchResult<NodeGatewayStartedShards> fetchData(ShardRouting shard, RoutingAllocation allocation) {
    // explicitly type the lister; some IDEs (Eclipse) are not able to correctly infer the function type
    Lister<BaseNodesResponse<NodeGatewayStartedShards>, NodeGatewayStartedShards> lister = this::listStartedShards;
    AsyncShardFetch<NodeGatewayStartedShards> fetch =
        asyncFetchStarted.computeIfAbsent(shard.shardId(),
            shardId -> new InternalAsyncFetch<>(logger, "shard_started", shardId,
                IndexMetadata.INDEX_DATA_PATH_SETTING.get(allocation.metadata().index(shard.index()).getSettings()),
                lister));
    AsyncShardFetch.FetchResult<NodeGatewayStartedShards> shardState =
        fetch.fetchData(allocation.nodes(), allocation.getIgnoreNodes(shard.shardId()));
    if (shardState.hasData()) {
        shardState.processAllocation(allocation);
    }
    return shardState;
}

Each per-node fetch result mainly contains a DiscoveryNode and a TransportAddress, around 1.7KB of heap per entry.
So a single shard's fetch result costs: 1.7KB * 163 data nodes ≈ 280KB. And 80k shards cost: 80000 * 280KB ≈ 21GB.
This large heap cost blows up the current master node's JVM heap:

Even with a more reasonable 50k shards in a cluster, this would still need almost 15GB of heap, which is a huge memory cost.
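The arithmetic above can be sketched as a quick back-of-the-envelope calculation. The 1.7KB-per-entry figure is the estimate observed in the heap dump, not a measured constant, and the class name here is illustrative:

```java
// Back-of-the-envelope heap estimate for async shard fetch results.
// Each shard's fetch result holds one entry per data node, and each entry
// (DiscoveryNode + TransportAddress) is assumed to cost ~1.7KB of heap.
public class FetchHeapEstimate {

    static double estimateGiB(long shards, int dataNodes, double kbPerEntry) {
        double perShardKb = kbPerEntry * dataNodes; // one entry per data node
        double totalKb = perShardKb * shards;       // one result set per shard
        return totalKb / (1024.0 * 1024.0);         // KB -> GiB
    }

    public static void main(String[] args) {
        // ~21 GiB for the 80k-shard cluster described above
        System.out.printf("80k shards: %.1f GiB%n", estimateGiB(80_000, 163, 1.7));
        // ~13 GiB even at a more reasonable 50k shards
        System.out.printf("50k shards: %.1f GiB%n", estimateGiB(50_000, 163, 1.7));
    }
}
```

The estimate scales as shards × nodes, which is why both the 163-node fan-out and the 80k shard count matter.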

Several ideas for solving this issue:

  1. A single shard belongs to only a single node. After a cluster restart, we don't need to send fetch requests to all of the data nodes. But in a freshly started cluster, there is no routing table info from which to extract the previously allocated node for a shard. Could we save the node id, just like inSyncAllocationIds in IndexMetadata? Then we could send a single fetch request to the target node the shard used to be allocated on.

  2. We can see that BaseNodeResponse contains a DiscoveryNode, and the base abstract class TransportMessage holds an entry duplicated in DiscoveryNode. In the fetch case, only the nodeId is required; the node attributes are not necessary at all. Could we return only the nodeId in the shard fetch response instead of these heavy structures?

  3. Shard recovery has node-level concurrency limits. Could we fetch store info for a subset of the shards at a time instead of fetching all of the shards together at the beginning?

  4. Send one request per node, and batch all of that node's shard info into a single result.
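Ideas 2 and 4 can be combined into a rough sketch of what a lighter-weight, per-node batched response might look like. These class and field names are hypothetical, not actual Elasticsearch APIs:

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of ideas 2 and 4: one request per node, returning a
// batched response that carries only the nodeId plus lightweight per-shard
// state, instead of a full DiscoveryNode per shard per node.
public class BatchedShardFetch {

    // Lightweight per-shard entry: no DiscoveryNode, no TransportAddress.
    record ShardStarted(String shardId, String allocationId, boolean primary) {}

    // One response per node covering all of its shards (idea 4),
    // identified by nodeId only (idea 2).
    record NodeBatchResponse(String nodeId, List<ShardStarted> shards) {}

    // With batching, the master holds one response object per node instead of
    // (shards x nodes) heavy per-shard result sets.
    static int responsesHeld(Map<String, NodeBatchResponse> byNode) {
        return byNode.size();
    }
}
```

Under this shape, the master's retained state scales with the number of nodes (163) rather than shards × nodes (80k × 163).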


Labels: :Distributed/Allocation, >bug, Team:Distributed
