Skip to content

Commit 6de5edd

Browse files
authored
Fix several potential circuit breaker leaks in Aggregators (#79676)
This commit adds a new CircuitBreaker implementation in the test that throws CircuitBreaker Exceptions randomly. This new circuit breaker helps uncover several places where we might leak if the circuit breaker throws such exception.
1 parent 564ff9d commit 6de5edd

18 files changed

Lines changed: 391 additions & 91 deletions

File tree

server/src/main/java/org/elasticsearch/search/aggregations/bucket/BucketsAggregator.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,8 +99,11 @@ public final void collectExistingBucket(LeafBucketCollector subCollector, int do
9999
* If a bucket's ordinal is mapped to -1 then the bucket is removed entirely.
100100
*/
101101
public final void rewriteBuckets(long newNumBuckets, LongUnaryOperator mergeMap) {
102-
try (LongArray oldDocCounts = docCounts) {
102+
LongArray oldDocCounts = docCounts;
103+
boolean success = false;
104+
try {
103105
docCounts = bigArrays().newLongArray(newNumBuckets, true);
106+
success = true;
104107
docCounts.fill(0, newNumBuckets, 0);
105108
for (long i = 0; i < oldDocCounts.size(); i++) {
106109
long docCount = oldDocCounts.get(i);
@@ -113,6 +116,10 @@ public final void rewriteBuckets(long newNumBuckets, LongUnaryOperator mergeMap)
113116
docCounts.increment(destinationOrdinal, docCount);
114117
}
115118
}
119+
} finally {
120+
if (success) {
121+
oldDocCounts.close();
122+
}
116123
}
117124
}
118125

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/BinaryValuesSource.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,15 @@ class BinaryValuesSource extends SingleDimensionValuesSource<BytesRef> {
5252
this.breakerConsumer = breakerConsumer;
5353
this.docValuesFunc = docValuesFunc;
5454
this.values = bigArrays.newObjectArray(Math.min(size, 100));
55-
this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100));
55+
boolean success = false;
56+
try {
57+
this.valueBuilders = bigArrays.newObjectArray(Math.min(size, 100));
58+
success = true;
59+
} finally {
60+
if (success == false) {
61+
close();
62+
}
63+
}
5664
}
5765

5866
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/CompositeAggregator.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,9 @@ protected void doClose() {
149149
try {
150150
Releasables.close(queue);
151151
} finally {
152-
Releasables.close(sources);
152+
if (sources != null) {
153+
Releasables.close(sources);
154+
}
153155
}
154156
}
155157

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/DoubleValuesSource.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,15 @@ class DoubleValuesSource extends SingleDimensionValuesSource<Double> {
4646
super(bigArrays, format, fieldType, missingBucket, missingOrder, size, reverseMul);
4747
this.docValuesFunc = docValuesFunc;
4848
this.bits = this.missingBucket ? new BitArray(100, bigArrays) : null;
49-
this.values = bigArrays.newDoubleArray(Math.min(size, 100), false);
49+
boolean success = false;
50+
try {
51+
this.values = bigArrays.newDoubleArray(Math.min(size, 100), false);
52+
success = true;
53+
} finally {
54+
if (success == false) {
55+
close();
56+
}
57+
}
5058
}
5159

5260
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/LongValuesSource.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,15 @@ class LongValuesSource extends SingleDimensionValuesSource<Long> {
6464
this.docValuesFunc = docValuesFunc;
6565
this.rounding = rounding;
6666
this.bits = missingBucket ? new BitArray(Math.min(size, 100), bigArrays) : null;
67-
this.values = bigArrays.newLongArray(Math.min(size, 100), false);
67+
boolean success = false;
68+
try {
69+
this.values = bigArrays.newLongArray(Math.min(size, 100), false);
70+
success = true;
71+
} finally {
72+
if (success == false) {
73+
close();
74+
}
75+
}
6876
}
6977

7078
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/AutoDateHistogramAggregator.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -276,10 +276,13 @@ private void increaseRoundingIfNeeded(long rounded) {
276276
return;
277277
}
278278
do {
279-
try (LongKeyedBucketOrds oldOrds = bucketOrds) {
279+
LongKeyedBucketOrds oldOrds = bucketOrds;
280+
boolean success = false;
281+
try {
280282
preparedRounding = prepareRounding(++roundingIdx);
281283
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
282284
bucketOrds = new LongKeyedBucketOrds.FromSingle(bigArrays());
285+
success = true; // now it is safe to close oldOrds after we finish
283286
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(0);
284287
while (ordsEnum.next()) {
285288
long oldKey = ordsEnum.value();
@@ -288,6 +291,10 @@ private void increaseRoundingIfNeeded(long rounded) {
288291
mergeMap[(int) ordsEnum.ord()] = newBucketOrd >= 0 ? newBucketOrd : -1 - newBucketOrd;
289292
}
290293
merge(mergeMap, bucketOrds.size());
294+
} finally {
295+
if (success) {
296+
oldOrds.close();
297+
}
291298
}
292299
} while (roundingIdx < roundingInfos.length - 1
293300
&& (bucketOrds.size() > targetBuckets * roundingInfos[roundingIdx].getMaximumInnerInterval()
@@ -527,9 +534,12 @@ private int increaseRoundingIfNeeded(long owningBucketOrd, int oldEstimatedBucke
527534

528535
private void rebucket() {
529536
rebucketCount++;
530-
try (LongKeyedBucketOrds oldOrds = bucketOrds) {
537+
LongKeyedBucketOrds oldOrds = bucketOrds;
538+
boolean success = false;
539+
try {
531540
long[] mergeMap = new long[Math.toIntExact(oldOrds.size())];
532541
bucketOrds = new LongKeyedBucketOrds.FromMany(bigArrays());
542+
success = true;
533543
for (long owningBucketOrd = 0; owningBucketOrd <= oldOrds.maxOwningBucketOrd(); owningBucketOrd++) {
534544
LongKeyedBucketOrds.BucketOrdsEnum ordsEnum = oldOrds.ordsEnum(owningBucketOrd);
535545
Rounding.Prepared preparedRounding = preparedRoundings[roundingIndexFor(owningBucketOrd)];
@@ -543,6 +553,10 @@ private void rebucket() {
543553
liveBucketCountUnderestimate.set(owningBucketOrd, Math.toIntExact(bucketOrds.bucketsInOrd(owningBucketOrd)));
544554
}
545555
merge(mergeMap, bucketOrds.size());
556+
} finally {
557+
if (success) {
558+
oldOrds.close();
559+
}
546560
}
547561
}
548562

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/VariableWidthHistogramAggregator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,16 @@ private class MergeBucketsPhase extends CollectionPhase {
159159

160160
MergeBucketsPhase(DoubleArray buffer, int bufferSize) {
161161
// Cluster the documents to reduce the number of buckets
162-
bucketBufferedDocs(buffer, bufferSize, mergePhaseInitialBucketCount(shardSize));
162+
boolean success = false;
163+
try {
164+
bucketBufferedDocs(buffer, bufferSize, mergePhaseInitialBucketCount(shardSize));
165+
success = true;
166+
} finally {
167+
if (success == false) {
168+
close();
169+
clusterMaxes = clusterMins = clusterCentroids = clusterSizes = null;
170+
}
171+
}
163172

164173
if (bufferSize > 1) {
165174
updateAvgBucketDistance();

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/BytesKeyedBucketOrds.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,15 @@ private static class FromMany extends BytesKeyedBucketOrds {
159159

160160
private FromMany(BigArrays bigArrays) {
161161
bytesToLong = new BytesRefHash(1, bigArrays);
162-
longToBucketOrds = LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.MANY);
162+
boolean success = false;
163+
try {
164+
longToBucketOrds = LongKeyedBucketOrds.build(bigArrays, CardinalityUpperBound.MANY);
165+
success = true;
166+
} finally {
167+
if (success == false) {
168+
close();
169+
}
170+
}
163171
}
164172

165173
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/GlobalOrdinalsStringTermsAggregator.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -812,7 +812,7 @@ class SignificantTermsResults extends ResultStrategy<
812812
private final long supersetSize;
813813
private final SignificanceHeuristic significanceHeuristic;
814814

815-
private LongArray subsetSizes = bigArrays().newLongArray(1, true);
815+
private LongArray subsetSizes;
816816

817817
SignificantTermsResults(
818818
SignificanceLookup significanceLookup,
@@ -822,6 +822,15 @@ class SignificantTermsResults extends ResultStrategy<
822822
backgroundFrequencies = significanceLookup.bytesLookup(bigArrays(), cardinality);
823823
supersetSize = significanceLookup.supersetSize();
824824
this.significanceHeuristic = significanceHeuristic;
825+
boolean success = false;
826+
try {
827+
subsetSizes = bigArrays().newLongArray(1, true);
828+
success = true;
829+
} finally {
830+
if (success == false) {
831+
close();
832+
}
833+
}
825834
}
826835

827836
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/terms/MapStringTermsAggregator.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,11 @@ public MapStringTermsAggregator(
6969
Map<String, Object> metadata
7070
) throws IOException {
7171
super(name, factories, context, parent, order, format, bucketCountThresholds, collectionMode, showTermDocCountError, metadata);
72-
this.collectorSource = collectorSource;
7372
this.resultStrategy = resultStrategy.apply(this); // ResultStrategy needs a reference to the Aggregator to do its job.
7473
this.includeExclude = includeExclude;
7574
bucketOrds = BytesKeyedBucketOrds.build(context.bigArrays(), cardinality);
75+
// set last because if there is an error during construction the collector gets release outside the constructor.
76+
this.collectorSource = collectorSource;
7677
}
7778

7879
@Override
@@ -478,7 +479,7 @@ class SignificantTermsResults extends ResultStrategy<SignificantStringTerms, Sig
478479
private final long supersetSize;
479480
private final SignificanceHeuristic significanceHeuristic;
480481

481-
private LongArray subsetSizes = bigArrays().newLongArray(1, true);
482+
private LongArray subsetSizes;
482483

483484
SignificantTermsResults(
484485
SignificanceLookup significanceLookup,
@@ -488,6 +489,15 @@ class SignificantTermsResults extends ResultStrategy<SignificantStringTerms, Sig
488489
backgroundFrequencies = significanceLookup.bytesLookup(bigArrays(), cardinality);
489490
supersetSize = significanceLookup.supersetSize();
490491
this.significanceHeuristic = significanceHeuristic;
492+
boolean success = false;
493+
try {
494+
subsetSizes = bigArrays().newLongArray(1, true);
495+
success = true;
496+
} finally {
497+
if (success == false) {
498+
close();
499+
}
500+
}
491501
}
492502

493503
@Override

0 commit comments

Comments
 (0)