Skip to content

Commit 9626dd0

Browse files
committed
make the version map cache per-segment
1 parent affe2f2 commit 9626dd0

2 files changed

Lines changed: 96 additions & 98 deletions

File tree

core/src/main/java/org/elasticsearch/common/lucene/uid/PerThreadIDAndVersionLookup.java

Lines changed: 66 additions & 86 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.apache.lucene.index.Fields;
2727
import org.apache.lucene.index.IndexReader;
28+
import org.apache.lucene.index.LeafReader;
2829
import org.apache.lucene.index.LeafReaderContext;
2930
import org.apache.lucene.index.NumericDocValues;
3031
import org.apache.lucene.index.PostingsEnum;
@@ -47,107 +48,86 @@
4748
* finds. */
4849

4950
final class PerThreadIDAndVersionLookup {
50-
51-
private final LeafReaderContext[] readerContexts;
52-
private final TermsEnum[] termsEnums;
53-
private final PostingsEnum[] docsEnums;
54-
// Only used for back compat, to lookup a version from payload:
55-
private final PostingsEnum[] posEnums;
56-
private final Bits[] liveDocs;
57-
private final NumericDocValues[] versions;
58-
private final int numSegs;
59-
private final boolean hasDeletions;
60-
private final boolean[] hasPayloads;
61-
62-
public PerThreadIDAndVersionLookup(IndexReader r) throws IOException {
63-
64-
List<LeafReaderContext> leaves = new ArrayList<>(r.leaves());
65-
66-
readerContexts = leaves.toArray(new LeafReaderContext[leaves.size()]);
67-
termsEnums = new TermsEnum[leaves.size()];
68-
docsEnums = new PostingsEnum[leaves.size()];
69-
posEnums = new PostingsEnum[leaves.size()];
70-
liveDocs = new Bits[leaves.size()];
71-
versions = new NumericDocValues[leaves.size()];
72-
hasPayloads = new boolean[leaves.size()];
73-
int numSegs = 0;
74-
boolean hasDeletions = false;
75-
// iterate backwards to optimize for the frequently updated documents
76-
// which are likely to be in the last segments
77-
for(int i=leaves.size()-1;i>=0;i--) {
78-
LeafReaderContext readerContext = leaves.get(i);
79-
Fields fields = readerContext.reader().fields();
80-
if (fields != null) {
81-
Terms terms = fields.terms(UidFieldMapper.NAME);
82-
if (terms != null) {
83-
readerContexts[numSegs] = readerContext;
84-
hasPayloads[numSegs] = terms.hasPayloads();
85-
termsEnums[numSegs] = terms.iterator();
86-
assert termsEnums[numSegs] != null;
87-
liveDocs[numSegs] = readerContext.reader().getLiveDocs();
88-
hasDeletions |= readerContext.reader().hasDeletions();
89-
versions[numSegs] = readerContext.reader().getNumericDocValues(VersionFieldMapper.NAME);
90-
numSegs++;
91-
}
51+
// TODO: do we really need to store all this stuff? some of it might not speed up anything.
52+
// we keep it around for now, to reduce the amount of e.g. hash lookups by field and stuff
53+
54+
/** terms enum for uid field */
55+
private final TermsEnum termsEnum;
56+
/** _version data */
57+
private final NumericDocValues versions;
58+
/** Only true when versions are indexed as payloads instead of docvalues */
59+
private final boolean hasPayloads;
60+
/** Reused for iteration (when the term exists) */
61+
private PostingsEnum docsEnum;
62+
/** Only used for back compat, to lookup a version from payload */
63+
private PostingsEnum posEnum;
64+
65+
/**
66+
* Initialize lookup for the provided segment
67+
*/
68+
public PerThreadIDAndVersionLookup(LeafReader reader) throws IOException {
69+
TermsEnum termsEnum = null;
70+
NumericDocValues versions = null;
71+
boolean hasPayloads = false;
72+
73+
Fields fields = reader.fields();
74+
if (fields != null) {
75+
Terms terms = fields.terms(UidFieldMapper.NAME);
76+
if (terms != null) {
77+
hasPayloads = terms.hasPayloads();
78+
termsEnum = terms.iterator();
79+
assert termsEnum != null;
80+
versions = reader.getNumericDocValues(VersionFieldMapper.NAME);
9281
}
9382
}
94-
this.numSegs = numSegs;
95-
this.hasDeletions = hasDeletions;
83+
84+
this.versions = versions;
85+
this.termsEnum = termsEnum;
86+
this.hasPayloads = hasPayloads;
9687
}
9788

9889
/** Return null if id is not found. */
99-
public DocIdAndVersion lookup(BytesRef id) throws IOException {
100-
for(int seg=0;seg<numSegs;seg++) {
101-
if (termsEnums[seg].seekExact(id)) {
102-
103-
NumericDocValues segVersions = versions[seg];
104-
if (segVersions != null || hasPayloads[seg] == false) {
105-
// Use NDV to retrieve the version, in which case we only need PostingsEnum:
106-
107-
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
108-
PostingsEnum docs = docsEnums[seg] = termsEnums[seg].postings(docsEnums[seg], 0);
109-
final Bits liveDocs = this.liveDocs[seg];
110-
int docID = DocIdSetIterator.NO_MORE_DOCS;
111-
for (int d = docs.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docs.nextDoc()) {
112-
if (liveDocs != null && liveDocs.get(d) == false) {
113-
continue;
114-
}
115-
docID = d;
90+
public DocIdAndVersion lookup(BytesRef id, Bits liveDocs, LeafReaderContext context) throws IOException {
91+
if (termsEnum.seekExact(id)) {
92+
if (versions != null || hasPayloads == false) {
93+
// Use NDV to retrieve the version, in which case we only need PostingsEnum:
94+
95+
// there may be more than one matching docID, in the case of nested docs, so we want the last one:
96+
docsEnum = termsEnum.postings(docsEnum, 0);
97+
int docID = DocIdSetIterator.NO_MORE_DOCS;
98+
for (int d = docsEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = docsEnum.nextDoc()) {
99+
if (liveDocs != null && liveDocs.get(d) == false) {
100+
continue;
116101
}
102+
docID = d;
103+
}
117104

118-
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
119-
if (segVersions != null) {
120-
return new DocIdAndVersion(docID, segVersions.get(docID), readerContexts[seg]);
121-
} else {
122-
// _uid found, but no doc values and no payloads
123-
return new DocIdAndVersion(docID, Versions.NOT_SET, readerContexts[seg]);
124-
}
105+
if (docID != DocIdSetIterator.NO_MORE_DOCS) {
106+
if (versions != null) {
107+
return new DocIdAndVersion(docID, versions.get(docID), context);
125108
} else {
126-
assert hasDeletions;
127-
continue;
109+
// _uid found, but no doc values and no payloads
110+
return new DocIdAndVersion(docID, Versions.NOT_SET, context);
128111
}
129112
}
113+
}
130114

131-
// ... but used to be stored as payloads; in this case we must use PostingsEnum
132-
PostingsEnum dpe = posEnums[seg] = termsEnums[seg].postings(posEnums[seg], PostingsEnum.PAYLOADS);
133-
assert dpe != null; // terms has payloads
134-
final Bits liveDocs = this.liveDocs[seg];
135-
for (int d = dpe.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = dpe.nextDoc()) {
136-
if (liveDocs != null && liveDocs.get(d) == false) {
137-
continue;
138-
}
139-
dpe.nextPosition();
140-
final BytesRef payload = dpe.getPayload();
141-
if (payload != null && payload.length == 8) {
142-
// TODO: does this break the nested docs case? we are not returning the last matching docID here?
143-
return new DocIdAndVersion(d, Numbers.bytesToLong(payload), readerContexts[seg]);
144-
}
115+
// ... but used to be stored as payloads; in this case we must use PostingsEnum
116+
posEnum = termsEnum.postings(posEnum, PostingsEnum.PAYLOADS);
117+
assert posEnum != null; // terms has payloads
118+
for (int d = posEnum.nextDoc(); d != DocIdSetIterator.NO_MORE_DOCS; d = posEnum.nextDoc()) {
119+
if (liveDocs != null && liveDocs.get(d) == false) {
120+
continue;
121+
}
122+
posEnum.nextPosition();
123+
final BytesRef payload = posEnum.getPayload();
124+
if (payload != null && payload.length == 8) {
125+
// TODO: does this break the nested docs case? we are not returning the last matching docID here?
126+
return new DocIdAndVersion(d, Numbers.bytesToLong(payload), context);
145127
}
146128
}
147129
}
148130

149131
return null;
150132
}
151-
152-
// TODO: add reopen method to carry over re-used enums...?
153133
}

core/src/main/java/org/elasticsearch/common/lucene/uid/Versions.java

Lines changed: 30 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,16 @@
2020
package org.elasticsearch.common.lucene.uid;
2121

2222
import org.apache.lucene.index.IndexReader;
23-
import org.apache.lucene.index.IndexReader.ReaderClosedListener;
23+
import org.apache.lucene.index.LeafReader;
24+
import org.apache.lucene.index.LeafReader.CoreClosedListener;
2425
import org.apache.lucene.index.LeafReaderContext;
2526
import org.apache.lucene.index.Term;
2627
import org.apache.lucene.util.CloseableThreadLocal;
2728
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
2829
import org.elasticsearch.index.mapper.internal.UidFieldMapper;
2930

3031
import java.io.IOException;
32+
import java.util.List;
3133
import java.util.concurrent.ConcurrentMap;
3234

3335
/** Utility class to resolve the Lucene doc ID and version for a given uid. */
@@ -52,30 +54,31 @@ public class Versions {
5254
public static final long MATCH_DELETED = -4L;
5355

5456
// TODO: is there somewhere else we can store these?
55-
private static final ConcurrentMap<IndexReader, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
57+
private static final ConcurrentMap<Object, CloseableThreadLocal<PerThreadIDAndVersionLookup>> lookupStates = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
5658

5759
// Evict this reader from lookupStates once it's closed:
58-
private static final ReaderClosedListener removeLookupState = new ReaderClosedListener() {
60+
private static final CoreClosedListener removeLookupState = new CoreClosedListener() {
5961
@Override
60-
public void onClose(IndexReader reader) {
61-
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(reader);
62+
public void onClose(Object key) {
63+
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.remove(key);
6264
if (ctl != null) {
6365
ctl.close();
6466
}
6567
}
6668
};
6769

68-
private static PerThreadIDAndVersionLookup getLookupState(IndexReader reader) throws IOException {
69-
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(reader);
70+
private static PerThreadIDAndVersionLookup getLookupState(LeafReader reader) throws IOException {
71+
Object key = reader.getCoreCacheKey();
72+
CloseableThreadLocal<PerThreadIDAndVersionLookup> ctl = lookupStates.get(key);
7073
if (ctl == null) {
71-
// First time we are seeing this reader; make a
74+
// First time we are seeing this reader's core; make a
7275
// new CTL:
7376
ctl = new CloseableThreadLocal<>();
74-
CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(reader, ctl);
77+
CloseableThreadLocal<PerThreadIDAndVersionLookup> other = lookupStates.putIfAbsent(key, ctl);
7578
if (other == null) {
7679
// Our CTL won, we must remove it when the
77-
// reader is closed:
78-
reader.addReaderClosedListener(removeLookupState);
80+
// core is closed:
81+
reader.addCoreClosedListener(removeLookupState);
7982
} else {
8083
// Another thread beat us to it: just use
8184
// their CTL:
@@ -116,7 +119,22 @@ public DocIdAndVersion(int docId, long version, LeafReaderContext context) {
116119
*/
117120
public static DocIdAndVersion loadDocIdAndVersion(IndexReader reader, Term term) throws IOException {
118121
assert term.field().equals(UidFieldMapper.NAME);
119-
return getLookupState(reader).lookup(term.bytes());
122+
List<LeafReaderContext> leaves = reader.leaves();
123+
if (leaves.isEmpty()) {
124+
return null;
125+
}
126+
// iterate backwards to optimize for the frequently updated documents
127+
// which are likely to be in the last segments
128+
for (int i = leaves.size() - 1; i >= 0; i--) {
129+
LeafReaderContext context = leaves.get(i);
130+
LeafReader leaf = context.reader();
131+
PerThreadIDAndVersionLookup lookup = getLookupState(leaf);
132+
DocIdAndVersion result = lookup.lookup(term.bytes(), leaf.getLiveDocs(), context);
133+
if (result != null) {
134+
return result;
135+
}
136+
}
137+
return null;
120138
}
121139

122140
/**

0 commit comments

Comments
 (0)