@@ -223,7 +223,15 @@ public final void run() {
223223 assert shardRoutings .skip () == false ;
224224 assert shardItIndexMap .containsKey (shardRoutings );
225225 int shardIndex = shardItIndexMap .get (shardRoutings );
226- performPhaseOnShard (shardIndex , shardRoutings , shardRoutings .nextOrNull ());
226+ final SearchShardTarget shard = shardRoutings .nextOrNull ();
227+ if (shard != null ) {
228+ performPhaseOnShard (shardIndex , shardRoutings , shard );
229+ } else {
230+ SearchShardTarget unassignedShard = new SearchShardTarget (null , shardRoutings .shardId (),
231+ shardRoutings .getClusterAlias (), shardRoutings .getOriginalIndices ());
232+ onShardFailure (shardIndex , unassignedShard , shardRoutings ,
233+ new NoShardAvailableActionException (shardRoutings .shardId ()));
234+ }
227235 }
228236 }
229237 }
@@ -252,63 +260,61 @@ private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> sha
252260 }
253261
254262 protected void performPhaseOnShard (final int shardIndex , final SearchShardIterator shardIt , final SearchShardTarget shard ) {
263+ if (shard == null ) {
264+ assert false : "Target shard must be non-null: " + shardIt ;
265+ throw new IllegalStateException ("" );
266+ }
255267 /*
256268 * We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
257269 * same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
258270 * continue on the same thread in the case that we never went async and this happens repeatedly we will end up recursing deeply and
259271 * could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise
260272 * we can continue (cf. InitialSearchPhase#maybeFork).
261273 */
262- if (shard == null ) {
263- SearchShardTarget unassignedShard = new SearchShardTarget (null , shardIt .shardId (),
264- shardIt .getClusterAlias (), shardIt .getOriginalIndices ());
265- fork (() -> onShardFailure (shardIndex , unassignedShard , shardIt , new NoShardAvailableActionException (shardIt .shardId ())));
266- } else {
267- final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
268- pendingExecutionsPerNode .computeIfAbsent (shard .getNodeId (), n -> new PendingExecutions (maxConcurrentRequestsPerNode ))
269- : null ;
270- Runnable r = () -> {
271- final Thread thread = Thread .currentThread ();
272- try {
273- executePhaseOnShard (shardIt , shard ,
274- new SearchActionListener <Result >(shard , shardIndex ) {
275- @ Override
276- public void innerOnResponse (Result result ) {
277- try {
278- onShardResult (result , shardIt );
279- } catch (Exception exc ) {
280- onShardFailure (shardIndex , shard , shardIt , exc );
281- } finally {
282- executeNext (pendingExecutions , thread );
283- }
274+ final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
275+ pendingExecutionsPerNode .computeIfAbsent (shard .getNodeId (), n -> new PendingExecutions (maxConcurrentRequestsPerNode ))
276+ : null ;
277+ Runnable r = () -> {
278+ final Thread thread = Thread .currentThread ();
279+ try {
280+ executePhaseOnShard (shardIt , shard ,
281+ new SearchActionListener <Result >(shard , shardIndex ) {
282+ @ Override
283+ public void innerOnResponse (Result result ) {
284+ try {
285+ onShardResult (result , shardIt );
286+ } catch (Exception exc ) {
287+ onShardFailure (shardIndex , shard , shardIt , exc );
288+ } finally {
289+ executeNext (pendingExecutions , thread );
284290 }
291+ }
285292
286- @ Override
287- public void onFailure (Exception t ) {
288- try {
289- onShardFailure (shardIndex , shard , shardIt , t );
290- } finally {
291- executeNext (pendingExecutions , thread );
292- }
293+ @ Override
294+ public void onFailure (Exception t ) {
295+ try {
296+ onShardFailure (shardIndex , shard , shardIt , t );
297+ } finally {
298+ executeNext (pendingExecutions , thread );
293299 }
294- });
295- } catch ( final Exception e ) {
296- try {
297- /*
298- * It is possible to run into connection exceptions here because we are getting the connection early and might
299- * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
300- */
301- fork (() -> onShardFailure ( shardIndex , shard , shardIt , e ));
302- } finally {
303- executeNext ( pendingExecutions , thread );
304- }
300+ }
301+ });
302+ } catch ( final Exception e ) {
303+ try {
304+ /*
305+ * It is possible to run into connection exceptions here because we are getting the connection early and might
306+ * run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
307+ */
308+ fork (() -> onShardFailure ( shardIndex , shard , shardIt , e ));
309+ } finally {
310+ executeNext ( pendingExecutions , thread );
305311 }
306- };
307- if (throttleConcurrentRequests ) {
308- pendingExecutions .tryRun (r );
309- } else {
310- r .run ();
311312 }
313+ };
314+ if (throttleConcurrentRequests ) {
315+ pendingExecutions .tryRun (r );
316+ } else {
317+ r .run ();
312318 }
313319 }
314320
0 commit comments