Skip to content

Commit d128869

Browse files
committed
Avoid forking in AbstractSearchAsyncAction
1 parent 7b5cf95 commit d128869

2 files changed

Lines changed: 63 additions & 46 deletions

File tree

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 52 additions & 46 deletions
Original file line number | Diff line number | Diff line change
@@ -223,7 +223,15 @@ public final void run() {
223223
assert shardRoutings.skip() == false;
224224
assert shardItIndexMap.containsKey(shardRoutings);
225225
int shardIndex = shardItIndexMap.get(shardRoutings);
226-
performPhaseOnShard(shardIndex, shardRoutings, shardRoutings.nextOrNull());
226+
final SearchShardTarget shard = shardRoutings.nextOrNull();
227+
if (shard != null) {
228+
performPhaseOnShard(shardIndex, shardRoutings, shard);
229+
} else {
230+
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardRoutings.shardId(),
231+
shardRoutings.getClusterAlias(), shardRoutings.getOriginalIndices());
232+
onShardFailure(shardIndex, unassignedShard, shardRoutings,
233+
new NoShardAvailableActionException(shardRoutings.shardId()));
234+
}
227235
}
228236
}
229237
}
@@ -252,63 +260,61 @@ private boolean checkMinimumVersion(GroupShardsIterator<SearchShardIterator> sha
252260
}
253261

254262
protected void performPhaseOnShard(final int shardIndex, final SearchShardIterator shardIt, final SearchShardTarget shard) {
263+
if (shard == null) {
264+
assert false : "Target shard must be non-null: " + shardIt;
265+
throw new IllegalStateException("");
266+
}
255267
/*
256268
* We capture the thread that this phase is starting on. When we are called back after executing the phase, we are either on the
257269
* same thread (because we never went async, or the same thread was selected from the thread pool) or a different thread. If we
258270
* continue on the same thread in the case that we never went async and this happens repeatedly we will end up recursing deeply and
259271
* could stack overflow. To prevent this, we fork if we are called back on the same thread that execution started on and otherwise
260272
* we can continue (cf. InitialSearchPhase#maybeFork).
261273
*/
262-
if (shard == null) {
263-
SearchShardTarget unassignedShard = new SearchShardTarget(null, shardIt.shardId(),
264-
shardIt.getClusterAlias(), shardIt.getOriginalIndices());
265-
fork(() -> onShardFailure(shardIndex, unassignedShard, shardIt, new NoShardAvailableActionException(shardIt.shardId())));
266-
} else {
267-
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
268-
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
269-
: null;
270-
Runnable r = () -> {
271-
final Thread thread = Thread.currentThread();
272-
try {
273-
executePhaseOnShard(shardIt, shard,
274-
new SearchActionListener<Result>(shard, shardIndex) {
275-
@Override
276-
public void innerOnResponse(Result result) {
277-
try {
278-
onShardResult(result, shardIt);
279-
} catch (Exception exc) {
280-
onShardFailure(shardIndex, shard, shardIt, exc);
281-
} finally {
282-
executeNext(pendingExecutions, thread);
283-
}
274+
final PendingExecutions pendingExecutions = throttleConcurrentRequests ?
275+
pendingExecutionsPerNode.computeIfAbsent(shard.getNodeId(), n -> new PendingExecutions(maxConcurrentRequestsPerNode))
276+
: null;
277+
Runnable r = () -> {
278+
final Thread thread = Thread.currentThread();
279+
try {
280+
executePhaseOnShard(shardIt, shard,
281+
new SearchActionListener<Result>(shard, shardIndex) {
282+
@Override
283+
public void innerOnResponse(Result result) {
284+
try {
285+
onShardResult(result, shardIt);
286+
} catch (Exception exc) {
287+
onShardFailure(shardIndex, shard, shardIt, exc);
288+
} finally {
289+
executeNext(pendingExecutions, thread);
284290
}
291+
}
285292

286-
@Override
287-
public void onFailure(Exception t) {
288-
try {
289-
onShardFailure(shardIndex, shard, shardIt, t);
290-
} finally {
291-
executeNext(pendingExecutions, thread);
292-
}
293+
@Override
294+
public void onFailure(Exception t) {
295+
try {
296+
onShardFailure(shardIndex, shard, shardIt, t);
297+
} finally {
298+
executeNext(pendingExecutions, thread);
293299
}
294-
});
295-
} catch (final Exception e) {
296-
try {
297-
/*
298-
* It is possible to run into connection exceptions here because we are getting the connection early and might
299-
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
300-
*/
301-
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
302-
} finally {
303-
executeNext(pendingExecutions, thread);
304-
}
300+
}
301+
});
302+
} catch (final Exception e) {
303+
try {
304+
/*
305+
* It is possible to run into connection exceptions here because we are getting the connection early and might
306+
* run into nodes that are not connected. In this case, on shard failure will move us to the next shard copy.
307+
*/
308+
fork(() -> onShardFailure(shardIndex, shard, shardIt, e));
309+
} finally {
310+
executeNext(pendingExecutions, thread);
305311
}
306-
};
307-
if (throttleConcurrentRequests) {
308-
pendingExecutions.tryRun(r);
309-
} else {
310-
r.run();
311312
}
313+
};
314+
if (throttleConcurrentRequests) {
315+
pendingExecutions.tryRun(r);
316+
} else {
317+
r.run();
312318
}
313319
}
314320

server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java

Lines changed: 11 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -158,4 +158,15 @@ public int compareTo(SearchShardIterator o) {
158158
.thenComparing(SearchShardIterator::getClusterAlias, Comparator.nullsFirst(String::compareTo))
159159
.compare(this, o);
160160
}
161+
162+
@Override
163+
public String toString() {
164+
return "SearchShardIterator{" +
165+
"originalIndices=" + originalIndices +
166+
", clusterAlias='" + clusterAlias + '\'' +
167+
", shardId=" + shardId +
168+
", searchContextId=" + searchContextId +
169+
", searchContextKeepAlive=" + searchContextKeepAlive +
170+
'}';
171+
}
161172
}

0 commit comments

Comments (0)