Skip to content

Commit fe9da92

Browse files
committed
Add transport action for primary term validation for remote-backed indices (#5616)
Add transport action for primary term validation for remote-backed indices Signed-off-by: Ashish Singh <ssashish@amazon.com>
1 parent 47d7485 commit fe9da92

22 files changed

Lines changed: 935 additions & 76 deletions

server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java

Lines changed: 235 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import org.apache.logging.log4j.util.MessageSupplier;
3939
import org.opensearch.ExceptionsHelper;
4040
import org.opensearch.action.ActionListener;
41+
import org.opensearch.action.ActionListenerResponseHandler;
4142
import org.opensearch.action.ActionRunnable;
4243
import org.opensearch.action.DocWriteRequest;
4344
import org.opensearch.action.DocWriteResponse;
@@ -46,25 +47,36 @@
4647
import org.opensearch.action.index.IndexRequest;
4748
import org.opensearch.action.index.IndexResponse;
4849
import org.opensearch.action.support.ActionFilters;
50+
import org.opensearch.action.support.ChannelActionListener;
51+
import org.opensearch.action.support.replication.ReplicationMode;
52+
import org.opensearch.action.support.replication.ReplicationOperation;
53+
import org.opensearch.action.support.replication.ReplicationTask;
4954
import org.opensearch.action.support.replication.TransportReplicationAction;
5055
import org.opensearch.action.support.replication.TransportWriteAction;
5156
import org.opensearch.action.update.UpdateHelper;
5257
import org.opensearch.action.update.UpdateRequest;
5358
import org.opensearch.action.update.UpdateResponse;
59+
import org.opensearch.client.transport.NoNodeAvailableException;
5460
import org.opensearch.cluster.ClusterState;
5561
import org.opensearch.cluster.ClusterStateObserver;
5662
import org.opensearch.cluster.action.index.MappingUpdatedAction;
5763
import org.opensearch.cluster.action.shard.ShardStateAction;
5864
import org.opensearch.cluster.metadata.IndexMetadata;
5965
import org.opensearch.cluster.metadata.MappingMetadata;
66+
import org.opensearch.cluster.node.DiscoveryNode;
67+
import org.opensearch.cluster.routing.AllocationId;
68+
import org.opensearch.cluster.routing.ShardRouting;
6069
import org.opensearch.cluster.service.ClusterService;
6170
import org.opensearch.common.bytes.BytesReference;
6271
import org.opensearch.common.collect.Tuple;
6372
import org.opensearch.common.compress.CompressedXContent;
6473
import org.opensearch.common.inject.Inject;
6574
import org.opensearch.common.io.stream.StreamInput;
75+
import org.opensearch.common.io.stream.StreamOutput;
76+
import org.opensearch.common.lease.Releasable;
6677
import org.opensearch.common.settings.Settings;
6778
import org.opensearch.common.unit.TimeValue;
79+
import org.opensearch.common.util.concurrent.AbstractRunnable;
6880
import org.opensearch.common.xcontent.ToXContent;
6981
import org.opensearch.common.xcontent.XContentHelper;
7082
import org.opensearch.common.xcontent.XContentType;
@@ -78,24 +90,29 @@
7890
import org.opensearch.index.seqno.SequenceNumbers;
7991
import org.opensearch.index.shard.IndexShard;
8092
import org.opensearch.index.shard.ShardId;
93+
import org.opensearch.index.shard.ShardNotFoundException;
8194
import org.opensearch.index.translog.Translog;
8295
import org.opensearch.indices.IndicesService;
8396
import org.opensearch.indices.SystemIndices;
8497
import org.opensearch.node.NodeClosedException;
98+
import org.opensearch.tasks.Task;
99+
import org.opensearch.tasks.TaskId;
85100
import org.opensearch.threadpool.ThreadPool;
86101
import org.opensearch.threadpool.ThreadPool.Names;
102+
import org.opensearch.transport.TransportChannel;
103+
import org.opensearch.transport.TransportRequest;
87104
import org.opensearch.transport.TransportRequestOptions;
88105
import org.opensearch.transport.TransportService;
89106

90107
import java.io.IOException;
108+
import java.util.Locale;
91109
import java.util.Map;
110+
import java.util.Objects;
92111
import java.util.concurrent.Executor;
93112
import java.util.function.Consumer;
94113
import java.util.function.Function;
95114
import java.util.function.LongSupplier;
96115

97-
import org.opensearch.action.support.replication.ReplicationMode;
98-
99116
/**
100117
* Performs shard-level bulk (index, delete or update) operations
101118
*
@@ -117,6 +134,15 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
117134
private final UpdateHelper updateHelper;
118135
private final MappingUpdatedAction mappingUpdatedAction;
119136

137+
/**
138+
* This action is used for performing primary term validation. With remote translog enabled, the translogs would
139+
* be durably persisted in remote store. Without remote translog, current transport replication calls does primary
140+
* term validation as well as logically replicate the data. With remote translog, the primary would make calls to
141+
* replicas to perform primary term validation. This make sures an isolated primary fails to ack after primary
142+
* term validation in presence of a new primary.
143+
*/
144+
private final String transportPrimaryTermValidationAction;
145+
120146
@Inject
121147
public TransportShardBulkAction(
122148
Settings settings,
@@ -149,6 +175,212 @@ public TransportShardBulkAction(
149175
);
150176
this.updateHelper = updateHelper;
151177
this.mappingUpdatedAction = mappingUpdatedAction;
178+
179+
this.transportPrimaryTermValidationAction = ACTION_NAME + "[validate_primary_term]";
180+
181+
transportService.registerRequestHandler(
182+
transportPrimaryTermValidationAction,
183+
executor,
184+
true,
185+
true,
186+
PrimaryTermValidationRequest::new,
187+
this::handlePrimaryTermValidationRequest
188+
);
189+
}
190+
191+
protected void handlePrimaryTermValidationRequest(
192+
final PrimaryTermValidationRequest request,
193+
final TransportChannel channel,
194+
final Task task
195+
) {
196+
ActionListener<ReplicaResponse> listener = new ChannelActionListener<>(channel, transportPrimaryTermValidationAction, request);
197+
final ShardId shardId = request.getShardId();
198+
assert shardId != null : "request shardId must be set";
199+
IndexShard replica = getIndexShard(shardId);
200+
try {
201+
new PrimaryTermValidationReplicaAction(listener, replica, (ReplicationTask) task, request).run();
202+
} catch (RuntimeException e) {
203+
listener.onFailure(e);
204+
}
205+
}
206+
207+
/**
208+
* This action is the primary term validation action which is used for doing primary term validation with replicas.
209+
* This is only applicable for TransportShardBulkAction because all writes (delete/update/single write/bulk)
210+
* ultimately boils down to TransportShardBulkAction and isolated primary could continue to acknowledge if it is not
211+
* aware that the primary has changed. This helps achieve the same. More details in java doc of
212+
* {@link TransportShardBulkAction#transportPrimaryTermValidationAction}.
213+
*
214+
* @opensearch.internal
215+
*/
216+
private static final class PrimaryTermValidationReplicaAction extends AbstractRunnable implements ActionListener<Releasable> {
217+
218+
private final ActionListener<ReplicaResponse> onCompletionListener;
219+
private final IndexShard replica;
220+
private final ReplicationTask task;
221+
private final PrimaryTermValidationRequest request;
222+
223+
public PrimaryTermValidationReplicaAction(
224+
ActionListener<ReplicaResponse> onCompletionListener,
225+
IndexShard replica,
226+
ReplicationTask task,
227+
PrimaryTermValidationRequest request
228+
) {
229+
this.onCompletionListener = onCompletionListener;
230+
this.replica = replica;
231+
this.task = task;
232+
this.request = request;
233+
}
234+
235+
@Override
236+
public void onResponse(Releasable releasable) {
237+
setPhase(task, "finished");
238+
onCompletionListener.onResponse(new ReplicaResponse(SequenceNumbers.NO_OPS_PERFORMED, SequenceNumbers.NO_OPS_PERFORMED));
239+
}
240+
241+
@Override
242+
public void onFailure(Exception e) {
243+
setPhase(task, "failed");
244+
onCompletionListener.onFailure(e);
245+
}
246+
247+
@Override
248+
protected void doRun() throws Exception {
249+
setPhase(task, "primary-term-validation");
250+
final String actualAllocationId = this.replica.routingEntry().allocationId().getId();
251+
if (actualAllocationId.equals(request.getTargetAllocationID()) == false) {
252+
throw new ShardNotFoundException(
253+
this.replica.shardId(),
254+
"expected allocation id [{}] but found [{}]",
255+
request.getTargetAllocationID(),
256+
actualAllocationId
257+
);
258+
}
259+
// Check operation primary term against the incoming primary term
260+
// If the request primary term is low, then trigger lister failure
261+
if (request.getPrimaryTerm() < replica.getOperationPrimaryTerm()) {
262+
final String message = String.format(
263+
Locale.ROOT,
264+
"%s operation primary term [%d] is too old (current [%d])",
265+
request.getShardId(),
266+
request.getPrimaryTerm(),
267+
replica.getOperationPrimaryTerm()
268+
);
269+
onFailure(new IllegalStateException(message));
270+
} else {
271+
onResponse(null);
272+
}
273+
}
274+
}
275+
276+
/**
277+
* Primary term validation request sent to a specific allocation id
278+
*
279+
* @opensearch.internal
280+
*/
281+
protected static final class PrimaryTermValidationRequest extends TransportRequest {
282+
283+
/**
284+
* {@link AllocationId#getId()} of the shard this request is sent to
285+
**/
286+
private final String targetAllocationID;
287+
private final long primaryTerm;
288+
private final ShardId shardId;
289+
290+
public PrimaryTermValidationRequest(String targetAllocationID, long primaryTerm, ShardId shardId) {
291+
this.targetAllocationID = Objects.requireNonNull(targetAllocationID);
292+
this.primaryTerm = primaryTerm;
293+
this.shardId = Objects.requireNonNull(shardId);
294+
}
295+
296+
public PrimaryTermValidationRequest(StreamInput in) throws IOException {
297+
super(in);
298+
targetAllocationID = in.readString();
299+
primaryTerm = in.readVLong();
300+
shardId = new ShardId(in);
301+
}
302+
303+
@Override
304+
public void writeTo(StreamOutput out) throws IOException {
305+
super.writeTo(out);
306+
out.writeString(targetAllocationID);
307+
out.writeVLong(primaryTerm);
308+
shardId.writeTo(out);
309+
}
310+
311+
@Override
312+
public Task createTask(long id, String type, String action, TaskId parentTaskId, Map<String, String> headers) {
313+
return new ReplicationTask(id, type, action, getDescription(), parentTaskId, headers);
314+
}
315+
316+
public String getTargetAllocationID() {
317+
return targetAllocationID;
318+
}
319+
320+
public long getPrimaryTerm() {
321+
return primaryTerm;
322+
}
323+
324+
public ShardId getShardId() {
325+
return shardId;
326+
}
327+
328+
@Override
329+
public String getDescription() {
330+
return toString();
331+
}
332+
333+
@Override
334+
public String toString() {
335+
return "PrimaryTermValidationRequest ["
336+
+ shardId
337+
+ "] for targetAllocationID ["
338+
+ targetAllocationID
339+
+ "] with primaryTerm ["
340+
+ primaryTerm
341+
+ "]";
342+
}
343+
}
344+
345+
@Override
346+
protected ReplicationOperation.Replicas<BulkShardRequest> primaryTermValidationReplicasProxy() {
347+
return new PrimaryTermValidationProxy();
348+
}
349+
350+
/**
351+
* This {@link org.opensearch.action.support.replication.TransportReplicationAction.ReplicasProxy} implementation is
352+
* used for primary term validation and is only relevant for TransportShardBulkAction replication action.
353+
*
354+
* @opensearch.internal
355+
*/
356+
private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy {
357+
358+
@Override
359+
public void performOn(
360+
ShardRouting replica,
361+
BulkShardRequest request,
362+
long primaryTerm,
363+
long globalCheckpoint,
364+
long maxSeqNoOfUpdatesOrDeletes,
365+
ActionListener<ReplicationOperation.ReplicaResponse> listener
366+
) {
367+
String nodeId = replica.currentNodeId();
368+
final DiscoveryNode node = clusterService.state().nodes().get(nodeId);
369+
if (node == null) {
370+
listener.onFailure(new NoNodeAvailableException("unknown node [" + nodeId + "]"));
371+
return;
372+
}
373+
final PrimaryTermValidationRequest validationRequest = new PrimaryTermValidationRequest(
374+
replica.allocationId().getId(),
375+
primaryTerm,
376+
replica.shardId()
377+
);
378+
final ActionListenerResponseHandler<ReplicaResponse> handler = new ActionListenerResponseHandler<>(
379+
listener,
380+
ReplicaResponse::new
381+
);
382+
transportService.sendRequest(node, transportPrimaryTermValidationAction, validationRequest, transportOptions, handler);
383+
}
152384
}
153385

154386
@Override
@@ -196,7 +428,7 @@ protected long primaryOperationSize(BulkShardRequest request) {
196428
}
197429

198430
@Override
199-
protected ReplicationMode getReplicationMode(IndexShard indexShard) {
431+
public ReplicationMode getReplicationMode(IndexShard indexShard) {
200432
if (indexShard.isRemoteTranslogEnabled()) {
201433
return ReplicationMode.PRIMARY_TERM_VALIDATION;
202434
}

server/src/main/java/org/opensearch/action/support/replication/FanoutReplicationProxy.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,35 @@
88

99
package org.opensearch.action.support.replication;
1010

11+
import org.opensearch.action.ActionListener;
12+
import org.opensearch.action.support.replication.ReplicationOperation.ReplicaResponse;
13+
import org.opensearch.action.support.replication.ReplicationOperation.Replicas;
1114
import org.opensearch.cluster.routing.ShardRouting;
1215

16+
import java.util.function.BiConsumer;
17+
import java.util.function.Consumer;
18+
1319
/**
1420
* This implementation of {@link ReplicationProxy} fans out the replication request to current shard routing if
1521
* it is not the primary and has replication mode as {@link ReplicationMode#FULL_REPLICATION}.
1622
*
1723
* @opensearch.internal
1824
*/
19-
public class FanoutReplicationProxy<ReplicaRequest> extends ReplicationProxy<ReplicaRequest> {
25+
public class FanoutReplicationProxy<ReplicaRequest extends ReplicationRequest<ReplicaRequest>> extends ReplicationProxy<ReplicaRequest> {
26+
27+
public FanoutReplicationProxy(Replicas<ReplicaRequest> replicasProxy) {
28+
super(replicasProxy);
29+
}
30+
31+
@Override
32+
protected void performOnReplicaProxy(
33+
ReplicationProxyRequest<ReplicaRequest> proxyRequest,
34+
ReplicationMode replicationMode,
35+
BiConsumer<Consumer<ActionListener<ReplicaResponse>>, ReplicationProxyRequest<ReplicaRequest>> performOnReplicaConsumer
36+
) {
37+
assert replicationMode == ReplicationMode.FULL_REPLICATION : "FanoutReplicationProxy allows only full replication mode";
38+
performOnReplicaConsumer.accept(getReplicasProxyConsumer(fullReplicationProxy, proxyRequest), proxyRequest);
39+
}
2040

2141
@Override
2242
ReplicationMode determineReplicationMode(ShardRouting shardRouting, ShardRouting primaryRouting) {

0 commit comments

Comments
 (0)