/**
* Copyright 2012 Twitter, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
package com.twitter.elephanttwin.lzo.retrieval;
import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map.Entry;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;
import com.hadoop.compression.lzo.LzoIndex;
import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
import com.twitter.elephanttwin.indexing.BlockIndexingMapper;
import com.twitter.elephanttwin.io.LongPairWritable;
import com.twitter.elephanttwin.io.TextLongPairWritable;
import com.twitter.elephanttwin.retrieval.BlockIndexedFileInputFormat;
/**
 * Used to create indexes of LZO-compressed files.
 *
 * For each unique key value in the split it processes, this mapper outputs a
 * pair of the form
 * {@code (searched_key_value, (lzo-block-start-offset, lzo-block-end-offset))}.
 * The key generated by the mapper consists of the column value and the start
 * offset of the current input record. This enables secondary sorting on the
 * reducer side: results are sorted first on column values, then on start
 * offsets.
 */
public class LZOBlockOffsetMapper<M> extends BlockIndexingMapper<LongWritable, BinaryWritable<M>> {

  /** Configuration key: fully-qualified name of the value class to read rows as. */
  public static final String CLASSNAME_CONF = "LZOBlockOffsetMapper.classname";
  /** Configuration key: name of the column whose values are indexed. */
  public static final String COLUMNNAME_CONF = "LZOBlockOffsetMapper.columnename";
  /**
   * Configuration key: we combine non-consecutive lzo blocks within this many
   * bytes distance of each other.
   */
  public static final String GAPSIZE_CONF = "LZOBlockOffsetMapper.allowedgapsize";
  private final static int defaultGapSize = 1024 * 512;

  /* when we combine multiple lzo blocks,
   * the combined size cannot exceed this limit.
   */
  private long maxBlockSize;

  private static final Logger LOG = Logger
      .getLogger(LZOBlockOffsetMapper.class);

  // value of the indexed column in the row currently being processed
  private String columnValue = null;
  // the output value of the mapper: the [start, end] offsets of the lzo block
  // being indexed; the same for all output generated from all rows in the same
  // lzo block
  private LongPairWritable outputValue = new LongPairWritable();
  private long previousRowLineOffset; // the line offset reported by the previous row

  // reflective getter used to extract the indexed column from each row;
  // resolved once in setup()
  Method method;

  // per-split aggregation of unique column values in each lzo block:
  // column value -> combined [start, end] index-block range
  HashMap<String, LongPairWritable> map = new HashMap<String, LongPairWritable>(2000);

  /*
   * start offsets of lzo blocks read by this mapper/split, constructed during
   * setup(). For i >= 0, lzoOffsets[i] is the start offset of the ith lzo
   * block and lzoOffsets[i + 1] is the end offset of the ith lzo block.
   */
  long[] lzoOffsets;

  // index of the lzo block the mapper is currently reading input from.
  // invariant: lzoOffsets[currentLzoBlock] == current block start offset &&
  // lzoOffsets[currentLzoBlock + 1] == current block end offset
  int currentLzoBlock = 0;

  // number of lzo blocks in this mapper/split;
  // lzoOffsets.length == totalLZOBlocks + 1
  int totalLZOBlocks = 0;

  long lastLZOBlockStartOffset; // start offset of the last lzo block in the whole file
  long fileSize; // the size of the whole file
  int totalRowsInABlock; // rows seen in the current lzo block (debug logging only)
  int gapsize; // max byte gap allowed when combining adjacent index blocks

  /**
   * Determines which lzo blocks belong to this split by reading the file's
   * lzo index, and fills {@code lzoOffsets} with their boundary offsets.
   *
   * @throws IOException if no lzo index file exists for the input file, or if
   *         the column getter cannot be resolved on the configured value class
   */
  @Override
  protected void setup(
      Mapper<LongWritable, BinaryWritable<M>, TextLongPairWritable, LongPairWritable>.Context context)
      throws IOException, InterruptedException {

    FileSplit fileSplit = (FileSplit) context.getInputSplit();
    long splitStart = fileSplit.getStart();    // the start offset of the input split
    long splitLength = fileSplit.getLength();  // the length of the input split
    long splitEnd = splitStart + splitLength;  // the byte just past this input split

    Configuration conf = context.getConfiguration();
    // we don't want to create more indexed splits than original splits;
    // the original split size could be more than dfs.block.size.
    maxBlockSize = Math.max(conf.getLong("dfs.block.size", 256 * 1024 * 1024),
        splitLength);
    gapsize = conf.getInt(GAPSIZE_CONF, defaultGapSize);
    LOG.info("gap size allowed to combine blocks is set:" + gapsize);

    // resolve the getter method for the indexed column on the value class
    String valueClassName = conf.get(CLASSNAME_CONF);
    Class<?> c = BlockIndexedFileInputFormat
        .getValueClassByName(valueClassName);
    String columnName = conf.get(COLUMNNAME_CONF);
    String methodName = BlockIndexedFileInputFormat.getCamelCaseMethodName(
        columnName, c);
    try {
      method = c.getMethod(methodName);
    } catch (Exception e) {
      LOG.error("cannot instantiate the column to be read", e);
      throw new IOException(e);
    }

    Path file = fileSplit.getPath();
    fileSize = file.getFileSystem(conf).getFileStatus(file).getLen();
    LzoIndex lzoBlockIndex = LzoIndex.readIndex(file.getFileSystem(conf), file);
    if (lzoBlockIndex.isEmpty()) {
      throw new IOException("No LZO index file exists for the input file "
          + file.toString() + " cannot index the input file");
    }

    int numLzoBlocks = lzoBlockIndex.getNumberOfBlocks();
    lastLZOBlockStartOffset = lzoBlockIndex.getPosition(numLzoBlocks - 1);

    LOG.info(context.getTaskAttemptID() + " splitStart= " + splitStart
        + " splitEnd=" + splitEnd + " splitLength=" + splitLength);
    LOG.info(context.getTaskAttemptID() + ":total LZOblocks in this file: "
        + numLzoBlocks);

    // first loop to get the range of block offsets in lzoBlockIndex this
    // mapper is responsible for
    int startPos = 0;
    int endPos = 0;
    boolean foundStartPos = false;
    boolean foundEndPos = false;
    for (int i = 0; i < numLzoBlocks; i++) {
      long currentBlockOffset = lzoBlockIndex.getPosition(i);
      if (currentBlockOffset >= splitStart) {
        if (!foundStartPos) {
          startPos = i;
          foundStartPos = true;
        }
      }
      if (currentBlockOffset >= splitEnd) {
        if (!foundEndPos) {
          endPos = i;
          foundEndPos = true;
        }
      }
      if (foundStartPos && foundEndPos) {
        break;
      }
    }

    if (!foundEndPos) {
      // the last split: we need to copy from startPos to the end and
      // additionally add the end of the file to the array lzoOffsets
      endPos = numLzoBlocks - 1;
      totalLZOBlocks = endPos - startPos + 1;
    } else {
      if (endPos < numLzoBlocks - 1) {
        endPos++;
      }
      if (endPos == numLzoBlocks - 1) { // treat as if it's the last split
        totalLZOBlocks = endPos - startPos + 1;
        foundEndPos = false;
      } else {
        totalLZOBlocks = endPos - startPos;
      }
    }

    // special treatment for the first lzo block offset: due to the current
    // lzoindex implementation, we have to use 0 for the first lzo block in any
    // lzo compressed file though in fact the actual start offset of the first
    // lzo block is not 0. Later we may consider changing the lzo related
    // package to make sure all lzo block start offsets are treated the same way.
    lzoOffsets = new long[totalLZOBlocks + 1];
    if (foundEndPos) {
      for (int i = 0; i <= totalLZOBlocks; i++) {
        lzoOffsets[i] = lzoBlockIndex.getPosition(i + startPos);
      }
    } else {
      // treat the last InputSplit differently: cap the index array with the
      // file end offset since there is no following block start to use
      if (LOG.isDebugEnabled()) {
        LOG.debug("read the last lzo block offset, add the file end offset to the last element in the index array");
      }
      for (int i = 0; i < totalLZOBlocks; i++) {
        lzoOffsets[i] = lzoBlockIndex.getPosition(i + startPos);
      }
      lzoOffsets[totalLZOBlocks] = fileSize;
    }
    if (splitStart == 0) {
      lzoOffsets[0] = 0;
    }

    currentLzoBlock = 0;
    outputValue.setFirst(lzoOffsets[0]);
    outputValue.setSecond(lzoOffsets[1]);
    previousRowLineOffset = lzoOffsets[0];

    if (LOG.isDebugEnabled()) {
      LOG.debug("lzoOffsets= " + Arrays.toString(lzoOffsets));
      LOG.debug("lzoOffsets # of elements:" + lzoOffsets.length);
    }
  }

  /**
   * For each input row, extracts the indexed column value and records the
   * [start, end] offsets of the lzo block containing the row. Adjacent or
   * near-adjacent (within {@code gapsize}) index blocks for the same column
   * value are combined, up to {@code maxBlockSize} bytes; when a block can no
   * longer be combined, the previously accumulated entry is emitted to the
   * reducer.
   *
   * @throws IOException if the column getter cannot be invoked on the row
   */
  @Override
  public void map(LongWritable key, BinaryWritable<M> value, Context context)
      throws IOException, InterruptedException {
    try {
      columnValue = (String) method.invoke(value.get());
    } catch (Exception e) {
      LOG.error("cannot instantiate the value class to read the input", e);
      throw new IOException(e);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("key: " + key + "value: " + columnValue);
    }
    if (columnValue == null) {
      columnValue = "";
    }

    long lineOffset = key.get();
    // treat the first row specially: workaround for the current way lzo
    // readers produce the key offset for the first row.
    /*
     * Need to deal with the special case when the last inputsplit has
     * only the last lzoblock. This cannot happen in practice since HDFS block
     * contains thousands of lzo blocks and Hadoop has split_slope to make sure the last split
     * contains the same or more bytes than previous splits. But this could happen in test cases
     * where we manipulate hdfs block size and the size of lzo block size;
     */
    if (lzoOffsets.length > 2) {
      if (lineOffset == 0) {
        lineOffset = lzoOffsets[1];
      }
      if (lineOffset == previousRowLineOffset) {
        // still in the same lzo block: increase the count of rows in this block
        totalRowsInABlock++;
      } else {
        // a new lzo block: set up the outputValue to be sent to the reducer
        if (LOG.isDebugEnabled()) {
          LOG.debug("totalRowsInABlock is:" + totalRowsInABlock + " in [" +
              previousRowLineOffset + "," + lineOffset + "]");
        }
        totalRowsInABlock = 1;
        // for a very long row spanning many lzo blocks, we need to advance
        // currentLzoBlock multiple times
        while (currentLzoBlock <= totalLZOBlocks &&
            (lzoOffsets[currentLzoBlock] < lineOffset)) {
          currentLzoBlock++;
        }
        /*
         * Logically the following should be the right way to index the lzo
         * blocks. However, due to the way the current lzo readers produce key
         * offset, we have to do some compensation in indexing in order to make it
         * work correctly.
         */
        /*
         * outputValue.setFirst(currentLzoBlockStartOffset);
         * outputValue.setSecond(currentLzoBlockEndOffset);
         * pair.setLong(currentLzoBlockStartOffset);
         */
        // the real start offset is either 2 blocks back, or even further back
        // if this row is long and spans many lzo blocks
        outputValue.setFirst(Math.min(previousRowLineOffset,
            lzoOffsets[Math.max(0, currentLzoBlock - 2)]));
        /* we need to treat the last (two) lzo blocks differently than any
         * other lzo block: due to the current lzo readers we cannot
         * distinguish a row from the last lzo block and a row from the second
         * to the last lzo block. The solution is to combine the last two lzo
         * blocks.
         */
        if (lineOffset >= lastLZOBlockStartOffset) {
          outputValue.setSecond(fileSize);
        } else {
          outputValue.setSecond(lineOffset);
        }
        if (LOG.isDebugEnabled()) {
          LOG.debug("outputValue:" + outputValue);
        }
        if (outputValue.getSecond() <= outputValue.getFirst()) {
          throw new RuntimeException("Index block end offset "
              + outputValue.getSecond() + " is not more than start offset "
              + outputValue.getFirst());
        }
      }
    }

    if (!map.containsKey(columnValue)) {
      map.put(columnValue, new LongPairWritable(outputValue));
    } else {
      LongPairWritable prevPair = map.get(columnValue);
      // sanity check: the new block must not start before the stored one while
      // still overlapping it
      if (prevPair.getSecond() > outputValue.getFirst() &&
          outputValue.getFirst() < prevPair.getFirst()) {
        throw new RuntimeException("error: overlapping index blocks at offset: " + key);
      }
      /*
       * if we can combine the two blocks then combine, otherwise send the
       * previously stored index entry to the reducer and store the new index
       * entry. Two conditions must be met to combine:
       * a) "adjacent" to each other (within gapsize bytes);
       * b) combined size cannot be more than the threshold (maxBlockSize).
       */
      if (prevPair.getSecond() + gapsize < outputValue.getFirst() ||
          outputValue.getSecond() - prevPair.getFirst() > maxBlockSize) {
        context.write(new TextLongPairWritable(new Text(columnValue), prevPair.getFirst()), prevPair);
        if (LOG.isDebugEnabled()) {
          LOG.debug("write to reducer: " + prevPair);
        }
        map.put(columnValue, new LongPairWritable(outputValue));
      } else {
        prevPair.setSecond(outputValue.getSecond());
        map.put(columnValue, prevPair);
      }
    }
    previousRowLineOffset = lineOffset;
  }

  /**
   * Flushes all remaining accumulated index entries to the reducer and clears
   * the per-split aggregation map.
   */
  @Override
  protected void cleanup(Context context) throws IOException,
      InterruptedException {
    if (LOG.isDebugEnabled()) {
      LOG.debug("totalRowsInABlock is:" + totalRowsInABlock + " for the last " +
          "lzoblock ended at" + previousRowLineOffset);
    }
    for (Entry<String, LongPairWritable> e : map.entrySet()) {
      LongPairWritable lp = e.getValue();
      context.write(new TextLongPairWritable(new Text(e.getKey()), lp.getFirst()), lp);
    }
    map.clear();
  }
}