* Main method to execute the MapReduce analytic.
*/
@SuppressWarnings("deprecation")
public int runJob()
throws Exception {
// Build the default spatial vector index; its strategy is used below to turn
// the optional CQL bounding box into Accumulo query ranges, and its ID names
// the input table.
final Index spatialIndex = IndexType.SPATIAL_VECTOR.createDefaultIndex();
final Configuration conf = super.getConf();
// Connect to Accumulo and look up the data adapter for the configured
// feature type so it can be serialized into the job configuration.
final BasicAccumuloOperations ops = new BasicAccumuloOperations(
zookeeper,
instance,
user,
password,
namespace);
final AdapterStore adapterStore = new AccumuloAdapterStore(
ops);
final DataAdapter<?> adapter = adapterStore.getAdapter(new ByteArrayId(
StringUtils.stringToBinary(featureType)));
// Hand the serialized adapter, the level bounds, and the stats name to the
// map/reduce tasks via the Hadoop configuration.
conf.set(
GaussianCellMapper.DATA_ADAPTER_KEY,
ByteArrayUtils.byteArrayToString(PersistenceUtils.toBinary(adapter)));
conf.setInt(
MAX_LEVEL_KEY,
maxLevel);
conf.setInt(
MIN_LEVEL_KEY,
minLevel);
conf.set(
AccumuloKDEReducer.STATS_NAME_KEY,
statsName);
if (cqlFilter != null) {
conf.set(
GaussianCellMapper.CQL_FILTER_KEY,
cqlFilter);
}
// Hook for subclasses to adjust the configuration before job 1 is created.
preJob1Setup(conf);
final Job job = new Job(
conf);
job.setJarByClass(this.getClass());
job.setJobName(getJob1Name());
// Mapper/reducer classes come from overridable accessors so subclasses can
// customize job 1; the combiner presumably pre-sums per-cell values on the
// map side (name suggests summation — confirm against CellSummationCombiner).
job.setMapperClass(getJob1Mapper());
job.setCombinerClass(CellSummationCombiner.class);
job.setReducerClass(getJob1Reducer());
job.setMapOutputKeyClass(LongWritable.class);
job.setMapOutputValueClass(DoubleWritable.class);
// Note the final key/value types are swapped relative to the map output
// (Double key, Long value) for job 1's sequence-file output.
job.setOutputKeyClass(DoubleWritable.class);
job.setOutputValueClass(LongWritable.class);
job.setInputFormatClass(AccumuloInputFormat.class);
job.setOutputFormatClass(SequenceFileOutputFormat.class);
// Fixed reducer parallelism for job 1; not configurable from here.
job.setNumReduceTasks(8);
if (cqlFilter != null) {
// If the CQL filter implies a finite bounding box, restrict the Accumulo
// scan to the index ranges covering that geometry rather than scanning the
// whole table.
final Filter filter = ECQL.toFilter(cqlFilter);
final Geometry bbox = (Geometry) filter.accept(
ExtractGeometryFilterVisitor.GEOMETRY_VISITOR,
null);
if ((bbox != null) && !bbox.equals(GeometryUtils.infinity())) {
final Constraints c = GeometryUtils.basicConstraintsFromGeometry(bbox);
final List<ByteArrayRange> ranges = spatialIndex.getIndexStrategy().getQueryRanges(
c.getIndexConstraints(spatialIndex.getIndexStrategy()));
InputFormatBase.setRanges(
job,
AccumuloUtils.byteArrayRangesToAccumuloRanges(ranges));
}
// NOTE(review): CQL_FILTER_KEY was already set on this same `conf` object
// before preJob1Setup; this second set looks redundant unless `new Job(conf)`
// copies the configuration and the job's copy needs the value too — confirm
// before removing either call.
conf.set(
GaussianCellMapper.CQL_FILTER_KEY,
cqlFilter);
}
// Wire up the Accumulo input format: connector credentials, the input table
// (namespace-qualified by the spatial index ID), empty scan authorizations,
// and the ZooKeeper instance.
// NOTE(review): password.getBytes() uses the platform default charset —
// confirm UTF-8 is intended here.
InputFormatBase.setConnectorInfo(
job,
user,
new PasswordToken(
password.getBytes()));
InputFormatBase.setInputTableName(
job,
AccumuloUtils.getQualifiedTableName(
namespace,
StringUtils.stringFromBinary(spatialIndex.getId().getBytes())));
InputFormatBase.setScanAuthorizations(
job,
new Authorizations());
InputFormatBase.setZooKeeperInstance(
job,
instance,
zookeeper);
// we have to at least use a whole row iterator
final IteratorSetting iteratorSettings = new IteratorSetting(
10,
"GEOWAVE_WHOLE_ROW_ITERATOR",
WholeRowIterator.class);
InputFormatBase.addIterator(
job,
iteratorSettings);
// Recursively delete any output left over from a previous run, then point
// job 1's sequence-file output at a namespaced temp directory.
final FileSystem fs = FileSystem.get(conf);
fs.delete(
new Path(
"/tmp/" + namespace + "_stats_" + minLevel + "_" + maxLevel + "_" + statsName),
true);
FileOutputFormat.setOutputPath(
job,
new Path(
"/tmp/" + namespace + "_stats_" + minLevel + "_" + maxLevel + "_" + statsName + "/basic"));
// Run job 1 synchronously; the chained follow-up work below only proceeds
// on success.
final boolean job1Success = job.waitForCompletion(true);
boolean job2Success = false;
boolean postJob2Success = false;
// Linear MapReduce job chaining
if (job1Success) {
// Target table for the follow-on job lives in a "<namespace>_stats"
// namespace, qualified by the same spatial index ID as the input table.
final String statsNamespace = namespace + "_stats";
final String tableName = AccumuloUtils.getQualifiedTableName(
statsNamespace,
StringUtils.stringFromBinary(spatialIndex.getId().getBytes()));
conf.set(
TABLE_NAME,
tableName);
setupEntriesPerLevel(