/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.jena.tdbloader4.partitioners;
import java.io.IOException;
import java.lang.reflect.Array;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BinaryComparable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.jena.tdbloader4.io.LongQuadWritable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Partitioner effecting a total order by reading split points from an externally generated source.
*/
public class TotalOrderPartitioner<K extends WritableComparable<?>, V> extends Partitioner<K, V> implements Configurable {
private static final Logger log = LoggerFactory.getLogger(TotalOrderPartitioner.class);
@SuppressWarnings("rawtypes")
private Map<String, Node> partitions = new HashMap<String, Node>(9); // nine indexes!
public static final String DEFAULT_PATH = "_partition.lst";
public static final String PARTITIONER_PATH = "mapreduce.totalorderpartitioner.path";
public static final String MAX_TRIE_DEPTH = "mapreduce.totalorderpartitioner.trie.maxdepth";
public static final String NATURAL_ORDER = "mapreduce.totalorderpartitioner.naturalorder";
Configuration conf;
private int numReduceTasks;
public TotalOrderPartitioner() {
log.debug("constructor()");
}
/**
* Read in the partition file and build indexing data structures. If the
* keytype is {@link org.apache.hadoop.io.BinaryComparable} and
* <tt>total.order.partitioner.natural.order</tt> is not false, a trie of
* the first <tt>total.order.partitioner.max.trie.depth</tt>(2) + 1 bytes
* will be built. Otherwise, keys will be located using a binary search of
* the partition keyset using the {@link org.apache.hadoop.io.RawComparator}
* defined for this job. The input file must be sorted with the same
* comparator and contain {@link Job#getNumReduceTasks()} - 1 keys.
*/
// keytype from conf not static
public void setConf(Configuration conf) {
log.debug("setConf({})", conf);
this.conf = conf;
init("GSPO", conf);
init("GPOS", conf);
init("GOSP", conf);
init("SPOG", conf);
init("POSG", conf);
init("OSPG", conf);
init("SPO", conf);
init("POS", conf);
init("OSP", conf);
log.debug("setConf() finished.");
}
@SuppressWarnings("unchecked")
private void init(String indexName, Configuration conf) {
log.debug("init({}, {})", indexName, conf);
try {
String parts = getPartitionFile(conf);
final Path partFile = new Path(parts + "_" + indexName);
final FileSystem fs = (DEFAULT_PATH.equals(parts)) ? FileSystem.getLocal(conf) // assume in DistributedCache
: partFile.getFileSystem(conf);
log.debug("FileSystem is {}", fs);
Job job = new Job(conf);
Class<K> keyClass = (Class<K>) job.getMapOutputKeyClass();
log.debug("Map output key class is {}", keyClass.getSimpleName());
K[] splitPoints = readPartitions(fs, partFile, keyClass, conf);
numReduceTasks = job.getNumReduceTasks();
log.debug("Found {} split points, number of reducers is {}", splitPoints.length, numReduceTasks);
if (splitPoints.length != (numReduceTasks / 9) - 1) {
log.debug("Split points are {} which is different from {}", splitPoints.length, (numReduceTasks / 9) - 1);
throw new IOException("Wrong number of partitions in keyset");
}
RawComparator<K> comparator = (RawComparator<K>) job.getSortComparator();
for (int i = 0; i < splitPoints.length - 1; ++i) {
if (comparator.compare(splitPoints[i], splitPoints[i + 1]) >= 0) {
log.debug("Split points are out of order");
throw new IOException("Split points are out of order");
}
}
boolean natOrder = conf.getBoolean(NATURAL_ORDER, true);
Node<?> partitions = null;
if (natOrder && BinaryComparable.class.isAssignableFrom(keyClass)) {
partitions = buildTrie((BinaryComparable[]) splitPoints, 0, splitPoints.length, new byte[0],
// Now that blocks of identical splitless trie nodes are
// represented reentrantly, and we develop a leaf for any trie
// node with only one split point, the only reason for a depth
// limit is to refute stack overflow or bloat in the pathological
// case where the split points are long and mostly look like bytes
// iii...iixii...iii . Therefore, we make the default
// depth limit large but not huge.
conf.getInt(MAX_TRIE_DEPTH, 200));
} else {
partitions = new BinarySearchNode(splitPoints, comparator);
}
log.debug("Adding {} to {}", partitions, this.partitions);
this.partitions.put(indexName, partitions);
} catch (IOException e) {
throw new IllegalArgumentException("Can't read partitions file", e);
}
log.debug("init({}, {}) finished.", indexName, conf);
}
public Configuration getConf() {
log.debug("getConf() = {}", conf);
return conf;
}
// by construction, we know if our keytype
@SuppressWarnings("unchecked")
// is memcmp-able and uses the trie
public int getPartition(K key, V value, int numPartitions) {
LongQuadWritable quad = (LongQuadWritable)key;
String indexName = quad.getIndexName();
int indexOffset = ( numReduceTasks / 9 ) * LongQuadWritable.getIndexOffset(indexName) ;
int indexPartition = partitions.get(indexName).findPartition(key);
int partition = indexPartition + indexOffset;
if ( log.isDebugEnabled() ) {
log.debug("indexName = {}", indexName);
log.debug("indexOffset = {}", indexOffset);
log.debug("indexPartition = {}", indexPartition);
log.debug("getPartition({}, {}, {}) = {}",
new String[] { key.toString(), value.toString(),
String.valueOf(numPartitions),
String.valueOf(partition) });
}
return partition;
}
/**
* Set the path to the SequenceFile storing the sorted partition keyset. It
* must be the case that for <tt>R</tt> reduces, there are <tt>R-1</tt> keys
* in the SequenceFile.
*/
public static void setPartitionFile(Configuration conf, Path p) {
log.debug("setPartitionFile({}, {})", conf, p);
conf.set(PARTITIONER_PATH, p.toString());
}
/**
* Get the path to the SequenceFile storing the sorted partition keyset.
*
* @see #setPartitionFile(Configuration, Path)
*/
public static String getPartitionFile(Configuration conf) {
String p = conf.get(PARTITIONER_PATH, DEFAULT_PATH);
log.debug("getPartitionFile({}) = {}", conf, p);
return p;
}
/**
* Interface to the partitioner to locate a key in the partition keyset.
*/
interface Node<T> {
/**
* Locate partition in keyset K, st [Ki..Ki+1) defines a partition, with
* implicit K0 = -inf, Kn = +inf, and |K| = #partitions - 1.
*/
int findPartition(T key);
}
/**
* Base class for trie nodes. If the keytype is memcomp-able, this builds
* tries of the first <tt>total.order.partitioner.max.trie.depth</tt> bytes.
*/
static abstract class TrieNode implements Node<BinaryComparable> {
private final int level;
TrieNode(int level) {
this.level = level;
}
int getLevel() {
return level;
}
}
/**
* For types that are not {@link org.apache.hadoop.io.BinaryComparable} or
* where disabled by <tt>total.order.partitioner.natural.order</tt>, search
* the partition keyset with a binary search.
*/
class BinarySearchNode implements Node<K> {
private final K[] splitPoints;
private final RawComparator<K> comparator;
BinarySearchNode(K[] splitPoints, RawComparator<K> comparator) {
this.splitPoints = splitPoints;
this.comparator = comparator;
}
public int findPartition(K key) {
final int pos = Arrays.binarySearch(splitPoints, key, comparator) + 1;
return (pos < 0) ? -pos : pos;
}
}
/**
* An inner trie node that contains 256 children based on the next character.
*/
class InnerTrieNode extends TrieNode {
private TrieNode[] child = new TrieNode[256];
InnerTrieNode(int level) {
super(level);
}
public int findPartition(BinaryComparable key) {
int level = getLevel();
if (key.getLength() <= level) {
return child[0].findPartition(key);
}
return child[0xFF & key.getBytes()[level]].findPartition(key);
}
}
/**
* @param level the tree depth at this node
* @param splitPoints the full split point vector, which holds the split point or points this leaf node should contain
* @param lower first INcluded element of splitPoints
* @param upper first EXcluded element of splitPoints
* @return a leaf node. They come in three kinds: no split points [and the
* findParttion returns a canned index], one split point [and we
* compare with a single comparand], or more than one [and we do a
* binary search]. The last case is rare.
*/
private TrieNode LeafTrieNodeFactory(int level,
BinaryComparable[] splitPoints, int lower, int upper) {
switch (upper - lower) {
case 0:
return new UnsplitTrieNode(level, lower);
case 1:
return new SinglySplitTrieNode(level, splitPoints, lower);
default:
return new LeafTrieNode(level, splitPoints, lower, upper);
}
}
/**
* A leaf trie node that scans for the key between lower..upper.
*
* We don't generate many of these now, since we usually continue trie-ing
* when more than one split point remains at this level. and we make
* different objects for nodes with 0 or 1 split point.
*/
private class LeafTrieNode extends TrieNode {
final int lower;
final int upper;
final BinaryComparable[] splitPoints;
LeafTrieNode(int level, BinaryComparable[] splitPoints, int lower, int upper) {
super(level);
this.lower = lower;
this.upper = upper;
this.splitPoints = splitPoints;
}
public int findPartition(BinaryComparable key) {
final int pos = Arrays.binarySearch(splitPoints, lower, upper, key) + 1;
return (pos < 0) ? -pos : pos;
}
}
private class UnsplitTrieNode extends TrieNode {
final int result;
UnsplitTrieNode(int level, int value) {
super(level);
this.result = value;
}
public int findPartition(BinaryComparable key) {
return result;
}
}
private class SinglySplitTrieNode extends TrieNode {
final int lower;
final BinaryComparable mySplitPoint;
SinglySplitTrieNode(int level, BinaryComparable[] splitPoints, int lower) {
super(level);
this.lower = lower;
this.mySplitPoint = splitPoints[lower];
}
public int findPartition(BinaryComparable key) {
return lower + (key.compareTo(mySplitPoint) < 0 ? 0 : 1);
}
}
/**
* Read the cut points from the given IFile.
*
* @param fs The file system
* @param p The path to read
* @param keyClass The map output key class
* @param job The job config
* @throws IOException
*/
// matching key types enforced by passing in
@SuppressWarnings("unchecked")
// map output key class
private K[] readPartitions(FileSystem fs, Path p, Class<K> keyClass, Configuration conf) throws IOException {
log.debug("readPartitions({}, {}, {}, {})", new Object[]{fs, p, keyClass.getSimpleName(), conf});
SequenceFile.Reader reader = new SequenceFile.Reader(fs, p, conf);
log.debug("SequenceFile.Reader is {}", reader);
log.debug("SequenceFile.Reader position is {}", reader.getPosition());
ArrayList<K> parts = new ArrayList<K>();
K key = ReflectionUtils.newInstance(keyClass, conf);
NullWritable value = NullWritable.get();
while (reader.next(key, value)) {
log.debug("Partition key {}", key);
parts.add(key);
key = ReflectionUtils.newInstance(keyClass, conf);
}
reader.close();
return parts.toArray((K[]) Array.newInstance(keyClass, parts.size()));
}
/**
*
* This object contains a TrieNodeRef if there is such a thing that can be
* repeated. Two adjacent trie node slots that contain no split points can
* be filled with the same trie node, even if they are not on the same
* level. See buildTreeRec, below.
*
*/
private class CarriedTrieNodeRef {
TrieNode content;
CarriedTrieNodeRef() {
content = null;
}
}
/**
* Given a sorted set of cut points, build a trie that will find the correct
* partition quickly.
*
* @param splits
* the list of cut points
* @param lower
* the lower bound of partitions 0..numPartitions-1
* @param upper
* the upper bound of partitions 0..numPartitions-1
* @param prefix
* the prefix that we have already checked against
* @param maxDepth
* the maximum depth we will build a trie for
* @return the trie node that will divide the splits correctly
*/
private TrieNode buildTrie(BinaryComparable[] splits, int lower, int upper, byte[] prefix, int maxDepth) {
return buildTrieRec(splits, lower, upper, prefix, maxDepth, new CarriedTrieNodeRef());
}
/**
* This is the core of buildTrie. The interface, and stub, above, just adds
* an empty CarriedTrieNodeRef.
*
* We build trie nodes in depth first order, which is also in key space
* order. Every leaf node is referenced as a slot in a parent internal node.
* If two adjacent slots [in the DFO] hold leaf nodes that have no split
* point, then they are not separated by a split point either, because
* there's no place in key space for that split point to exist.
*
* When that happens, the leaf nodes would be semantically identical, and we
* reuse the object. A single CarriedTrieNodeRef "ref" lives for the
* duration of the tree-walk. ref carries a potentially reusable, unsplit
* leaf node for such reuse until a leaf node with a split arises, which
* breaks the chain until we need to make a new unsplit leaf node.
*
* Note that this use of CarriedTrieNodeRef means that for internal nodes,
* for internal nodes if this code is modified in any way we still need to
* make or fill in the subnodes in key space order.
*/
private TrieNode buildTrieRec(BinaryComparable[] splits, int lower, int upper, byte[] prefix, int maxDepth, CarriedTrieNodeRef ref) {
final int depth = prefix.length;
// We generate leaves for a single split point as well as for
// no split points.
if (depth >= maxDepth || lower >= upper - 1) {
// If we have two consecutive requests for an unsplit trie node, we
// can deliver the same one the second time.
if (lower == upper && ref.content != null) {
return ref.content;
}
TrieNode result = LeafTrieNodeFactory(depth, splits, lower, upper);
ref.content = lower == upper ? result : null;
return result;
}
InnerTrieNode result = new InnerTrieNode(depth);
byte[] trial = Arrays.copyOf(prefix, prefix.length + 1);
// append an extra byte on to the prefix
int currentBound = lower;
for (int ch = 0; ch < 0xFF; ++ch) {
trial[depth] = (byte) (ch + 1);
lower = currentBound;
while (currentBound < upper) {
if (splits[currentBound].compareTo(trial, 0, trial.length) >= 0) {
break;
}
currentBound += 1;
}
trial[depth] = (byte) ch;
result.child[0xFF & ch] = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
}
// pick up the rest
trial[depth] = (byte) 0xFF;
result.child[0xFF] = buildTrieRec(splits, lower, currentBound, trial, maxDepth, ref);
return result;
}
}