99import org .apache .spark .api .java .function .PairFlatMapFunction ;
1010import org .apache .spark .broadcast .Broadcast ;
1111import org .opengis .feature .simple .SimpleFeature ;
12- import org .opengis .geometry .BoundingBox ;
1312import org .slf4j .Logger ;
1413import org .slf4j .LoggerFactory ;
1514
15+ import com .google .common .collect .Iterators ;
16+ import com .google .common .collect .Lists ;
1617import com .vividsolutions .jts .geom .Envelope ;
1718import com .vividsolutions .jts .geom .Geometry ;
18-
19+ import mil . nga . giat . geowave . core . geotime . GeometryUtils ;
1920import mil .nga .giat .geowave .core .index .ByteArrayId ;
2021import mil .nga .giat .geowave .core .index .InsertionIds ;
2122import mil .nga .giat .geowave .core .index .NumericIndexStrategy ;
22- import mil .nga .giat .geowave .core .index .sfc .data .BasicNumericDataset ;
23- import mil .nga .giat .geowave .core .index .sfc .data .NumericData ;
24- import mil .nga .giat .geowave .core .index .sfc .data .NumericRange ;
23+ import mil .nga .giat .geowave .core .index .sfc .data .MultiDimensionalNumericData ;
2524import mil .nga .giat .geowave .mapreduce .input .GeoWaveInputKey ;
2625import scala .Tuple2 ;
2726
@@ -85,64 +84,45 @@ public Iterator<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, SimpleFeature>>> cal
8584
8685 // Pull feature to index from tuple
8786 SimpleFeature inputFeature = t ._2 ;
88-
87+ // If we are dealing with null or empty
88+ // geometry we can't properly compare this
89+ // feature.
8990 Geometry geom = (Geometry ) inputFeature .getDefaultGeometry ();
9091 if (geom == null ) {
91- return result . iterator ();
92+ return Iterators . emptyIterator ();
9293 }
93- // Extract bounding box from input feature
94- BoundingBox bounds = inputFeature .getBounds ();
95- NumericRange xRange = new NumericRange (
96- bounds .getMinX () - bufferAmount ,
97- bounds .getMaxX () + bufferAmount );
98- NumericRange yRange = new NumericRange (
99- bounds .getMinY () - bufferAmount ,
100- bounds .getMaxY () + bufferAmount );
101-
102- if (bounds .isEmpty ()) {
103- Envelope internalEnvelope = geom .getEnvelopeInternal ();
104- xRange = new NumericRange (
105- internalEnvelope .getMinX () - bufferAmount ,
106- internalEnvelope .getMaxX () + bufferAmount );
107- yRange = new NumericRange (
108- internalEnvelope .getMinY () - bufferAmount ,
109- internalEnvelope .getMaxY () + bufferAmount );
11094
95+ Envelope internalEnvelope = geom .getEnvelopeInternal ();
96+ if (internalEnvelope .isNull ()) {
97+ return Iterators .emptyIterator ();
98+ }
99+ // If we have to buffer geometry for
100+ // predicate expand bounds
101+ internalEnvelope .expandBy (bufferAmount );
102+
103+ // Get data range from expanded envelope
104+ MultiDimensionalNumericData boundsRange = GeometryUtils
105+ .getBoundsFromEnvelope (internalEnvelope );
106+
107+ NumericIndexStrategy index = indexStrategy .value ();
108+ InsertionIds insertIds = index .getInsertionIds (
109+ boundsRange ,
110+ 80 );
111+
112+ // If we didnt expand the envelope for
113+ // buffering we can trim the indexIds by the
114+ // geometry
115+ if (bufferAmount == 0.0 ) {
116+ insertIds = RDDUtils .trimIndexIds (
117+ insertIds ,
118+ geom ,
119+ index );
111120 }
112121
113- NumericData [] boundsRange = {
114- xRange ,
115- yRange
116- };
117-
118- // Convert the data to how the api expects
119- // and index
120- // using strategy above
121- BasicNumericDataset convertedBounds = new BasicNumericDataset (
122- boundsRange );
123- InsertionIds insertIds = indexStrategy .value ().getInsertionIds (
124- convertedBounds );
125-
126- // Sometimes the result can span more than
127- // one row/cell
128- // of a tier
129- // When we span more than one row each
130- // individual get
131- // added as a separate output pair
132- // TODO should this use composite IDs or
133- // just the sort
134- // keys
135122 for (Iterator <ByteArrayId > iter = insertIds .getCompositeInsertionIds ().iterator (); iter
136123 .hasNext ();) {
137124 ByteArrayId id = iter .next ();
138- // Id decomposes to byte array of Tier,
139- // Bin, SFC
140- // (Hilbert in this case) id)
141- // There may be value in decomposing the
142- // id and
143- // storing tier + sfcIndex as a tuple
144- // key of the new
145- // RDD
125+
146126 Tuple2 <GeoWaveInputKey , SimpleFeature > valuePair = new Tuple2 <>(
147127 t ._1 ,
148128 inputFeature );
@@ -163,90 +143,70 @@ public Iterator<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, SimpleFeature>>> cal
163143 }
164144
165145 public JavaPairRDD <ByteArrayId , Tuple2 <GeoWaveInputKey , Geometry >> getIndexedGeometryRDD () {
166- return this .getIndexedGeometryRDD (0.0 );
146+ return this .getIndexedGeometryRDD (
147+ 0.0 ,
148+ false );
167149 }
168150
169151 public JavaPairRDD <ByteArrayId , Tuple2 <GeoWaveInputKey , Geometry >> getIndexedGeometryRDD (
170- double bufferAmount ) {
152+ double bufferAmount ,
153+ boolean recalculate ) {
171154 verifyParameters ();
172155
173156 if (!geowaveRDD .isLoaded ()) {
174157 LOGGER .error ("Must provide a loaded RDD." );
175158 return null ;
176159 }
177- if (rawGeometryRDD == null ) {
178- JavaPairRDD < ByteArrayId , Tuple2 < GeoWaveInputKey , Geometry >> indexedData = geowaveRDD
160+ if (rawGeometryRDD == null || recalculate ) {
161+ rawGeometryRDD = geowaveRDD
179162 .getRawRDD ()
163+ .filter (t -> (t ._2 .getDefaultGeometry () != null && !((Geometry )t ._2 .getDefaultGeometry ()).getEnvelopeInternal ().isNull ()))
180164 .flatMapToPair (
181165 new PairFlatMapFunction <Tuple2 <GeoWaveInputKey , SimpleFeature >, ByteArrayId , Tuple2 <GeoWaveInputKey , Geometry >>() {
182166 @ Override
183167 public Iterator <Tuple2 <ByteArrayId , Tuple2 <GeoWaveInputKey , Geometry >>> call (
184168 Tuple2 <GeoWaveInputKey , SimpleFeature > t )
185169 throws Exception {
186170
187- // Flattened output array.
188- List <Tuple2 <ByteArrayId , Tuple2 <GeoWaveInputKey , Geometry >>> result = new ArrayList <>();
189-
190171 // Pull feature to index from tuple
191172 SimpleFeature inputFeature = t ._2 ;
192-
173+ // If we are dealing with null or empty
174+ // geometry we can't properly compare this
175+ // feature.
193176 Geometry geom = (Geometry ) inputFeature .getDefaultGeometry ();
194- if (geom == null ) {
195- return result .iterator ();
196- }
197- // Extract bounding box from input feature
198- BoundingBox bounds = inputFeature .getBounds ();
199- NumericRange xRange = new NumericRange (
200- bounds .getMinX () - bufferAmount ,
201- bounds .getMaxX () + bufferAmount );
202- NumericRange yRange = new NumericRange (
203- bounds .getMinY () - bufferAmount ,
204- bounds .getMaxY () + bufferAmount );
205-
206- if (bounds .isEmpty ()) {
207- Envelope internalEnvelope = geom .getEnvelopeInternal ();
208- xRange = new NumericRange (
209- internalEnvelope .getMinX () - bufferAmount ,
210- internalEnvelope .getMaxX () + bufferAmount );
211- yRange = new NumericRange (
212- internalEnvelope .getMinY () - bufferAmount ,
213- internalEnvelope .getMaxY () + bufferAmount );
214177
178+ Envelope internalEnvelope = geom .getEnvelopeInternal ();
179+ // If we have to buffer geometry for
180+ // predicate expand bounds
181+ internalEnvelope .expandBy (bufferAmount );
182+
183+ // Get data range from expanded envelope
184+ MultiDimensionalNumericData boundsRange = GeometryUtils
185+ .getBoundsFromEnvelope (internalEnvelope );
186+
187+ NumericIndexStrategy index = indexStrategy .value ();
188+ InsertionIds insertIds = index .getInsertionIds (
189+ boundsRange ,
190+ 80 );
191+
192+ // If we didnt expand the envelope for
193+ // buffering we can trim the indexIds by the
194+ // geometry
195+ if (bufferAmount == 0.0 ) {
196+ insertIds = RDDUtils .trimIndexIds (
197+ insertIds ,
198+ geom ,
199+ index );
215200 }
216201
217- NumericData [] boundsRange = {
218- xRange ,
219- yRange
220- };
221-
222- // Convert the data to how the api expects
223- // and index
224- // using strategy above
225- BasicNumericDataset convertedBounds = new BasicNumericDataset (
226- boundsRange );
227- InsertionIds insertIds = indexStrategy .value ().getInsertionIds (
228- convertedBounds );
229-
230- // Sometimes the result can span more than
231- // one row/cell
232- // of a tier
233- // When we span more than one row each
234- // individual get
235- // added as a separate output pair
236- // TODO should this use composite IDs or
237- // just the sort
238- // keys
202+ // Flattened output array.
203+ List <Tuple2 <ByteArrayId , Tuple2 <GeoWaveInputKey , Geometry >>> result = Lists
204+ .newArrayListWithCapacity (insertIds .getSize ());
205+
239206 for (Iterator <ByteArrayId > iter = insertIds .getCompositeInsertionIds ().iterator (); iter
240207 .hasNext ();) {
241208 ByteArrayId id = iter .next ();
242- // Id decomposes to byte array of Tier,
243- // Bin, SFC
244- // (Hilbert in this case) id)
245- // There may be value in decomposing the
246- // id and
247- // storing tier + sfcIndex as a tuple
248- // key of the new
249- // RDD
209+
250210 Tuple2 <GeoWaveInputKey , Geometry > valuePair = new Tuple2 <>(
251211 t ._1 ,
252212 geom );
@@ -260,7 +220,6 @@ public Iterator<Tuple2<ByteArrayId, Tuple2<GeoWaveInputKey, Geometry>>> call(
260220 }
261221
262222 });
263- rawGeometryRDD = indexedData ;
264223 }
265224
266225 return rawGeometryRDD ;
0 commit comments