33import java .io .IOException ;
44import java .net .URI ;
55import java .net .URISyntaxException ;
6+ import java .util .Map ;
67
78import org .apache .hadoop .conf .Configuration ;
89import org .apache .hadoop .conf .Configured ;
3031import com .vividsolutions .jts .geom .Geometry ;
3132
3233import mil .nga .giat .geowave .adapter .raster .RasterUtils ;
34+ import mil .nga .giat .geowave .adapter .raster .adapter .merge .nodata .NoDataMergeStrategy ;
35+ import mil .nga .giat .geowave .adapter .raster .operations .ResizeCommand ;
3336import mil .nga .giat .geowave .adapter .vector .plugin .ExtractGeometryFilterVisitor ;
3437import mil .nga .giat .geowave .analytic .mapreduce .operations .KdeCommand ;
3538import mil .nga .giat .geowave .core .cli .operations .config .options .ConfigOptions ;
3639import mil .nga .giat .geowave .core .cli .parser .CommandLineOperationParams ;
40+ import mil .nga .giat .geowave .core .cli .parser .ManualOperationParams ;
3741import mil .nga .giat .geowave .core .cli .parser .OperationParser ;
3842import mil .nga .giat .geowave .core .geotime .GeometryUtils ;
3943import mil .nga .giat .geowave .core .geotime .ingest .SpatialDimensionalityTypeProvider .SpatialIndexBuilder ;
4044import mil .nga .giat .geowave .core .geotime .store .query .SpatialQuery ;
4145import mil .nga .giat .geowave .core .index .ByteArrayId ;
46+ import mil .nga .giat .geowave .core .store .StoreFactoryOptions ;
4247import mil .nga .giat .geowave .core .store .adapter .AdapterStore ;
4348import mil .nga .giat .geowave .core .store .adapter .WritableDataAdapter ;
49+ import mil .nga .giat .geowave .core .store .config .ConfigUtils ;
4450import mil .nga .giat .geowave .core .store .index .Index ;
4551import mil .nga .giat .geowave .core .store .index .IndexStore ;
4652import mil .nga .giat .geowave .core .store .index .PrimaryIndex ;
4753import mil .nga .giat .geowave .core .store .index .writer .IndexWriter ;
54+ import mil .nga .giat .geowave .core .store .operations .remote .ClearCommand ;
4855import mil .nga .giat .geowave .core .store .operations .remote .options .DataStorePluginOptions ;
4956import mil .nga .giat .geowave .core .store .query .QueryOptions ;
5057import mil .nga .giat .geowave .mapreduce .GeoWaveConfiguratorBase ;
@@ -58,10 +65,11 @@ public class KDEJobRunner extends
5865{
5966 private static final Logger LOGGER = LoggerFactory .getLogger (KDEJobRunner .class );
6067 public static final String GEOWAVE_CLASSPATH_JARS = "geowave.classpath.jars" ;
68+ private static final String TMP_COVERAGE_SUFFIX = "_tMp_CoVeRaGe" ;
69+ protected static int TILE_SIZE = 1 ;
6170 public static final String MAX_LEVEL_KEY = "MAX_LEVEL" ;
6271 public static final String MIN_LEVEL_KEY = "MIN_LEVEL" ;
6372 public static final String COVERAGE_NAME_KEY = "COVERAGE_NAME" ;
64- public static final String TILE_SIZE_KEY = "TILE_SIZE" ;
6573 protected KDECommandLineOptions kdeCommandLineOptions ;
6674 protected DataStorePluginOptions inputDataStoreOptions ;
6775 protected DataStorePluginOptions outputDataStoreOptions ;
@@ -86,6 +94,34 @@ public int runJob()
8694 conf = new Configuration ();
8795 setConf (conf );
8896 }
97+
98+ DataStorePluginOptions rasterResizeOutputDataStoreOptions ;
99+ String kdeCoverageName ;
100+ // so we don't need a no data merge strategy, use 1 for the tile size of
101+ // the KDE output and then run a resize operation
102+ if ((kdeCommandLineOptions .getTileSize () > 1 )) {
103+ // this is the ending data store options after resize, the KDE will
104+ // need to output to a temporary namespace, a resize operation
105+ // will use the outputDataStoreOptions
106+ rasterResizeOutputDataStoreOptions = outputDataStoreOptions ;
107+
108+ // first clone the outputDataStoreOptions, then set it to a tmp
109+ // namespace
110+ final Map <String , String > configOptions = outputDataStoreOptions .getFactoryOptionsAsMap ();
111+ final StoreFactoryOptions options = ConfigUtils .populateOptionsFromList (
112+ outputDataStoreOptions .getFactoryFamily ().getDataStoreFactory ().createOptionsInstance (),
113+ configOptions );
114+ options .setGeowaveNamespace (outputDataStoreOptions .getGeowaveNamespace () + "_tmp" );
115+ outputDataStoreOptions = new DataStorePluginOptions (
116+ outputDataStoreOptions .getType (),
117+ outputDataStoreOptions .getFactoryFamily (),
118+ options );
119+ kdeCoverageName = kdeCommandLineOptions .getCoverageName () + TMP_COVERAGE_SUFFIX ;
120+ }
121+ else {
122+ rasterResizeOutputDataStoreOptions = null ;
123+ kdeCoverageName = kdeCommandLineOptions .getCoverageName ();
124+ }
89125 GeoWaveConfiguratorBase .setRemoteInvocationParams (
90126 kdeCommandLineOptions .getHdfsHostPort (),
91127 kdeCommandLineOptions .getJobTrackerOrResourceManHostPort (),
@@ -98,10 +134,7 @@ public int runJob()
98134 kdeCommandLineOptions .getMinLevel ());
99135 conf .set (
100136 COVERAGE_NAME_KEY ,
101- kdeCommandLineOptions .getCoverageName ());
102- conf .setInt (
103- TILE_SIZE_KEY ,
104- kdeCommandLineOptions .getTileSize ());
137+ kdeCoverageName );
105138 if (kdeCommandLineOptions .getCqlFilter () != null ) {
106139 conf .set (
107140 GaussianCellMapper .CQL_FILTER_KEY ,
@@ -192,6 +225,7 @@ public int runJob()
192225 final boolean job1Success = job .waitForCompletion (true );
193226 boolean job2Success = false ;
194227 boolean postJob2Success = false ;
228+
195229 // Linear MapReduce job chaining
196230 if (job1Success ) {
197231 setupEntriesPerLevel (
@@ -227,17 +261,64 @@ public int runJob()
227261 setupJob2Output (
228262 conf ,
229263 statsReducer ,
230- outputDataStoreOptions .getGeowaveNamespace ());
264+ outputDataStoreOptions .getGeowaveNamespace (),
265+ kdeCoverageName );
231266 job2Success = statsReducer .waitForCompletion (true );
232267 if (job2Success ) {
233268 postJob2Success = postJob2Actions (
234269 conf ,
235- outputDataStoreOptions .getGeowaveNamespace ());
270+ outputDataStoreOptions .getGeowaveNamespace (),
271+ kdeCoverageName );
236272 }
237273 }
238274 else {
239275 job2Success = false ;
240276 }
277+ if (rasterResizeOutputDataStoreOptions != null ) {
278+ // delegate to resize command to wrap it up with the correctly
279+ // requested tile size
280+ final ResizeCommand resizeCommand = new ResizeCommand ();
281+
282+ // We're going to override these anyway.
283+ resizeCommand .setParameters (
284+ null ,
285+ null );
286+
287+ resizeCommand .setInputStoreOptions (outputDataStoreOptions );
288+ resizeCommand .setOutputStoreOptions (rasterResizeOutputDataStoreOptions );
289+
290+ resizeCommand .getOptions ().setInputCoverageName (
291+ kdeCoverageName );
292+ resizeCommand .getOptions ().setMinSplits (
293+ kdeCommandLineOptions .getMinSplits ());
294+ resizeCommand .getOptions ().setMaxSplits (
295+ kdeCommandLineOptions .getMaxSplits ());
296+ resizeCommand .getOptions ().setHdfsHostPort (
297+ kdeCommandLineOptions .getHdfsHostPort ());
298+ resizeCommand .getOptions ().setJobTrackerOrResourceManHostPort (
299+ kdeCommandLineOptions .getJobTrackerOrResourceManHostPort ());
300+ resizeCommand .getOptions ().setOutputCoverageName (
301+ kdeCommandLineOptions .getCoverageName ());
302+
303+ resizeCommand .getOptions ().setOutputTileSize (
304+ kdeCommandLineOptions .getTileSize ());
305+
306+ final int resizeStatus = ToolRunner .run (
307+ resizeCommand .createRunner (new ManualOperationParams ()),
308+ new String [] {});
309+ if (resizeStatus == 0 ) {
310+ // delegate to clear command to clean up with tmp namespace
311+ // after successful resize
312+ final ClearCommand clearCommand = new ClearCommand ();
313+ clearCommand .setParameters (null );
314+ clearCommand .setInputStoreOptions (outputDataStoreOptions );
315+ clearCommand .execute (new ManualOperationParams ());
316+ }
317+ else {
318+ LOGGER .warn ("Resize command error code '" + resizeStatus + "'. Retaining temporary namespace '"
319+ + outputDataStoreOptions .getGeowaveNamespace () + "' with tile size of 1." );
320+ }
321+ }
241322
242323 fs .delete (
243324 new Path (
@@ -268,7 +349,8 @@ protected void preJob1Setup(
268349
269350 protected boolean postJob2Actions (
270351 final Configuration conf ,
271- final String statsNamespace )
352+ final String statsNamespace ,
353+ final String coverageName )
272354 throws Exception {
273355 return true ;
274356 }
@@ -321,16 +403,18 @@ protected String getJob1Name() {
321403 protected void setupJob2Output (
322404 final Configuration conf ,
323405 final Job statsReducer ,
324- final String statsNamespace )
406+ final String statsNamespace ,
407+ final String coverageName )
325408 throws Exception {
326409 final PrimaryIndex index = new SpatialIndexBuilder ().createIndex ();
327410 final WritableDataAdapter <?> adapter = RasterUtils .createDataAdapterTypeDouble (
328- kdeCommandLineOptions . getCoverageName () ,
411+ coverageName ,
329412 AccumuloKDEReducer .NUM_BANDS ,
330- kdeCommandLineOptions . getTileSize () ,
413+ TILE_SIZE ,
331414 AccumuloKDEReducer .MINS_PER_BAND ,
332415 AccumuloKDEReducer .MAXES_PER_BAND ,
333- AccumuloKDEReducer .NAME_PER_BAND );
416+ AccumuloKDEReducer .NAME_PER_BAND ,
417+ null );
334418 setup (
335419 statsReducer ,
336420 statsNamespace ,
@@ -366,11 +450,11 @@ protected void setup(
366450 public static void main (
367451 final String [] args )
368452 throws Exception {
369- ConfigOptions opts = new ConfigOptions ();
370- OperationParser parser = new OperationParser ();
453+ final ConfigOptions opts = new ConfigOptions ();
454+ final OperationParser parser = new OperationParser ();
371455 parser .addAdditionalObject (opts );
372- KdeCommand command = new KdeCommand ();
373- CommandLineOperationParams params = parser .parse (
456+ final KdeCommand command = new KdeCommand ();
457+ final CommandLineOperationParams params = parser .parse (
374458 command ,
375459 args );
376460 opts .prepare (params );
0 commit comments