Skip to content

Commit 0e648e8

Browse files
author
Derek Yeager
committed
Added support for tablets to return a subset of fieldIds
1 parent e197482 commit 0e648e8

File tree

6 files changed

+513
-6
lines changed

6 files changed

+513
-6
lines changed

core/store/src/main/java/mil/nga/giat/geowave/core/store/DataStore.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import mil.nga.giat.geowave.core.store.data.VisibilityWriter;
1010
import mil.nga.giat.geowave.core.store.index.Index;
1111
import mil.nga.giat.geowave.core.store.query.Query;
12+
import mil.nga.giat.geowave.core.store.query.QueryOptions;
1213

1314
/**
1415
* A DataStore can both ingest and query data based on persisted indices and
@@ -265,6 +266,29 @@ public <T> CloseableIterator<T> query(
265266
Index index,
266267
final Query query );
267268

269+
/**
270+
* Returns all data in this data store that matches the query parameter
271+
* within the index described by the index passed in. All data types that
272+
* match the query will be returned as an instance of the native data type
273+
* that was originally ingested. Additional query options will be applied to
274+
* results
275+
*
276+
* @param index
277+
* The index information to query against. All data within the
278+
* index of this index ID will be queried and returned.
279+
* @param query
280+
* The description of the query to be performed
281+
* @param queryOptions
282+
* Additional options to be applied to the query results
283+
* @return An iterator on all results that match the query. The iterator
284+
* implements Closeable and it is best practice to close the
285+
* iterator after it is no longer needed.
286+
*/
287+
public <T> CloseableIterator<T> query(
288+
Index index,
289+
final Query query,
290+
final QueryOptions queryOptions );
291+
268292
/**
269293
* Returns all data in this data store that matches the query parameter
270294
* within the index described by the index passed in and matches the adapter
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package mil.nga.giat.geowave.core.store.query;
2+
3+
import java.util.Collection;
4+
import java.util.Collections;
5+
6+
/**
7+
* Container object that encapsulates additional options to be applied to a
8+
* {@link Query}
9+
*
10+
* @since 0.8.7
11+
*/
12+
public class QueryOptions
13+
{
14+
private Collection<String> fieldIds;
15+
16+
/**
17+
* @param fieldIds
18+
* the desired subset of fieldIds to be included in query results
19+
*/
20+
public QueryOptions(
21+
Collection<String> fieldIds ) {
22+
super();
23+
this.fieldIds = fieldIds;
24+
}
25+
26+
/**
27+
* @return the fieldIds or an empty List, will never return null
28+
*/
29+
public Collection<String> getFieldIds() {
30+
if (fieldIds == null) {
31+
fieldIds = Collections.emptyList();
32+
}
33+
return fieldIds;
34+
}
35+
36+
/**
37+
* @param fieldIds
38+
* the desired subset of fieldIds to be included in query results
39+
*/
40+
public void setFieldIds(
41+
Collection<String> fieldIds ) {
42+
this.fieldIds = fieldIds;
43+
}
44+
45+
}

extensions/datastores/accumulo/src/main/java/mil/nga/giat/geowave/datastore/accumulo/AccumuloDataStore.java

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import java.util.Map;
1111
import java.util.Map.Entry;
1212

13-
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
1413
import mil.nga.giat.geowave.core.index.ByteArrayId;
1514
import mil.nga.giat.geowave.core.index.ByteArrayRange;
1615
import mil.nga.giat.geowave.core.index.ByteArrayUtils;
@@ -37,6 +36,7 @@
3736
import mil.nga.giat.geowave.core.store.index.Index;
3837
import mil.nga.giat.geowave.core.store.index.IndexStore;
3938
import mil.nga.giat.geowave.core.store.query.Query;
39+
import mil.nga.giat.geowave.core.store.query.QueryOptions;
4040
import mil.nga.giat.geowave.datastore.accumulo.metadata.AccumuloAdapterStore;
4141
import mil.nga.giat.geowave.datastore.accumulo.metadata.AccumuloDataStatisticsStore;
4242
import mil.nga.giat.geowave.datastore.accumulo.metadata.AccumuloIndexStore;
@@ -73,6 +73,8 @@
7373

7474
import com.google.common.collect.Iterators;
7575

76+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
77+
7678
/**
7779
* This is the Accumulo implementation of the data store. It requires an
7880
* AccumuloOperations instance that describes how to connect (read/write data)
@@ -998,6 +1000,7 @@ private CloseableIterator<?> query(
9981000
adapterStore,
9991001
limit,
10001002
scanCallback,
1003+
null,
10011004
authorizations);
10021005
}
10031006
catch (final IOException e) {
@@ -1021,6 +1024,7 @@ private CloseableIterator<?> query(
10211024
final AdapterStore adapterStore,
10221025
final Integer limit,
10231026
final ScanCallback<?> scanCallback,
1027+
final QueryOptions queryOptions,
10241028
final String... authorizations ) {
10251029
// query the indices that are supported for this query object, and these
10261030
// data adapter Ids
@@ -1055,6 +1059,10 @@ else if (query.isSupported(index)) {
10551059
else {
10561060
continue;
10571061
}
1062+
if ((queryOptions != null) && (!queryOptions.getFieldIds().isEmpty())) {
1063+
// results should contain subset of fieldIds
1064+
accumuloQuery.setFieldIds(queryOptions.getFieldIds());
1065+
}
10581066
results.add(accumuloQuery.query(
10591067
accumuloOperations,
10601068
adapterStore,
@@ -1095,9 +1103,22 @@ public <T> CloseableIterator<T> query(
10951103
return query(
10961104
index,
10971105
query,
1106+
null,
10981107
null);
10991108
}
11001109

1110+
@Override
1111+
public <T> CloseableIterator<T> query(
1112+
Index index,
1113+
final Query query,
1114+
final QueryOptions queryOptions ) {
1115+
return query(
1116+
index,
1117+
query,
1118+
null,
1119+
queryOptions);
1120+
}
1121+
11011122
@Override
11021123
public <T> CloseableIterator<T> query(
11031124
final Index index,
@@ -1106,7 +1127,8 @@ public <T> CloseableIterator<T> query(
11061127
return query(
11071128
index,
11081129
query,
1109-
(Integer) limit);
1130+
(Integer) limit,
1131+
null);
11101132
}
11111133

11121134
@Override
@@ -1123,7 +1145,8 @@ public CloseableIterator<?> query(
11231145
private <T> CloseableIterator<T> query(
11241146
final Index index,
11251147
final Query query,
1126-
final Integer limit ) {
1148+
final Integer limit,
1149+
final QueryOptions queryOptions ) {
11271150
if ((query != null) && !query.isSupported(index)) {
11281151
throw new IllegalArgumentException(
11291152
"Index does not support the query");
@@ -1138,6 +1161,8 @@ private <T> CloseableIterator<T> query(
11381161
}).iterator()),
11391162
adapterStore,
11401163
limit,
1164+
null,
1165+
queryOptions,
11411166
null);
11421167
}
11431168

@@ -1199,6 +1224,7 @@ public <T> CloseableIterator<T> query(
11991224
}),
12001225
limit,
12011226
scanCallback,
1227+
null,
12021228
authorizations);
12031229
}
12041230

extensions/datastores/accumulo/src/main/java/mil/nga/giat/geowave/datastore/accumulo/query/AccumuloFilteredIndexQuery.java

Lines changed: 69 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,30 @@
11
package mil.nga.giat.geowave.datastore.accumulo.query;
22

3+
import java.io.IOException;
4+
import java.util.Collection;
5+
import java.util.HashSet;
36
import java.util.Iterator;
47
import java.util.List;
8+
import java.util.Set;
59

610
import mil.nga.giat.geowave.core.index.ByteArrayId;
711
import mil.nga.giat.geowave.core.index.StringUtils;
812
import mil.nga.giat.geowave.core.store.CloseableIterator;
913
import mil.nga.giat.geowave.core.store.ScanCallback;
1014
import mil.nga.giat.geowave.core.store.adapter.AdapterStore;
15+
import mil.nga.giat.geowave.core.store.adapter.DataAdapter;
16+
import mil.nga.giat.geowave.core.store.dimension.DimensionField;
1117
import mil.nga.giat.geowave.core.store.filter.FilterList;
1218
import mil.nga.giat.geowave.core.store.filter.QueryFilter;
19+
import mil.nga.giat.geowave.core.store.index.CommonIndexValue;
1320
import mil.nga.giat.geowave.core.store.index.Index;
1421
import mil.nga.giat.geowave.datastore.accumulo.AccumuloOperations;
1522
import mil.nga.giat.geowave.datastore.accumulo.util.CloseableIteratorWrapper;
16-
import mil.nga.giat.geowave.datastore.accumulo.util.EntryIteratorWrapper;
1723
import mil.nga.giat.geowave.datastore.accumulo.util.CloseableIteratorWrapper.ScannerClosableWrapper;
24+
import mil.nga.giat.geowave.datastore.accumulo.util.EntryIteratorWrapper;
1825

1926
import org.apache.accumulo.core.client.ScannerBase;
27+
import org.apache.hadoop.io.Text;
2028
import org.apache.log4j.Logger;
2129

2230
import com.google.common.collect.Iterators;
@@ -27,6 +35,7 @@ public abstract class AccumuloFilteredIndexQuery extends
2735
protected List<QueryFilter> clientFilters;
2836
private final static Logger LOGGER = Logger.getLogger(AccumuloFilteredIndexQuery.class);
2937
protected final ScanCallback<?> scanCallback;
38+
private Collection<String> fieldIds = null;
3039

3140
public AccumuloFilteredIndexQuery(
3241
final Index index,
@@ -59,6 +68,15 @@ protected void setClientFilters(
5968
this.clientFilters = clientFilters;
6069
}
6170

71+
public Collection<String> getFieldIds() {
72+
return fieldIds;
73+
}
74+
75+
public void setFieldIds(
76+
Collection<String> fieldIds ) {
77+
this.fieldIds = fieldIds;
78+
}
79+
6280
protected abstract void addScanIteratorSettings(
6381
final ScannerBase scanner );
6482

@@ -87,6 +105,14 @@ public CloseableIterator<?> query(
87105
accumuloOperations,
88106
limit);
89107

108+
// a subset of fieldIds is being requested
109+
if (fieldIds != null && !fieldIds.isEmpty()) {
110+
// configure scanner to fetch only the fieldIds specified
111+
handleSubsetOfFieldIds(
112+
scanner,
113+
adapterStore.getAdapters());
114+
}
115+
90116
if (scanner == null) {
91117
LOGGER.error("Could not get scanner instance, getScanner returned null");
92118
return new CloseableIterator.Empty();
@@ -117,4 +143,46 @@ protected Iterator initIterator(
117143
clientFilters),
118144
scanCallback);
119145
}
146+
147+
private void handleSubsetOfFieldIds(
148+
final ScannerBase scanner,
149+
final CloseableIterator<DataAdapter<?>> dataAdapters ) {
150+
151+
Set<ByteArrayId> uniqueDimensions = new HashSet<>();
152+
for (final DimensionField<? extends CommonIndexValue> dimension : index.getIndexModel().getDimensions()) {
153+
uniqueDimensions.add(dimension.getFieldId());
154+
}
155+
156+
while (dataAdapters.hasNext()) {
157+
158+
final Text colFam = new Text(
159+
dataAdapters.next().getAdapterId().getBytes());
160+
161+
// dimension fields must be included
162+
for (ByteArrayId dimension : uniqueDimensions) {
163+
scanner.fetchColumn(
164+
colFam,
165+
new Text(
166+
dimension.getBytes()));
167+
}
168+
169+
// configure scanner to fetch only the specified fieldIds
170+
for (String fieldId : fieldIds) {
171+
scanner.fetchColumn(
172+
colFam,
173+
new Text(
174+
StringUtils.stringToBinary(fieldId)));
175+
}
176+
}
177+
178+
try {
179+
dataAdapters.close();
180+
}
181+
catch (IOException e) {
182+
LOGGER.error(
183+
"Unable to close iterator",
184+
e);
185+
}
186+
187+
}
120188
}

test/src/test/java/mil/nga/giat/geowave/test/GeoWaveITSuite.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
import mil.nga.giat.geowave.test.mapreduce.BasicMapReduceIT;
44
import mil.nga.giat.geowave.test.mapreduce.GeoWaveKMeansIT;
55
import mil.nga.giat.geowave.test.mapreduce.KDERasterResizeIT;
6+
import mil.nga.giat.geowave.test.query.AttributesSubsetQueryIT;
67
import mil.nga.giat.geowave.test.service.GeoServerIT;
78
import mil.nga.giat.geowave.test.service.GeoWaveIngestGeoserverIT;
89
import mil.nga.giat.geowave.test.service.GeoWaveServicesIT;
910
import mil.nga.giat.geowave.test.service.ServicesTestEnvironment;
1011

1112
import org.junit.AfterClass;
12-
import org.junit.Assert;
1313
import org.junit.BeforeClass;
1414
import org.junit.runner.RunWith;
1515
import org.junit.runners.Suite;
@@ -25,7 +25,8 @@
2525
GeoWaveKMeansIT.class,
2626
GeoServerIT.class,
2727
GeoWaveServicesIT.class,
28-
GeoWaveIngestGeoserverIT.class
28+
GeoWaveIngestGeoserverIT.class,
29+
AttributesSubsetQueryIT.class
2930
})
3031
public class GeoWaveITSuite
3132
{

0 commit comments

Comments
 (0)