3838import org .apache .logging .log4j .util .MessageSupplier ;
3939import org .opensearch .ExceptionsHelper ;
4040import org .opensearch .action .ActionListener ;
41+ import org .opensearch .action .ActionListenerResponseHandler ;
4142import org .opensearch .action .ActionRunnable ;
4243import org .opensearch .action .DocWriteRequest ;
4344import org .opensearch .action .DocWriteResponse ;
4647import org .opensearch .action .index .IndexRequest ;
4748import org .opensearch .action .index .IndexResponse ;
4849import org .opensearch .action .support .ActionFilters ;
50+ import org .opensearch .action .support .ChannelActionListener ;
51+ import org .opensearch .action .support .replication .ReplicationMode ;
52+ import org .opensearch .action .support .replication .ReplicationOperation ;
53+ import org .opensearch .action .support .replication .ReplicationTask ;
4954import org .opensearch .action .support .replication .TransportReplicationAction ;
5055import org .opensearch .action .support .replication .TransportWriteAction ;
5156import org .opensearch .action .update .UpdateHelper ;
5257import org .opensearch .action .update .UpdateRequest ;
5358import org .opensearch .action .update .UpdateResponse ;
59+ import org .opensearch .client .transport .NoNodeAvailableException ;
5460import org .opensearch .cluster .ClusterState ;
5561import org .opensearch .cluster .ClusterStateObserver ;
5662import org .opensearch .cluster .action .index .MappingUpdatedAction ;
5763import org .opensearch .cluster .action .shard .ShardStateAction ;
5864import org .opensearch .cluster .metadata .IndexMetadata ;
5965import org .opensearch .cluster .metadata .MappingMetadata ;
66+ import org .opensearch .cluster .node .DiscoveryNode ;
67+ import org .opensearch .cluster .routing .AllocationId ;
68+ import org .opensearch .cluster .routing .ShardRouting ;
6069import org .opensearch .cluster .service .ClusterService ;
6170import org .opensearch .common .bytes .BytesReference ;
6271import org .opensearch .common .collect .Tuple ;
6372import org .opensearch .common .compress .CompressedXContent ;
6473import org .opensearch .common .inject .Inject ;
6574import org .opensearch .common .io .stream .StreamInput ;
75+ import org .opensearch .common .io .stream .StreamOutput ;
76+ import org .opensearch .common .lease .Releasable ;
6677import org .opensearch .common .settings .Settings ;
6778import org .opensearch .common .unit .TimeValue ;
79+ import org .opensearch .common .util .concurrent .AbstractRunnable ;
6880import org .opensearch .common .xcontent .ToXContent ;
6981import org .opensearch .common .xcontent .XContentHelper ;
7082import org .opensearch .common .xcontent .XContentType ;
7890import org .opensearch .index .seqno .SequenceNumbers ;
7991import org .opensearch .index .shard .IndexShard ;
8092import org .opensearch .index .shard .ShardId ;
93+ import org .opensearch .index .shard .ShardNotFoundException ;
8194import org .opensearch .index .translog .Translog ;
8295import org .opensearch .indices .IndicesService ;
8396import org .opensearch .indices .SystemIndices ;
8497import org .opensearch .node .NodeClosedException ;
98+ import org .opensearch .tasks .Task ;
99+ import org .opensearch .tasks .TaskId ;
85100import org .opensearch .threadpool .ThreadPool ;
86101import org .opensearch .threadpool .ThreadPool .Names ;
102+ import org .opensearch .transport .TransportChannel ;
103+ import org .opensearch .transport .TransportRequest ;
87104import org .opensearch .transport .TransportRequestOptions ;
88105import org .opensearch .transport .TransportService ;
89106
90107import java .io .IOException ;
108+ import java .util .Locale ;
91109import java .util .Map ;
110+ import java .util .Objects ;
92111import java .util .concurrent .Executor ;
93112import java .util .function .Consumer ;
94113import java .util .function .Function ;
95114import java .util .function .LongSupplier ;
96115
97- import org .opensearch .action .support .replication .ReplicationMode ;
98-
99116/**
100117 * Performs shard-level bulk (index, delete or update) operations
101118 *
@@ -117,6 +134,15 @@ public class TransportShardBulkAction extends TransportWriteAction<BulkShardRequ
117134 private final UpdateHelper updateHelper ;
118135 private final MappingUpdatedAction mappingUpdatedAction ;
119136
137+ /**
138+ * This action is used for performing primary term validation. With remote translog enabled, the translogs are
139+ * durably persisted in the remote store. Without remote translog, the existing transport replication calls perform
140+ * primary term validation as well as logically replicate the data. With remote translog, the primary makes calls to
141+ * the replicas to perform primary term validation only. This makes sure that an isolated primary fails to
142+ * acknowledge after primary term validation in the presence of a new primary.
143+ */
144+ private final String transportPrimaryTermValidationAction ;
145+
120146 @ Inject
121147 public TransportShardBulkAction (
122148 Settings settings ,
@@ -149,6 +175,212 @@ public TransportShardBulkAction(
149175 );
150176 this .updateHelper = updateHelper ;
151177 this .mappingUpdatedAction = mappingUpdatedAction ;
178+
179+ this .transportPrimaryTermValidationAction = ACTION_NAME + "[validate_primary_term]" ;
180+
181+ transportService .registerRequestHandler (
182+ transportPrimaryTermValidationAction ,
183+ executor ,
184+ true ,
185+ true ,
186+ PrimaryTermValidationRequest ::new ,
187+ this ::handlePrimaryTermValidationRequest
188+ );
189+ }
190+
191+ protected void handlePrimaryTermValidationRequest (
192+ final PrimaryTermValidationRequest request ,
193+ final TransportChannel channel ,
194+ final Task task
195+ ) {
196+ ActionListener <ReplicaResponse > listener = new ChannelActionListener <>(channel , transportPrimaryTermValidationAction , request );
197+ final ShardId shardId = request .getShardId ();
198+ assert shardId != null : "request shardId must be set" ;
199+ IndexShard replica = getIndexShard (shardId );
200+ try {
201+ new PrimaryTermValidationReplicaAction (listener , replica , (ReplicationTask ) task , request ).run ();
202+ } catch (RuntimeException e ) {
203+ listener .onFailure (e );
204+ }
205+ }
206+
207+ /**
208+ * This action is the primary term validation action which is used for doing primary term validation with replicas.
209+ * This is only applicable for TransportShardBulkAction because all writes (delete/update/single write/bulk)
210+ * ultimately boils down to TransportShardBulkAction and isolated primary could continue to acknowledge if it is not
211+ * aware that the primary has changed. This helps achieve the same. More details in java doc of
212+ * {@link TransportShardBulkAction#transportPrimaryTermValidationAction}.
213+ *
214+ * @opensearch.internal
215+ */
216+ private static final class PrimaryTermValidationReplicaAction extends AbstractRunnable implements ActionListener <Releasable > {
217+
218+ private final ActionListener <ReplicaResponse > onCompletionListener ;
219+ private final IndexShard replica ;
220+ private final ReplicationTask task ;
221+ private final PrimaryTermValidationRequest request ;
222+
223+ public PrimaryTermValidationReplicaAction (
224+ ActionListener <ReplicaResponse > onCompletionListener ,
225+ IndexShard replica ,
226+ ReplicationTask task ,
227+ PrimaryTermValidationRequest request
228+ ) {
229+ this .onCompletionListener = onCompletionListener ;
230+ this .replica = replica ;
231+ this .task = task ;
232+ this .request = request ;
233+ }
234+
235+ @ Override
236+ public void onResponse (Releasable releasable ) {
237+ setPhase (task , "finished" );
238+ onCompletionListener .onResponse (new ReplicaResponse (SequenceNumbers .NO_OPS_PERFORMED , SequenceNumbers .NO_OPS_PERFORMED ));
239+ }
240+
241+ @ Override
242+ public void onFailure (Exception e ) {
243+ setPhase (task , "failed" );
244+ onCompletionListener .onFailure (e );
245+ }
246+
247+ @ Override
248+ protected void doRun () throws Exception {
249+ setPhase (task , "primary-term-validation" );
250+ final String actualAllocationId = this .replica .routingEntry ().allocationId ().getId ();
251+ if (actualAllocationId .equals (request .getTargetAllocationID ()) == false ) {
252+ throw new ShardNotFoundException (
253+ this .replica .shardId (),
254+ "expected allocation id [{}] but found [{}]" ,
255+ request .getTargetAllocationID (),
256+ actualAllocationId
257+ );
258+ }
259+ // Check operation primary term against the incoming primary term
260+ // If the request primary term is low, then trigger lister failure
261+ if (request .getPrimaryTerm () < replica .getOperationPrimaryTerm ()) {
262+ final String message = String .format (
263+ Locale .ROOT ,
264+ "%s operation primary term [%d] is too old (current [%d])" ,
265+ request .getShardId (),
266+ request .getPrimaryTerm (),
267+ replica .getOperationPrimaryTerm ()
268+ );
269+ onFailure (new IllegalStateException (message ));
270+ } else {
271+ onResponse (null );
272+ }
273+ }
274+ }
275+
276+ /**
277+ * Primary term validation request sent to a specific allocation id
278+ *
279+ * @opensearch.internal
280+ */
281+ protected static final class PrimaryTermValidationRequest extends TransportRequest {
282+
283+ /**
284+ * {@link AllocationId#getId()} of the shard this request is sent to
285+ **/
286+ private final String targetAllocationID ;
287+ private final long primaryTerm ;
288+ private final ShardId shardId ;
289+
290+ public PrimaryTermValidationRequest (String targetAllocationID , long primaryTerm , ShardId shardId ) {
291+ this .targetAllocationID = Objects .requireNonNull (targetAllocationID );
292+ this .primaryTerm = primaryTerm ;
293+ this .shardId = Objects .requireNonNull (shardId );
294+ }
295+
296+ public PrimaryTermValidationRequest (StreamInput in ) throws IOException {
297+ super (in );
298+ targetAllocationID = in .readString ();
299+ primaryTerm = in .readVLong ();
300+ shardId = new ShardId (in );
301+ }
302+
303+ @ Override
304+ public void writeTo (StreamOutput out ) throws IOException {
305+ super .writeTo (out );
306+ out .writeString (targetAllocationID );
307+ out .writeVLong (primaryTerm );
308+ shardId .writeTo (out );
309+ }
310+
311+ @ Override
312+ public Task createTask (long id , String type , String action , TaskId parentTaskId , Map <String , String > headers ) {
313+ return new ReplicationTask (id , type , action , getDescription (), parentTaskId , headers );
314+ }
315+
316+ public String getTargetAllocationID () {
317+ return targetAllocationID ;
318+ }
319+
320+ public long getPrimaryTerm () {
321+ return primaryTerm ;
322+ }
323+
324+ public ShardId getShardId () {
325+ return shardId ;
326+ }
327+
328+ @ Override
329+ public String getDescription () {
330+ return toString ();
331+ }
332+
333+ @ Override
334+ public String toString () {
335+ return "PrimaryTermValidationRequest ["
336+ + shardId
337+ + "] for targetAllocationID ["
338+ + targetAllocationID
339+ + "] with primaryTerm ["
340+ + primaryTerm
341+ + "]" ;
342+ }
343+ }
344+
345+ @ Override
346+ protected ReplicationOperation .Replicas <BulkShardRequest > primaryTermValidationReplicasProxy () {
347+ return new PrimaryTermValidationProxy ();
348+ }
349+
350+ /**
351+ * This {@link org.opensearch.action.support.replication.TransportReplicationAction.ReplicasProxy} implementation is
352+ * used for primary term validation and is only relevant for TransportShardBulkAction replication action.
353+ *
354+ * @opensearch.internal
355+ */
356+ private final class PrimaryTermValidationProxy extends WriteActionReplicasProxy {
357+
358+ @ Override
359+ public void performOn (
360+ ShardRouting replica ,
361+ BulkShardRequest request ,
362+ long primaryTerm ,
363+ long globalCheckpoint ,
364+ long maxSeqNoOfUpdatesOrDeletes ,
365+ ActionListener <ReplicationOperation .ReplicaResponse > listener
366+ ) {
367+ String nodeId = replica .currentNodeId ();
368+ final DiscoveryNode node = clusterService .state ().nodes ().get (nodeId );
369+ if (node == null ) {
370+ listener .onFailure (new NoNodeAvailableException ("unknown node [" + nodeId + "]" ));
371+ return ;
372+ }
373+ final PrimaryTermValidationRequest validationRequest = new PrimaryTermValidationRequest (
374+ replica .allocationId ().getId (),
375+ primaryTerm ,
376+ replica .shardId ()
377+ );
378+ final ActionListenerResponseHandler <ReplicaResponse > handler = new ActionListenerResponseHandler <>(
379+ listener ,
380+ ReplicaResponse ::new
381+ );
382+ transportService .sendRequest (node , transportPrimaryTermValidationAction , validationRequest , transportOptions , handler );
383+ }
152384 }
153385
154386 @ Override
@@ -196,7 +428,7 @@ protected long primaryOperationSize(BulkShardRequest request) {
196428 }
197429
198430 @ Override
199- protected ReplicationMode getReplicationMode (IndexShard indexShard ) {
431+ public ReplicationMode getReplicationMode (IndexShard indexShard ) {
200432 if (indexShard .isRemoteTranslogEnabled ()) {
201433 return ReplicationMode .PRIMARY_TERM_VALIDATION ;
202434 }
0 commit comments