Skip to content

Commit 0c5e87f

Browse files
committed
[RCI] Check blocks while having index shard permit in TransportReplicationAction (#35332)
Today, the TransportReplicationAction checks the global level blocks and the index level blocks before routing the operation to the primary, in the ReroutePhase, and it happens at the very beginning of the transport replication action execution. For the upcoming rework of the Close Index API and in order to deal with primary relocation, we'll need to also check for blocks before executing the operation on the primary (while holding a permit) but before routing to the new primary. This pull request change the AsyncPrimaryAction so that it checks for replication action's blocks before executing the operation locally or before routing the primary action to the newly primary shard. The check is done while holding a PrimaryShardReference. Related to #33888
1 parent cee022a commit 0c5e87f

2 files changed

Lines changed: 234 additions & 138 deletions

File tree

server/src/main/java/org/elasticsearch/action/support/replication/TransportReplicationAction.java

Lines changed: 75 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -236,9 +236,39 @@ protected TransportRequestOptions transportOptions(Settings settings) {
236236
return TransportRequestOptions.EMPTY;
237237
}
238238

239+
private String concreteIndex(final ClusterState state, final ReplicationRequest request) {
240+
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
241+
}
242+
243+
private ClusterBlockException blockExceptions(final ClusterState state, final String indexName) {
244+
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
245+
if (globalBlockLevel != null) {
246+
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
247+
if (blockException != null) {
248+
return blockException;
249+
}
250+
}
251+
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
252+
if (indexBlockLevel != null) {
253+
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, indexName);
254+
if (blockException != null) {
255+
return blockException;
256+
}
257+
}
258+
return null;
259+
}
260+
239261
protected boolean retryPrimaryException(final Throwable e) {
240262
return e.getClass() == ReplicationOperation.RetryOnPrimaryException.class
241-
|| TransportActions.isShardNotAvailableException(e);
263+
|| TransportActions.isShardNotAvailableException(e)
264+
|| isRetryableClusterBlockException(e);
265+
}
266+
267+
boolean isRetryableClusterBlockException(final Throwable e) {
268+
if (e instanceof ClusterBlockException) {
269+
return ((ClusterBlockException) e).retryable();
270+
}
271+
return false;
242272
}
243273

244274
protected class OperationTransportHandler implements TransportRequestHandler<Request> {
@@ -321,6 +351,15 @@ protected void doRun() throws Exception {
321351
@Override
322352
public void onResponse(PrimaryShardReference primaryShardReference) {
323353
try {
354+
final ClusterState clusterState = clusterService.state();
355+
final IndexMetaData indexMetaData = clusterState.metaData().getIndexSafe(primaryShardReference.routingEntry().index());
356+
357+
final ClusterBlockException blockException = blockExceptions(clusterState, indexMetaData.getIndex().getName());
358+
if (blockException != null) {
359+
logger.trace("cluster is blocked, action failed on primary", blockException);
360+
throw blockException;
361+
}
362+
324363
if (primaryShardReference.isRelocated()) {
325364
primaryShardReference.close(); // release shard operation lock as soon as possible
326365
setPhase(replicationTask, "primary_delegation");
@@ -334,7 +373,7 @@ public void onResponse(PrimaryShardReference primaryShardReference) {
334373
response.readFrom(in);
335374
return response;
336375
};
337-
DiscoveryNode relocatingNode = clusterService.state().nodes().get(primary.relocatingNodeId());
376+
DiscoveryNode relocatingNode = clusterState.nodes().get(primary.relocatingNodeId());
338377
transportService.sendRequest(relocatingNode, transportPrimaryAction,
339378
new ConcreteShardRequest<>(request, primary.allocationId().getRelocationId(), primaryTerm),
340379
transportOptions,
@@ -713,35 +752,42 @@ public void onFailure(Exception e) {
713752
protected void doRun() {
714753
setPhase(task, "routing");
715754
final ClusterState state = observer.setAndGetObservedState();
716-
if (handleBlockExceptions(state)) {
717-
return;
718-
}
719-
720-
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
721-
final String concreteIndex = concreteIndex(state);
722-
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
723-
if (indexMetaData == null) {
724-
retry(new IndexNotFoundException(concreteIndex));
725-
return;
726-
}
727-
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
728-
throw new IndexClosedException(indexMetaData.getIndex());
729-
}
755+
final String concreteIndex = concreteIndex(state, request);
756+
final ClusterBlockException blockException = blockExceptions(state, concreteIndex);
757+
if (blockException != null) {
758+
if (blockException.retryable()) {
759+
logger.trace("cluster is blocked, scheduling a retry", blockException);
760+
retry(blockException);
761+
} else {
762+
finishAsFailed(blockException);
763+
}
764+
} else {
765+
// request does not have a shardId yet, we need to pass the concrete index to resolve shardId
766+
final IndexMetaData indexMetaData = state.metaData().index(concreteIndex);
767+
if (indexMetaData == null) {
768+
retry(new IndexNotFoundException(concreteIndex));
769+
return;
770+
}
771+
if (indexMetaData.getState() == IndexMetaData.State.CLOSE) {
772+
throw new IndexClosedException(indexMetaData.getIndex());
773+
}
730774

731-
// resolve all derived request fields, so we can route and apply it
732-
resolveRequest(indexMetaData, request);
733-
assert request.shardId() != null : "request shardId must be set in resolveRequest";
734-
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT : "request waitForActiveShards must be set in resolveRequest";
775+
// resolve all derived request fields, so we can route and apply it
776+
resolveRequest(indexMetaData, request);
777+
assert request.shardId() != null : "request shardId must be set in resolveRequest";
778+
assert request.waitForActiveShards() != ActiveShardCount.DEFAULT :
779+
"request waitForActiveShards must be set in resolveRequest";
735780

736-
final ShardRouting primary = primary(state);
737-
if (retryIfUnavailable(state, primary)) {
738-
return;
739-
}
740-
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
741-
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
742-
performLocalAction(state, primary, node, indexMetaData);
743-
} else {
744-
performRemoteAction(state, primary, node);
781+
final ShardRouting primary = primary(state);
782+
if (retryIfUnavailable(state, primary)) {
783+
return;
784+
}
785+
final DiscoveryNode node = state.nodes().get(primary.currentNodeId());
786+
if (primary.currentNodeId().equals(state.nodes().getLocalNodeId())) {
787+
performLocalAction(state, primary, node, indexMetaData);
788+
} else {
789+
performRemoteAction(state, primary, node);
790+
}
745791
}
746792
}
747793

@@ -793,44 +839,11 @@ private boolean retryIfUnavailable(ClusterState state, ShardRouting primary) {
793839
return false;
794840
}
795841

796-
private String concreteIndex(ClusterState state) {
797-
return resolveIndex() ? indexNameExpressionResolver.concreteSingleIndex(state, request).getName() : request.index();
798-
}
799-
800842
private ShardRouting primary(ClusterState state) {
801843
IndexShardRoutingTable indexShard = state.getRoutingTable().shardRoutingTable(request.shardId());
802844
return indexShard.primaryShard();
803845
}
804846

805-
private boolean handleBlockExceptions(ClusterState state) {
806-
ClusterBlockLevel globalBlockLevel = globalBlockLevel();
807-
if (globalBlockLevel != null) {
808-
ClusterBlockException blockException = state.blocks().globalBlockedException(globalBlockLevel);
809-
if (blockException != null) {
810-
handleBlockException(blockException);
811-
return true;
812-
}
813-
}
814-
ClusterBlockLevel indexBlockLevel = indexBlockLevel();
815-
if (indexBlockLevel != null) {
816-
ClusterBlockException blockException = state.blocks().indexBlockedException(indexBlockLevel, concreteIndex(state));
817-
if (blockException != null) {
818-
handleBlockException(blockException);
819-
return true;
820-
}
821-
}
822-
return false;
823-
}
824-
825-
private void handleBlockException(ClusterBlockException blockException) {
826-
if (blockException.retryable()) {
827-
logger.trace("cluster is blocked, scheduling a retry", blockException);
828-
retry(blockException);
829-
} else {
830-
finishAsFailed(blockException);
831-
}
832-
}
833-
834847
private void performAction(final DiscoveryNode node, final String action, final boolean isPrimaryAction,
835848
final TransportRequest requestToPerform) {
836849
transportService.sendRequest(node, action, requestToPerform, transportOptions, new TransportResponseHandler<Response>() {

0 commit comments

Comments
 (0)