//illegal state, raise exception
throw new IllegalStateException("Reduce phase of MapReduceTask " + taskId + " on node "
+ localAddress + " executed with empty input keys");
} else{
//first hook into lifecycle
MapReduceTaskLifecycleService taskLifecycleService = MapReduceTaskLifecycleService.getInstance();
log.tracef("For m/r task %s invoking %s at %s", taskId, reduceCommand, localAddress);
try {
taskLifecycleService.onPreExecute(reducer, cache);
for (KOut key : keys) {
//load result value from map phase
List<VOut> value = null;
if(useIntermediateKeys){
value = tmpCache.get(new IntermediateCompositeKey<KOut>(taskId, key));
} else {
value = tmpCache.get(key);
}
// and reduce it
VOut reduced = reducer.reduce(key, value.iterator());
result.put(key, reduced);
log.tracef("For m/r task %s reduced %s to %s at %s ", taskId, key, reduced, localAddress);
}
} finally {
taskLifecycleService.onPostExecute(reducer);
}
}
return result;
}