Skip to content

Commit 49413c3

Browse files
committed
remove ImmutableSetSerializer (+1 squashed commit)
Squashed commits: [ba499d3] Optimizations to spatial join for other geometry types. Changes include trimming insertion keys, and reducing overall algorithm stages.
1 parent f1d32f1 commit 49413c3

11 files changed

Lines changed: 583 additions & 608 deletions

File tree

analytics/spark/src/main/java/mil/nga/giat/geowave/analytic/spark/GeoWaveIndexedRDD.java

Lines changed: 72 additions & 113 deletions
Original file line numberDiff line numberDiff line change
@@ -9,19 +9,18 @@
99
import org.apache.spark.api.java.function.PairFlatMapFunction;
1010
import org.apache.spark.broadcast.Broadcast;
1111
import org.opengis.feature.simple.SimpleFeature;
12-
import org.opengis.geometry.BoundingBox;
1312
import org.slf4j.Logger;
1413
import org.slf4j.LoggerFactory;
1514

15+
import com.google.common.collect.Iterators;
16+
import com.google.common.collect.Lists;
1617
import com.vividsolutions.jts.geom.Envelope;
1718
import com.vividsolutions.jts.geom.Geometry;
18-
19+
import mil.nga.giat.geowave.core.geotime.GeometryUtils;
1920
import mil.nga.giat.geowave.core.index.ByteArrayId;
2021
import mil.nga.giat.geowave.core.index.InsertionIds;
2122
import mil.nga.giat.geowave.core.index.NumericIndexStrategy;
22-
import mil.nga.giat.geowave.core.index.sfc.data.BasicNumericDataset;
23-
import mil.nga.giat.geowave.core.index.sfc.data.NumericData;
24-
import mil.nga.giat.geowave.core.index.sfc.data.NumericRange;
23+
import mil.nga.giat.geowave.core.index.sfc.data.MultiDimensionalNumericData;
2524
import mil.nga.giat.geowave.mapreduce.input.GeoWaveInputKey;
2625
import scala.Tuple2;
2726

@@ -85,64 +84,45 @@ public Iterator<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, SimpleFeature>>> cal
8584

8685
// Pull feature to index from tuple
8786
SimpleFeature inputFeature = t._2;
88-
87+
// If we are dealing with null or empty
88+
// geometry we can't properly compare this
89+
// feature.
8990
Geometry geom = (Geometry) inputFeature.getDefaultGeometry();
9091
if (geom == null) {
91-
return result.iterator();
92+
return Iterators.emptyIterator();
9293
}
93-
// Extract bounding box from input feature
94-
BoundingBox bounds = inputFeature.getBounds();
95-
NumericRange xRange = new NumericRange(
96-
bounds.getMinX() - bufferAmount,
97-
bounds.getMaxX() + bufferAmount);
98-
NumericRange yRange = new NumericRange(
99-
bounds.getMinY() - bufferAmount,
100-
bounds.getMaxY() + bufferAmount);
101-
102-
if (bounds.isEmpty()) {
103-
Envelope internalEnvelope = geom.getEnvelopeInternal();
104-
xRange = new NumericRange(
105-
internalEnvelope.getMinX() - bufferAmount,
106-
internalEnvelope.getMaxX() + bufferAmount);
107-
yRange = new NumericRange(
108-
internalEnvelope.getMinY() - bufferAmount,
109-
internalEnvelope.getMaxY() + bufferAmount);
11094

95+
Envelope internalEnvelope = geom.getEnvelopeInternal();
96+
if (internalEnvelope.isNull()) {
97+
return Iterators.emptyIterator();
98+
}
99+
// If we have to buffer geometry for
100+
// predicate expand bounds
101+
internalEnvelope.expandBy(bufferAmount);
102+
103+
// Get data range from expanded envelope
104+
MultiDimensionalNumericData boundsRange = GeometryUtils
105+
.getBoundsFromEnvelope(internalEnvelope);
106+
107+
NumericIndexStrategy index = indexStrategy.value();
108+
InsertionIds insertIds = index.getInsertionIds(
109+
boundsRange,
110+
80);
111+
112+
// If we didnt expand the envelope for
113+
// buffering we can trim the indexIds by the
114+
// geometry
115+
if (bufferAmount == 0.0) {
116+
insertIds = RDDUtils.trimIndexIds(
117+
insertIds,
118+
geom,
119+
index);
111120
}
112121

113-
NumericData[] boundsRange = {
114-
xRange,
115-
yRange
116-
};
117-
118-
// Convert the data to how the api expects
119-
// and index
120-
// using strategy above
121-
BasicNumericDataset convertedBounds = new BasicNumericDataset(
122-
boundsRange);
123-
InsertionIds insertIds = indexStrategy.value().getInsertionIds(
124-
convertedBounds);
125-
126-
// Sometimes the result can span more than
127-
// one row/cell
128-
// of a tier
129-
// When we span more than one row each
130-
// individual get
131-
// added as a separate output pair
132-
// TODO should this use composite IDs or
133-
// just the sort
134-
// keys
135122
for (Iterator<ByteArrayId> iter = insertIds.getCompositeInsertionIds().iterator(); iter
136123
.hasNext();) {
137124
ByteArrayId id = iter.next();
138-
// Id decomposes to byte array of Tier,
139-
// Bin, SFC
140-
// (Hilbert in this case) id)
141-
// There may be value in decomposing the
142-
// id and
143-
// storing tier + sfcIndex as a tuple
144-
// key of the new
145-
// RDD
125+
146126
Tuple2<GeoWaveInputKey, SimpleFeature> valuePair = new Tuple2<>(
147127
t._1,
148128
inputFeature);
@@ -163,90 +143,70 @@ public Iterator<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, SimpleFeature>>> cal
163143
}
164144

165145
public JavaPairRDD<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>> getIndexedGeometryRDD() {
166-
return this.getIndexedGeometryRDD(0.0);
146+
return this.getIndexedGeometryRDD(
147+
0.0,
148+
false);
167149
}
168150

169151
public JavaPairRDD<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>> getIndexedGeometryRDD(
170-
double bufferAmount ) {
152+
double bufferAmount,
153+
boolean recalculate ) {
171154
verifyParameters();
172155

173156
if (!geowaveRDD.isLoaded()) {
174157
LOGGER.error("Must provide a loaded RDD.");
175158
return null;
176159
}
177-
if (rawGeometryRDD == null) {
178-
JavaPairRDD<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>> indexedData = geowaveRDD
160+
if (rawGeometryRDD == null || recalculate) {
161+
rawGeometryRDD = geowaveRDD
179162
.getRawRDD()
163+
.filter(t -> (t._2.getDefaultGeometry() != null && !((Geometry)t._2.getDefaultGeometry()).getEnvelopeInternal().isNull()))
180164
.flatMapToPair(
181165
new PairFlatMapFunction<Tuple2<GeoWaveInputKey, SimpleFeature>, ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>>() {
182166
@Override
183167
public Iterator<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>>> call(
184168
Tuple2<GeoWaveInputKey, SimpleFeature> t )
185169
throws Exception {
186170

187-
// Flattened output array.
188-
List<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>>> result = new ArrayList<>();
189-
190171
// Pull feature to index from tuple
191172
SimpleFeature inputFeature = t._2;
192-
173+
// If we are dealing with null or empty
174+
// geometry we can't properly compare this
175+
// feature.
193176
Geometry geom = (Geometry) inputFeature.getDefaultGeometry();
194-
if (geom == null) {
195-
return result.iterator();
196-
}
197-
// Extract bounding box from input feature
198-
BoundingBox bounds = inputFeature.getBounds();
199-
NumericRange xRange = new NumericRange(
200-
bounds.getMinX() - bufferAmount,
201-
bounds.getMaxX() + bufferAmount);
202-
NumericRange yRange = new NumericRange(
203-
bounds.getMinY() - bufferAmount,
204-
bounds.getMaxY() + bufferAmount);
205-
206-
if (bounds.isEmpty()) {
207-
Envelope internalEnvelope = geom.getEnvelopeInternal();
208-
xRange = new NumericRange(
209-
internalEnvelope.getMinX() - bufferAmount,
210-
internalEnvelope.getMaxX() + bufferAmount);
211-
yRange = new NumericRange(
212-
internalEnvelope.getMinY() - bufferAmount,
213-
internalEnvelope.getMaxY() + bufferAmount);
214177

178+
Envelope internalEnvelope = geom.getEnvelopeInternal();
179+
// If we have to buffer geometry for
180+
// predicate expand bounds
181+
internalEnvelope.expandBy(bufferAmount);
182+
183+
// Get data range from expanded envelope
184+
MultiDimensionalNumericData boundsRange = GeometryUtils
185+
.getBoundsFromEnvelope(internalEnvelope);
186+
187+
NumericIndexStrategy index = indexStrategy.value();
188+
InsertionIds insertIds = index.getInsertionIds(
189+
boundsRange,
190+
80);
191+
192+
// If we didnt expand the envelope for
193+
// buffering we can trim the indexIds by the
194+
// geometry
195+
if (bufferAmount == 0.0) {
196+
insertIds = RDDUtils.trimIndexIds(
197+
insertIds,
198+
geom,
199+
index);
215200
}
216201

217-
NumericData[] boundsRange = {
218-
xRange,
219-
yRange
220-
};
221-
222-
// Convert the data to how the api expects
223-
// and index
224-
// using strategy above
225-
BasicNumericDataset convertedBounds = new BasicNumericDataset(
226-
boundsRange);
227-
InsertionIds insertIds = indexStrategy.value().getInsertionIds(
228-
convertedBounds);
229-
230-
// Sometimes the result can span more than
231-
// one row/cell
232-
// of a tier
233-
// When we span more than one row each
234-
// individual get
235-
// added as a separate output pair
236-
// TODO should this use composite IDs or
237-
// just the sort
238-
// keys
202+
// Flattened output array.
203+
List<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>>> result = Lists
204+
.newArrayListWithCapacity(insertIds.getSize());
205+
239206
for (Iterator<ByteArrayId> iter = insertIds.getCompositeInsertionIds().iterator(); iter
240207
.hasNext();) {
241208
ByteArrayId id = iter.next();
242-
// Id decomposes to byte array of Tier,
243-
// Bin, SFC
244-
// (Hilbert in this case) id)
245-
// There may be value in decomposing the
246-
// id and
247-
// storing tier + sfcIndex as a tuple
248-
// key of the new
249-
// RDD
209+
250210
Tuple2<GeoWaveInputKey, Geometry> valuePair = new Tuple2<>(
251211
t._1,
252212
geom);
@@ -260,7 +220,6 @@ public Iterator<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>>> call(
260220
}
261221

262222
});
263-
rawGeometryRDD = indexedData;
264223
}
265224

266225
return rawGeometryRDD;

analytics/spark/src/main/java/mil/nga/giat/geowave/analytic/spark/GeoWaveRDDLoader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import org.slf4j.Logger;
1212
import org.slf4j.LoggerFactory;
1313

14+
import com.vividsolutions.jts.geom.prep.PreparedGeometryFactory;
15+
1416
import mil.nga.giat.geowave.core.index.NumericIndexStrategy;
1517
import mil.nga.giat.geowave.core.store.cli.remote.options.DataStorePluginOptions;
1618
import mil.nga.giat.geowave.mapreduce.input.GeoWaveInputFormat;
@@ -65,6 +67,7 @@ public static GeoWaveIndexedRDD loadIndexedRDD(
6567
sc,
6668
indexStrategy);
6769
}
70+
6871
GeoWaveIndexedRDD returnRDD = new GeoWaveIndexedRDD(
6972
wrappedRDD,
7073
broadcastStrategy);
@@ -86,6 +89,7 @@ public static GeoWaveIndexedRDD loadIndexedRDD(
8689
sc,
8790
indexStrategy);
8891
}
92+
8993
GeoWaveIndexedRDD returnRDD = new GeoWaveIndexedRDD(
9094
inputRDD,
9195
broadcastStrategy);

analytics/spark/src/main/java/mil/nga/giat/geowave/analytic/spark/GeoWaveRegistrator.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,14 @@
44
import org.geotools.feature.simple.SimpleFeatureImpl;
55

66
import com.esotericsoftware.kryo.Kryo;
7+
import com.vividsolutions.jts.geom.Geometry;
8+
import com.vividsolutions.jts.geom.prep.PreparedGeometry;
79

810
import mil.nga.giat.geowave.analytic.kryo.FeatureSerializer;
911
import mil.nga.giat.geowave.analytic.kryo.PersistableSerializer;
12+
import mil.nga.giat.geowave.core.index.ByteArrayId;
1013
import mil.nga.giat.geowave.core.index.persist.PersistableFactory;
14+
import mil.nga.giat.geowave.mapreduce.input.GeoWaveInputKey;
1115

1216
public class GeoWaveRegistrator implements
1317
KryoRegistrator
@@ -23,6 +27,10 @@ public void registerClasses(
2327

2428
kryo.register(GeoWaveRDD.class);
2529
kryo.register(GeoWaveIndexedRDD.class);
30+
kryo.register(Geometry.class);
31+
kryo.register(PreparedGeometry.class);
32+
kryo.register(ByteArrayId.class);
33+
kryo.register(GeoWaveInputKey.class);
2634
kryo.register(
2735
SimpleFeatureImpl.class,
2836
simpleSerializer);

analytics/spark/src/main/java/mil/nga/giat/geowave/analytic/spark/RDDUtils.java

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import java.io.IOException;
44
import java.util.Date;
5+
import java.util.Iterator;
56

67
import org.apache.hadoop.conf.Configuration;
78
import org.apache.hadoop.mapreduce.Job;
@@ -10,17 +11,26 @@
1011
import org.apache.spark.broadcast.Broadcast;
1112
import org.apache.spark.mllib.linalg.Vector;
1213
import org.apache.spark.mllib.linalg.Vectors;
14+
import org.geotools.geometry.jts.JTS;
1315
import org.opengis.feature.simple.SimpleFeature;
1416
import org.slf4j.Logger;
1517
import org.slf4j.LoggerFactory;
1618

19+
import com.vividsolutions.jts.geom.Envelope;
1720
import com.vividsolutions.jts.geom.Geometry;
21+
import com.vividsolutions.jts.geom.LineSegment;
22+
import com.vividsolutions.jts.geom.LineString;
1823
import com.vividsolutions.jts.geom.Point;
24+
import com.vividsolutions.jts.geom.Polygon;
25+
import com.vividsolutions.jts.operation.predicate.RectangleIntersects;
1926

2027
import mil.nga.giat.geowave.adapter.vector.FeatureDataAdapter;
2128
import mil.nga.giat.geowave.core.geotime.store.query.ScaledTemporalRange;
2229
import mil.nga.giat.geowave.core.index.ByteArrayId;
30+
import mil.nga.giat.geowave.core.index.InsertionIds;
2331
import mil.nga.giat.geowave.core.index.NumericIndexStrategy;
32+
import mil.nga.giat.geowave.core.index.SinglePartitionInsertionIds;
33+
import mil.nga.giat.geowave.core.index.sfc.data.MultiDimensionalNumericData;
2434
import mil.nga.giat.geowave.core.store.adapter.DataAdapter;
2535
import mil.nga.giat.geowave.core.store.cli.remote.options.DataStorePluginOptions;
2636
import mil.nga.giat.geowave.core.store.index.PrimaryIndex;
@@ -170,6 +180,38 @@ public static JavaRDD<Vector> rddFeatureVectors(
170180
return vectorRDD;
171181
}
172182

183+
public static InsertionIds trimIndexIds(
184+
InsertionIds rawIds,
185+
Geometry geom,
186+
NumericIndexStrategy index ) {
187+
for (final SinglePartitionInsertionIds insertionId : rawIds.getPartitionKeys()) {
188+
final ByteArrayId partitionKey = insertionId.getPartitionKey();
189+
final int size = insertionId.getSortKeys().size();
190+
if (size > 3) {
191+
final Iterator<ByteArrayId> it = insertionId.getSortKeys().iterator();
192+
while (it.hasNext()) {
193+
final ByteArrayId sortKey = it.next();
194+
MultiDimensionalNumericData keyTile = index.getRangeForId(
195+
partitionKey,
196+
sortKey);
197+
Envelope other = new Envelope();
198+
other.init(
199+
keyTile.getMinValuesPerDimension()[0],
200+
keyTile.getMaxValuesPerDimension()[0],
201+
keyTile.getMinValuesPerDimension()[1],
202+
keyTile.getMaxValuesPerDimension()[1]);
203+
Polygon rect = JTS.toGeometry(other);
204+
if (!RectangleIntersects.intersects(
205+
rect,
206+
geom)) {
207+
it.remove();
208+
}
209+
}
210+
}
211+
}
212+
return rawIds;
213+
}
214+
173215
/**
174216
* Translate a set of objects in a JavaRDD to a provided type and push to
175217
* GeoWave

analytics/spark/src/main/java/mil/nga/giat/geowave/analytic/spark/sparksql/udf/GeomFunction.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
package mil.nga.giat.geowave.analytic.spark.sparksql.udf;
22

33
import org.apache.spark.sql.api.java.UDF2;
4-
import org.slf4j.Logger;
5-
import org.slf4j.LoggerFactory;
6-
74
import com.vividsolutions.jts.geom.Geometry;
85
import com.vividsolutions.jts.io.ParseException;
96

0 commit comments

Comments
 (0)