Skip to content

Commit daa9fc7

Browse files
committed
Refactor
1 parent f7a5171 commit daa9fc7

2 files changed

Lines changed: 49 additions & 76 deletions

File tree

server/src/main/java/org/elasticsearch/index/shard/IndexShard.java

Lines changed: 25 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -607,12 +607,10 @@ public IndexShardState markAsRecovering(String reason, RecoveryState recoverySta
607607
public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer)
608608
throws IllegalIndexShardStateException, InterruptedException {
609609
assert shardRouting.primary() : "only primaries can be marked as relocated: " + shardRouting;
610-
refreshListeners.disallowAdd();
610+
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
611611
try {
612-
if (refreshListeners.refreshNeeded()) {
613-
refresh("relocated");
614-
}
615612
indexShardOperationPermits.blockOperations(30, TimeUnit.MINUTES, () -> {
613+
forceRefreshes.close();
616614
// no shard operation permits are being held here, move state from started to relocated
617615
assert indexShardOperationPermits.getActiveOperationsCount() == 0 :
618616
"in-flight operations in progress while moving shard state to relocated";
@@ -644,7 +642,7 @@ public void relocated(final Consumer<ReplicationTracker.PrimaryContext> consumer
644642
failShard("timed out waiting for relocation hand-off to complete", null);
645643
throw new IndexShardClosedException(shardId(), "timed out waiting for relocation hand-off to complete");
646644
} finally {
647-
refreshListeners.allowAdd();
645+
forceRefreshes.close();
648646
}
649647
}
650648

@@ -2345,15 +2343,22 @@ public void acquireAllPrimaryOperationsPermits(final ActionListener<Releasable>
23452343
verifyNotClosed();
23462344
assert shardRouting.primary() : "acquireAllPrimaryOperationsPermits should only be called on primary shard: " + shardRouting;
23472345

2348-
refreshListeners.disallowAdd();
2346+
asyncBlockOperations(onPermitAcquired, timeout.duration(), timeout.timeUnit());
2347+
}
2348+
2349+
private void asyncBlockOperations(ActionListener<Releasable> onPermitAcquired, long timeout, TimeUnit timeUnit) {
2350+
final Releasable forceRefreshes = refreshListeners.forceRefreshes();
2351+
final ActionListener<Releasable> wrappedListener = ActionListener.wrap(r -> {
2352+
forceRefreshes.close();
2353+
onPermitAcquired.onResponse(r);
2354+
}, e -> {
2355+
forceRefreshes.close();
2356+
onPermitAcquired.onFailure(e);
2357+
});
23492358
try {
2350-
if (refreshListeners.refreshNeeded()) {
2351-
refresh("acquire all primary operations permits");
2352-
}
2353-
indexShardOperationPermits.asyncBlockOperations(
2354-
ensuringAllowRefreshListeners(onPermitAcquired), timeout.duration(), timeout.timeUnit());
2359+
indexShardOperationPermits.asyncBlockOperations(wrappedListener, timeout, timeUnit);
23552360
} catch (Exception e) {
2356-
refreshListeners.allowAdd();
2361+
forceRefreshes.close();
23572362
throw e;
23582363
}
23592364
}
@@ -2365,7 +2370,7 @@ private <E extends Exception> void bumpPrimaryTerm(final long newPrimaryTerm,
23652370
assert newPrimaryTerm > pendingPrimaryTerm || (newPrimaryTerm >= pendingPrimaryTerm && combineWithAction != null);
23662371
assert operationPrimaryTerm <= pendingPrimaryTerm;
23672372
final CountDownLatch termUpdated = new CountDownLatch(1);
2368-
final ActionListener<Releasable> actionListener = new ActionListener<Releasable>() {
2373+
asyncBlockOperations(new ActionListener<Releasable>() {
23692374
@Override
23702375
public void onFailure(final Exception e) {
23712376
try {
@@ -2411,41 +2416,9 @@ public void onResponse(final Releasable releasable) {
24112416
}
24122417
}
24132418
}
2414-
};
2415-
refreshListeners.disallowAdd();
2416-
try {
2417-
if (refreshListeners.refreshNeeded()) {
2418-
refresh("bump primary term");
2419-
}
2420-
indexShardOperationPermits.asyncBlockOperations(ensuringAllowRefreshListeners(actionListener), 30, TimeUnit.MINUTES);
2421-
pendingPrimaryTerm = newPrimaryTerm;
2422-
termUpdated.countDown();
2423-
} catch (Exception e) {
2424-
refreshListeners.allowAdd();
2425-
throw e;
2426-
}
2427-
}
2428-
2429-
/**
2430-
* Wraps an ActionListener so that {@link RefreshListeners#allowAdd()} is always called on {@link #refreshListeners}
2431-
* after the listeners actions have executed.
2432-
* @param actionListener ActionListener to wrap
2433-
* @return Wrapped ActionListener
2434-
*/
2435-
private ActionListener<Releasable> ensuringAllowRefreshListeners(ActionListener<Releasable> actionListener) {
2436-
return ActionListener.wrap(r -> {
2437-
try {
2438-
actionListener.onResponse(r);
2439-
} finally {
2440-
refreshListeners.allowAdd();
2441-
}
2442-
}, e -> {
2443-
try {
2444-
actionListener.onFailure(e);
2445-
} finally {
2446-
refreshListeners.allowAdd();
2447-
}
2448-
});
2419+
}, 30, TimeUnit.MINUTES);
2420+
pendingPrimaryTerm = newPrimaryTerm;
2421+
termUpdated.countDown();
24492422
}
24502423

24512424
/**
@@ -2490,21 +2463,10 @@ public void acquireAllReplicaOperationsPermits(final long opPrimaryTerm,
24902463
final long maxSeqNoOfUpdatesOrDeletes,
24912464
final ActionListener<Releasable> onPermitAcquired,
24922465
final TimeValue timeout) {
2493-
refreshListeners.disallowAdd();
2494-
try {
2495-
if (refreshListeners.refreshNeeded()) {
2496-
refresh("acquire all replica operations permits");
2497-
}
2498-
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
2499-
onPermitAcquired, true,
2500-
listener -> indexShardOperationPermits.asyncBlockOperations(
2501-
ensuringAllowRefreshListeners(listener), timeout.duration(), timeout.timeUnit()
2502-
)
2503-
);
2504-
} catch (Exception e) {
2505-
refreshListeners.allowAdd();
2506-
throw e;
2507-
}
2466+
innerAcquireReplicaOperationPermit(opPrimaryTerm, globalCheckpoint, maxSeqNoOfUpdatesOrDeletes,
2467+
onPermitAcquired, true,
2468+
listener -> asyncBlockOperations(listener, timeout.duration(), timeout.timeUnit())
2469+
);
25082470
}
25092471

25102472
private void innerAcquireReplicaOperationPermit(final long opPrimaryTerm,

server/src/main/java/org/elasticsearch/index/shard/RefreshListeners.java

Lines changed: 24 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import org.apache.logging.log4j.Logger;
2323
import org.apache.lucene.search.ReferenceManager;
2424
import org.elasticsearch.common.collect.Tuple;
25+
import org.elasticsearch.common.lease.Releasable;
26+
import org.elasticsearch.common.util.concurrent.RunOnce;
2527
import org.elasticsearch.common.util.concurrent.ThreadContext;
2628
import org.elasticsearch.index.translog.Translog;
2729

@@ -55,7 +57,7 @@ public final class RefreshListeners implements ReferenceManager.RefreshListener,
5557
private volatile boolean closed = false;
5658

5759
/**
58-
* Prevents new refresh listeners from being registered while {@code >= 0}. Used to prevent becoming blocked on operations waiting for
60+
* Force-refreshes new refresh listeners that are added while {@code >= 0}. Used to prevent becoming blocked on operations waiting for
5961
* refresh during relocation.
6062
*/
6163
private int refreshForcers;
@@ -83,19 +85,28 @@ public RefreshListeners(IntSupplier getMaxRefreshListeners, Runnable forceRefres
8385
}
8486

8587
/**
86-
* Prohibit adding new refresh listeners. See {@link #refreshForcers}.
88+
* Force-refreshes newly added listeners and forces a refresh if there are currently listeners registered. See {@link #refreshForcers}.
8789
*/
88-
public synchronized void disallowAdd() {
89-
refreshForcers += 1;
90-
assert refreshForcers >= 0;
91-
}
92-
93-
/**
94-
* Enable adding new refresh listeners. See {@link #refreshForcers}.
95-
*/
96-
public synchronized void allowAdd() {
97-
refreshForcers -= 1;
98-
assert refreshForcers >= 0;
90+
public Releasable forceRefreshes() {
91+
synchronized (this) {
92+
assert refreshForcers >= 0;
93+
refreshForcers += 1;
94+
}
95+
final RunOnce runOnce = new RunOnce(() -> {
96+
synchronized (RefreshListeners.this) {
97+
assert refreshForcers > 0;
98+
refreshForcers -= 1;
99+
}
100+
});
101+
if (refreshNeeded()) {
102+
try {
103+
forceRefresh.run();
104+
} catch (Exception e) {
105+
runOnce.run();
106+
throw e;
107+
}
108+
}
109+
return () -> runOnce.run();
99110
}
100111

101112
/**

0 commit comments

Comments
 (0)