Skip to content

Commit 0ffd322

Browse files
authored
Merge pull request #892 from ngageoint/kde_improvement
fixed KDE to tileSize 1 and added tile resize as post process
2 parents 98d895a + 87e0225 commit 0ffd322

File tree

15 files changed

+236
-112
lines changed

15 files changed

+236
-112
lines changed

analytics/mapreduce/src/main/java/mil/nga/giat/geowave/analytic/mapreduce/kde/AccumuloKDEReducer.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,18 +5,19 @@
55
import java.util.ArrayList;
66
import java.util.List;
77

8+
import org.apache.hadoop.io.DoubleWritable;
9+
import org.apache.hadoop.io.LongWritable;
10+
import org.apache.hadoop.mapreduce.Reducer;
11+
import org.opengis.coverage.grid.GridCoverage;
12+
13+
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
814
import mil.nga.giat.geowave.adapter.raster.RasterUtils;
915
import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider;
1016
import mil.nga.giat.geowave.core.index.ByteArrayId;
1117
import mil.nga.giat.geowave.core.store.index.PrimaryIndex;
1218
import mil.nga.giat.geowave.mapreduce.JobContextIndexStore;
1319
import mil.nga.giat.geowave.mapreduce.output.GeoWaveOutputKey;
1420

15-
import org.apache.hadoop.io.DoubleWritable;
16-
import org.apache.hadoop.io.LongWritable;
17-
import org.apache.hadoop.mapreduce.Reducer;
18-
import org.opengis.coverage.grid.GridCoverage;
19-
2021
public class AccumuloKDEReducer extends
2122
Reducer<DoubleWritable, LongWritable, GeoWaveOutputKey, GridCoverage>
2223
{
@@ -118,7 +119,6 @@ public boolean equals(
118119
private int numXTiles;
119120
private int numYTiles;
120121
private String coverageName;
121-
private int tileSize;
122122
protected List<ByteArrayId> indexList;
123123

124124
@Override
@@ -146,7 +146,7 @@ protected void reduce(
146146
final TileInfo tileInfo = fromCellIndexToTileInfo(cellIndex);
147147
final WritableRaster raster = RasterUtils.createRasterTypeDouble(
148148
NUM_BANDS,
149-
tileSize);
149+
KDEJobRunner.TILE_SIZE);
150150
raster.setSample(
151151
tileInfo.x,
152152
tileInfo.y,
@@ -183,14 +183,15 @@ protected void reduce(
183183
}
184184
}
185185

186+
@SuppressFBWarnings(value = "INT_BAD_REM_BY_1", justification = "The calculation is appropriate if we ever want to vary to tile size.")
186187
private TileInfo fromCellIndexToTileInfo(
187188
final long index ) {
188189
final int xPost = (int) (index / numYPosts);
189190
final int yPost = (int) (index % numYPosts);
190-
final int xTile = xPost / tileSize;
191-
final int yTile = yPost / tileSize;
192-
final int x = (xPost % tileSize);
193-
final int y = (yPost % tileSize);
191+
final int xTile = xPost / KDEJobRunner.TILE_SIZE;
192+
final int yTile = yPost / KDEJobRunner.TILE_SIZE;
193+
final int x = (xPost % KDEJobRunner.TILE_SIZE);
194+
final int y = (yPost % KDEJobRunner.TILE_SIZE);
194195
final double tileWestLon = ((xTile * 360.0) / numXTiles) - 180.0;
195196
final double tileSouthLat = ((yTile * 180.0) / numYTiles) - 90.0;
196197
final double tileEastLon = tileWestLon + (360.0 / numXTiles);
@@ -201,7 +202,9 @@ private TileInfo fromCellIndexToTileInfo(
201202
tileSouthLat,
202203
tileNorthLat,
203204
x,
204-
tileSize - y - 1); // remember java rasters go from 0 at the top
205+
KDEJobRunner.TILE_SIZE - y - 1); // remember java rasters go
206+
// from 0 at the
207+
// top
205208
// to (height-1) at the bottom, so we have
206209
// to
207210
// invert the y here which goes from bottom
@@ -223,9 +226,6 @@ protected void setup(
223226
coverageName = context.getConfiguration().get(
224227
KDEJobRunner.COVERAGE_NAME_KEY,
225228
"");
226-
tileSize = context.getConfiguration().getInt(
227-
KDEJobRunner.TILE_SIZE_KEY,
228-
1);
229229
numLevels = (maxLevels - minLevels) + 1;
230230
level = context.getConfiguration().getInt(
231231
"mapred.task.partition",
@@ -236,7 +236,7 @@ protected void setup(
236236
numYTiles = (int) Math.pow(
237237
2,
238238
level);
239-
numYPosts = numYTiles * tileSize;
239+
numYPosts = numYTiles * KDEJobRunner.TILE_SIZE;
240240

241241
totalKeys = context.getConfiguration().getLong(
242242
"Entries per level.level" + level,

analytics/mapreduce/src/main/java/mil/nga/giat/geowave/analytic/mapreduce/kde/GaussianCellMapper.java

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@ public class GaussianCellMapper extends
2525
protected static final String CQL_FILTER_KEY = "CQL_FILTER";
2626
protected int minLevel;
2727
protected int maxLevel;
28-
protected int tileSize;
2928
protected Filter filter;
3029
protected Map<Integer, LevelStore> levelStoreMap;
3130

@@ -41,9 +40,6 @@ protected void setup(
4140
maxLevel = context.getConfiguration().getInt(
4241
KDEJobRunner.MAX_LEVEL_KEY,
4342
25);
44-
tileSize = context.getConfiguration().getInt(
45-
KDEJobRunner.TILE_SIZE_KEY,
46-
1);
4743
final String cql = context.getConfiguration().get(
4844
CQL_FILTER_KEY);
4945
if ((cql != null) && !cql.isEmpty()) {
@@ -61,10 +57,10 @@ protected void setup(
6157
for (int level = maxLevel; level >= minLevel; level--) {
6258
final int numXPosts = (int) Math.pow(
6359
2,
64-
level + 1) * tileSize;
60+
level + 1) * KDEJobRunner.TILE_SIZE;
6561
final int numYPosts = (int) Math.pow(
6662
2,
67-
level) * tileSize;
63+
level) * KDEJobRunner.TILE_SIZE;
6864
populateLevelStore(
6965
context,
7066
numXPosts,

analytics/mapreduce/src/main/java/mil/nga/giat/geowave/analytic/mapreduce/kde/KDECommandLineOptions.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,10 @@ public class KDECommandLineOptions
1818
@Parameter(names = "--maxLevel", required = true, description = "The max level to run a KDE at")
1919
private Integer maxLevel;
2020

21-
@Parameter(names = "--minSplits", required = true, description = "The min partitions for the input data")
21+
@Parameter(names = "--minSplits", description = "The min partitions for the input data")
2222
private Integer minSplits;
2323

24-
@Parameter(names = "--maxSplits", required = true, description = "The max partitions for the input data")
24+
@Parameter(names = "--maxSplits", description = "The max partitions for the input data")
2525
private Integer maxSplits;
2626

2727
@Parameter(names = "--coverageName", required = true, description = "The coverage name")
@@ -33,8 +33,8 @@ public class KDECommandLineOptions
3333
@Parameter(names = "--jobSubmissionHostPort", required = true, description = "The job submission tracker")
3434
private String jobTrackerOrResourceManHostPort;
3535

36-
@Parameter(names = "--tileSize", required = true, description = "The tile size")
37-
private Integer tileSize;
36+
@Parameter(names = "--tileSize", description = "The tile size")
37+
private Integer tileSize = 1;
3838

3939
@Parameter(names = "--cqlFilter", description = "An optional CQL filter applied to the input data")
4040
private String cqlFilter;

analytics/mapreduce/src/main/java/mil/nga/giat/geowave/analytic/mapreduce/kde/KDEJobRunner.java

Lines changed: 100 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44
import java.net.URI;
55
import java.net.URISyntaxException;
6+
import java.util.Map;
67

78
import org.apache.hadoop.conf.Configuration;
89
import org.apache.hadoop.conf.Configured;
@@ -30,21 +31,27 @@
3031
import com.vividsolutions.jts.geom.Geometry;
3132

3233
import mil.nga.giat.geowave.adapter.raster.RasterUtils;
34+
import mil.nga.giat.geowave.adapter.raster.adapter.merge.nodata.NoDataMergeStrategy;
35+
import mil.nga.giat.geowave.adapter.raster.operations.ResizeCommand;
3336
import mil.nga.giat.geowave.adapter.vector.plugin.ExtractGeometryFilterVisitor;
3437
import mil.nga.giat.geowave.analytic.mapreduce.operations.KdeCommand;
3538
import mil.nga.giat.geowave.core.cli.operations.config.options.ConfigOptions;
3639
import mil.nga.giat.geowave.core.cli.parser.CommandLineOperationParams;
40+
import mil.nga.giat.geowave.core.cli.parser.ManualOperationParams;
3741
import mil.nga.giat.geowave.core.cli.parser.OperationParser;
3842
import mil.nga.giat.geowave.core.geotime.GeometryUtils;
3943
import mil.nga.giat.geowave.core.geotime.ingest.SpatialDimensionalityTypeProvider.SpatialIndexBuilder;
4044
import mil.nga.giat.geowave.core.geotime.store.query.SpatialQuery;
4145
import mil.nga.giat.geowave.core.index.ByteArrayId;
46+
import mil.nga.giat.geowave.core.store.StoreFactoryOptions;
4247
import mil.nga.giat.geowave.core.store.adapter.AdapterStore;
4348
import mil.nga.giat.geowave.core.store.adapter.WritableDataAdapter;
49+
import mil.nga.giat.geowave.core.store.config.ConfigUtils;
4450
import mil.nga.giat.geowave.core.store.index.Index;
4551
import mil.nga.giat.geowave.core.store.index.IndexStore;
4652
import mil.nga.giat.geowave.core.store.index.PrimaryIndex;
4753
import mil.nga.giat.geowave.core.store.index.writer.IndexWriter;
54+
import mil.nga.giat.geowave.core.store.operations.remote.ClearCommand;
4855
import mil.nga.giat.geowave.core.store.operations.remote.options.DataStorePluginOptions;
4956
import mil.nga.giat.geowave.core.store.query.QueryOptions;
5057
import mil.nga.giat.geowave.mapreduce.GeoWaveConfiguratorBase;
@@ -58,10 +65,11 @@ public class KDEJobRunner extends
5865
{
5966
private static final Logger LOGGER = LoggerFactory.getLogger(KDEJobRunner.class);
6067
public static final String GEOWAVE_CLASSPATH_JARS = "geowave.classpath.jars";
68+
private static final String TMP_COVERAGE_SUFFIX = "_tMp_CoVeRaGe";
69+
protected static int TILE_SIZE = 1;
6170
public static final String MAX_LEVEL_KEY = "MAX_LEVEL";
6271
public static final String MIN_LEVEL_KEY = "MIN_LEVEL";
6372
public static final String COVERAGE_NAME_KEY = "COVERAGE_NAME";
64-
public static final String TILE_SIZE_KEY = "TILE_SIZE";
6573
protected KDECommandLineOptions kdeCommandLineOptions;
6674
protected DataStorePluginOptions inputDataStoreOptions;
6775
protected DataStorePluginOptions outputDataStoreOptions;
@@ -86,6 +94,34 @@ public int runJob()
8694
conf = new Configuration();
8795
setConf(conf);
8896
}
97+
98+
DataStorePluginOptions rasterResizeOutputDataStoreOptions;
99+
String kdeCoverageName;
100+
// to avoid needing a no-data merge strategy, use 1 for the tile size of
101+
// the KDE output and then run a resize operation
102+
if ((kdeCommandLineOptions.getTileSize() > 1)) {
103+
// these are the final data store options after resize; the KDE will
104+
// need to output to a temporary namespace, a resize operation
105+
// will use the outputDataStoreOptions
106+
rasterResizeOutputDataStoreOptions = outputDataStoreOptions;
107+
108+
// first clone the outputDataStoreOptions, then set it to a tmp
109+
// namespace
110+
final Map<String, String> configOptions = outputDataStoreOptions.getFactoryOptionsAsMap();
111+
final StoreFactoryOptions options = ConfigUtils.populateOptionsFromList(
112+
outputDataStoreOptions.getFactoryFamily().getDataStoreFactory().createOptionsInstance(),
113+
configOptions);
114+
options.setGeowaveNamespace(outputDataStoreOptions.getGeowaveNamespace() + "_tmp");
115+
outputDataStoreOptions = new DataStorePluginOptions(
116+
outputDataStoreOptions.getType(),
117+
outputDataStoreOptions.getFactoryFamily(),
118+
options);
119+
kdeCoverageName = kdeCommandLineOptions.getCoverageName() + TMP_COVERAGE_SUFFIX;
120+
}
121+
else {
122+
rasterResizeOutputDataStoreOptions = null;
123+
kdeCoverageName = kdeCommandLineOptions.getCoverageName();
124+
}
89125
GeoWaveConfiguratorBase.setRemoteInvocationParams(
90126
kdeCommandLineOptions.getHdfsHostPort(),
91127
kdeCommandLineOptions.getJobTrackerOrResourceManHostPort(),
@@ -98,10 +134,7 @@ public int runJob()
98134
kdeCommandLineOptions.getMinLevel());
99135
conf.set(
100136
COVERAGE_NAME_KEY,
101-
kdeCommandLineOptions.getCoverageName());
102-
conf.setInt(
103-
TILE_SIZE_KEY,
104-
kdeCommandLineOptions.getTileSize());
137+
kdeCoverageName);
105138
if (kdeCommandLineOptions.getCqlFilter() != null) {
106139
conf.set(
107140
GaussianCellMapper.CQL_FILTER_KEY,
@@ -192,6 +225,7 @@ public int runJob()
192225
final boolean job1Success = job.waitForCompletion(true);
193226
boolean job2Success = false;
194227
boolean postJob2Success = false;
228+
195229
// Linear MapReduce job chaining
196230
if (job1Success) {
197231
setupEntriesPerLevel(
@@ -227,17 +261,64 @@ public int runJob()
227261
setupJob2Output(
228262
conf,
229263
statsReducer,
230-
outputDataStoreOptions.getGeowaveNamespace());
264+
outputDataStoreOptions.getGeowaveNamespace(),
265+
kdeCoverageName);
231266
job2Success = statsReducer.waitForCompletion(true);
232267
if (job2Success) {
233268
postJob2Success = postJob2Actions(
234269
conf,
235-
outputDataStoreOptions.getGeowaveNamespace());
270+
outputDataStoreOptions.getGeowaveNamespace(),
271+
kdeCoverageName);
236272
}
237273
}
238274
else {
239275
job2Success = false;
240276
}
277+
if (rasterResizeOutputDataStoreOptions != null) {
278+
// delegate to resize command to wrap it up with the correctly
279+
// requested tile size
280+
final ResizeCommand resizeCommand = new ResizeCommand();
281+
282+
// We're going to override these anyway.
283+
resizeCommand.setParameters(
284+
null,
285+
null);
286+
287+
resizeCommand.setInputStoreOptions(outputDataStoreOptions);
288+
resizeCommand.setOutputStoreOptions(rasterResizeOutputDataStoreOptions);
289+
290+
resizeCommand.getOptions().setInputCoverageName(
291+
kdeCoverageName);
292+
resizeCommand.getOptions().setMinSplits(
293+
kdeCommandLineOptions.getMinSplits());
294+
resizeCommand.getOptions().setMaxSplits(
295+
kdeCommandLineOptions.getMaxSplits());
296+
resizeCommand.getOptions().setHdfsHostPort(
297+
kdeCommandLineOptions.getHdfsHostPort());
298+
resizeCommand.getOptions().setJobTrackerOrResourceManHostPort(
299+
kdeCommandLineOptions.getJobTrackerOrResourceManHostPort());
300+
resizeCommand.getOptions().setOutputCoverageName(
301+
kdeCommandLineOptions.getCoverageName());
302+
303+
resizeCommand.getOptions().setOutputTileSize(
304+
kdeCommandLineOptions.getTileSize());
305+
306+
final int resizeStatus = ToolRunner.run(
307+
resizeCommand.createRunner(new ManualOperationParams()),
308+
new String[] {});
309+
if (resizeStatus == 0) {
310+
// delegate to clear command to clean up the tmp namespace
311+
// after successful resize
312+
final ClearCommand clearCommand = new ClearCommand();
313+
clearCommand.setParameters(null);
314+
clearCommand.setInputStoreOptions(outputDataStoreOptions);
315+
clearCommand.execute(new ManualOperationParams());
316+
}
317+
else {
318+
LOGGER.warn("Resize command error code '" + resizeStatus + "'. Retaining temporary namespace '"
319+
+ outputDataStoreOptions.getGeowaveNamespace() + "' with tile size of 1.");
320+
}
321+
}
241322

242323
fs.delete(
243324
new Path(
@@ -268,7 +349,8 @@ protected void preJob1Setup(
268349

269350
protected boolean postJob2Actions(
270351
final Configuration conf,
271-
final String statsNamespace )
352+
final String statsNamespace,
353+
final String coverageName )
272354
throws Exception {
273355
return true;
274356
}
@@ -321,16 +403,18 @@ protected String getJob1Name() {
321403
protected void setupJob2Output(
322404
final Configuration conf,
323405
final Job statsReducer,
324-
final String statsNamespace )
406+
final String statsNamespace,
407+
final String coverageName )
325408
throws Exception {
326409
final PrimaryIndex index = new SpatialIndexBuilder().createIndex();
327410
final WritableDataAdapter<?> adapter = RasterUtils.createDataAdapterTypeDouble(
328-
kdeCommandLineOptions.getCoverageName(),
411+
coverageName,
329412
AccumuloKDEReducer.NUM_BANDS,
330-
kdeCommandLineOptions.getTileSize(),
413+
TILE_SIZE,
331414
AccumuloKDEReducer.MINS_PER_BAND,
332415
AccumuloKDEReducer.MAXES_PER_BAND,
333-
AccumuloKDEReducer.NAME_PER_BAND);
416+
AccumuloKDEReducer.NAME_PER_BAND,
417+
null);
334418
setup(
335419
statsReducer,
336420
statsNamespace,
@@ -366,11 +450,11 @@ protected void setup(
366450
public static void main(
367451
final String[] args )
368452
throws Exception {
369-
ConfigOptions opts = new ConfigOptions();
370-
OperationParser parser = new OperationParser();
453+
final ConfigOptions opts = new ConfigOptions();
454+
final OperationParser parser = new OperationParser();
371455
parser.addAdditionalObject(opts);
372-
KdeCommand command = new KdeCommand();
373-
CommandLineOperationParams params = parser.parse(
456+
final KdeCommand command = new KdeCommand();
457+
final CommandLineOperationParams params = parser.parse(
374458
command,
375459
args);
376460
opts.prepare(params);

0 commit comments

Comments
 (0)