
Source Code of com.twitter.elephanttwin.lzo.retrieval.LZOBlockOffsetMapper

/**
* Copyright 2012 Twitter, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program.  If not, see <http://www.gnu.org/licenses/>.
*/
package com.twitter.elephanttwin.lzo.retrieval;

import java.io.IOException;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.log4j.Logger;

import com.hadoop.compression.lzo.LzoIndex;
import com.twitter.elephantbird.mapreduce.io.BinaryWritable;
import com.twitter.elephanttwin.indexing.BlockIndexingMapper;
import com.twitter.elephanttwin.io.LongPairWritable;
import com.twitter.elephanttwin.io.TextLongPairWritable;
import com.twitter.elephanttwin.retrieval.BlockIndexedFileInputFormat;

/**
 * Used to create indexes of LZO-compressed files.
 *
 * For each unique key value seen in the split it processes, this mapper
 * outputs <searched_key_value, <lzo-block-start-offset, lzo-block-end-offset>>.
 * The key generated by the mapper consists of the column value and the start
 * offset of the current input record; this enables secondary sorting on the
 * reducer side, ordering the results first on column values and then on
 * start offsets.
 */

public class LZOBlockOffsetMapper<M> extends BlockIndexingMapper<LongWritable, BinaryWritable<M>> {

  public static String CLASSNAME_CONF = "LZOBlockOffsetMapper.classname";
  public static String COLUMNNAME_CONF = "LZOBlockOffsetMapper.columnename";

  // we combine non-consecutive lzo blocks that are within defaultGapSize bytes of each other.
  public static String GAPSIZE_CONF = "LZOBlockOffsetMapper.allowedgapsize";

  private final static int defaultGapSize = 1024 * 512;

  /* when we combine multiple lzo blocks,
   * the combined size cannot exceed this limit.
   */
  private long maxBlockSize;

  private static final Logger LOG = Logger
      .getLogger(LZOBlockOffsetMapper.class);
  // the value of the indexed column for the current record
  private String columnValue = null;

  // the output value of the mapper: the start and end offsets of the LZO
  // block, sent to the reducer; it is the same for all output generated
  // from rows in the same LZO block

  private LongPairWritable outputValue = new LongPairWritable();
  private long previousRowLineOffset; // the line offset reported by the previous row

  // the output key of the mapper contains the column value and the
  // start offset of the lzo block

  // getter method used to extract the indexed column value from each record
  Method method;
  // stores the pending index entry for each unique column value seen in this split
  HashMap<String, LongPairWritable> map = new HashMap<String, LongPairWritable>(2000);

  /*
   * this array contains the start offsets of the lzo blocks read by this
   * mapper/split, constructed during setup. For i >= 0, lzoOffsets[i] is the
   * start offset of the ith lzo block and lzoOffsets[i+1] is the end offset
   * of the ith lzo block.
   */
  long[] lzoOffsets;


  // the index of the lzo block the mapper is currently reading input from;
  // invariant: lzoOffsets[currentLzoBlock] == currentLzoBlockStartOffset &&
  // lzoOffsets[currentLzoBlock+1] == currentLzoBlockEndOffset
  int currentLzoBlock = 0;

  // number of lzo blocks in this mapper/split; totalLZOBlocks + 1 == lzoOffsets.length
  int totalLZOBlocks = 0;

  long lastLZOBlockStartOffset; // start offset of the last lzo block in the whole file
  long fileSize; // the size of the whole file.

  int totalRowsInABlock;
  int gapsize;


  @Override
  protected void setup(
      Mapper<LongWritable, BinaryWritable<M>, TextLongPairWritable, LongPairWritable>.Context context)
          throws IOException, InterruptedException {

    long splitStart; // the start offset of the input split
    long splitLength; // the length of the input split
    long splitEnd; // the end offset (exclusive) of this input split


    FileSplit fileSplit = (FileSplit) context.getInputSplit();
    splitStart = fileSplit.getStart();
    splitLength = fileSplit.getLength();
    splitEnd = splitStart + splitLength;
    Configuration conf = context.getConfiguration();
    maxBlockSize = Math.max(conf.getLong("dfs.block.size", 256*1024*1024),
        splitLength);
    // we don't want to create more indexed splits than original splits;
    // the original split size could be more than dfs.block.size.

    gapsize = conf.getInt(GAPSIZE_CONF, defaultGapSize);
    LOG.info("gap size allowed to cobmine blocks is set:" + gapsize);
    String valueClassName = context.getConfiguration().get(CLASSNAME_CONF);

    Class<?> c = BlockIndexedFileInputFormat
        .getValueClassByName(valueClassName);
    // column =
    // c.getDeclaredField(context.getConfiguration().get(COLUMNNAME));
    String methodName;
    String columnName = context.getConfiguration().get(COLUMNNAME_CONF);
    methodName = BlockIndexedFileInputFormat.getCamelCaseMethodName(
        columnName, c);

    try {
      method = c.getMethod(methodName);
    } catch (Exception e) {
      LOG.error("cannot instantiate the column to be read", e);
      throw new IOException(e);
    }

    Path file = fileSplit.getPath();
    fileSize = file.getFileSystem(context.getConfiguration())
        .getFileStatus(file).getLen();

    LzoIndex lzoBlockIndex = LzoIndex.readIndex(
        file.getFileSystem(context.getConfiguration()), file);

    if (lzoBlockIndex.isEmpty()) {
      throw new IOException("No LZO index file exists for the input file "
          + file.toString() + "; cannot index the input file");
    }

    int num_lzo_blocks = lzoBlockIndex.getNumberOfBlocks();
    lastLZOBlockStartOffset = lzoBlockIndex.getPosition(num_lzo_blocks - 1);

    LOG.info(context.getTaskAttemptID() + " splitStart= " + splitStart
        + " splitEnd=" + splitEnd + " splitLength=" + splitLength);
    LOG.info(context.getTaskAttemptID() + ":total LZOblocks in this file: "
        + num_lzo_blocks);

    // first loop to get the range of block offsets in lzoBlockIndex this mapper
    // is responsible for;
    int startPos = 0;
    int endPos = 0;

    boolean foundStartPos = false;
    boolean foundEndPos = false;
    for (int i = 0; i < num_lzo_blocks; i++) {
      long currentBlockOffset = lzoBlockIndex.getPosition(i);
      if (currentBlockOffset >= splitStart) {
        if (!foundStartPos) {
          startPos = i;
          foundStartPos = true;
        }
      }
      if (currentBlockOffset >= splitEnd) {
        if (!foundEndPos) {
          endPos = i;
          foundEndPos = true;
        }
      }

      if (foundStartPos && foundEndPos)
        break;
    }

    if (!foundEndPos) {
      endPos = num_lzo_blocks - 1;
      totalLZOBlocks = endPos - startPos + 1;
      // the last split, we need to copy from startPos to the end and additional
      // add the end of the file to the array lzoBlockOffset
    } else {
      if (endPos < num_lzo_blocks - 1)
        endPos++;
      if (endPos == num_lzo_blocks - 1) // treat as if it's the last split;
      {
        totalLZOBlocks = endPos - startPos + 1;
        foundEndPos = false;
      } else
        totalLZOBlocks = endPos - startPos;
    }

    // special treatment for the first lzo block offset: due to the current
    // lzo index implementation, we have to use 0 for the first lzo block in
    // any lzo compressed file, even though the actual start offset of the
    // first lzo block is not 0. Later we may consider changing the lzo
    // related package so that all lzo block start offsets are treated the
    // same way.

    lzoOffsets = new long[totalLZOBlocks + 1];

    if (foundEndPos) {
      for (int i = 0; i <= totalLZOBlocks; i++)
        lzoOffsets[i] = lzoBlockIndex.getPosition(i + startPos);
    } else {
      // treat the last InputSplit differently
      if (LOG.isDebugEnabled()) {
        LOG.debug("read the last lzo block offset, add the file end offset to the last element in the index array");
      }
      for (int i = 0; i < totalLZOBlocks; i++)
        lzoOffsets[i] = lzoBlockIndex.getPosition(i + startPos);
      lzoOffsets[totalLZOBlocks] = fileSize;
    }

    if (splitStart == 0) {
      lzoOffsets[0] = 0;
    }
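
    /*
     * Illustrative example (hypothetical offsets, not from the original
     * source): if this split covers lzo blocks whose start offsets in the
     * file are 100, 300 and 500, and the next lzo block (or the end of the
     * file) is at offset 700, then totalLZOBlocks == 3 and
     * lzoOffsets == {100, 300, 500, 700}; for the first split of the file,
     * lzoOffsets[0] is forced to 0 above.
     */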

    currentLzoBlock = 0;
    outputValue.setFirst(lzoOffsets[0]);
    outputValue.setSecond(lzoOffsets[1]);
    previousRowLineOffset = lzoOffsets[0];

    if (LOG.isDebugEnabled()) {
      LOG.debug("lzoOffsets= " + Arrays.toString(lzoOffsets));
      LOG.debug("lzoOffsets # of elements:" + lzoOffsets.length);
    }
  }

  /*
   * For each input record, determine the lzo block range it falls in and,
   * where possible, combine it with the index entry previously stored for
   * the same column value.
   */
  @Override
  public void map(LongWritable key, BinaryWritable<M> value, Context context)
      throws IOException, InterruptedException {

    try {
      columnValue = (String) method.invoke(value.get());
    } catch (Exception e) {
      LOG.error("cannot invoke the column getter method on the input value", e);
      throw new IOException(e);
    }

    if (LOG.isDebugEnabled()) {
      LOG.debug("key: " + key + "value: " + columnValue);
    }

    if (columnValue == null) {
      columnValue = "";
    }

    long lineOffset = key.get();

    // treat the first row specially.
    // workaround for the current way lzo readers produce key offset for the
    // first row.

    /*
     * Need to deal with the special case when the last input split has
     * only the last lzo block. This cannot happen in practice since an HDFS
     * block contains thousands of lzo blocks and Hadoop uses split_slope to
     * make sure the last split contains the same or more bytes than previous
     * splits. But it could happen in test cases where we manipulate the hdfs
     * block size and the lzo block size.
     */

    if (lzoOffsets.length > 2) {
      if (lineOffset == 0)
        lineOffset = lzoOffsets[1];

      if (lineOffset == previousRowLineOffset) {
        totalRowsInABlock++;
        //in the same lzo block, increase the cnt for the number of rows in
        //this block.
      } else {
        // a new lzo block, set up the outputValue to be sent to reducer.
        if (LOG.isDebugEnabled()) {
          LOG.debug("totalRowsInABlock is:" + totalRowsInABlock + " in [" +
              previousRowLineOffset + "," + lineOffset + "]");
        }

        totalRowsInABlock = 1;
        // for a very long row which spans many lzo blocks, we need to
        // advance currentLzoBlock multiple times.
        while (currentLzoBlock <= totalLZOBlocks &&
            lzoOffsets[currentLzoBlock] < lineOffset)
          currentLzoBlock++;


        /*
         * Logically the following should be the right way to index the lzo
         * blocks. However, due to the way the current lzo readers produce key
         * offset, we have to do some compensation in indexing in order to make it
         * work correctly.
         */
        /*
         * outputValue.setFirst(currentLzoBlockStartOffset);
         * outputValue.setSecond(currentLzoBlockEndOffset);
         * pair.setLong(currentLzoBlockStartOffset);
         */


        // the real start offset is either 2 blocks back, or even further
        // back if this is a long row spanning many lzo blocks.
        outputValue.setFirst(Math.min(previousRowLineOffset,
            lzoOffsets[Math.max(0, currentLzoBlock - 2)]));

        /* we need to treat the last (two) lzo blocks differently than any
         * other lzo block, due to the current lzo readers: we cannot
         * distinguish a row from the last lzo block from a row from the
         * second-to-last lzo block. The solution is to combine the last two
         * lzo blocks.
         */
        if (lineOffset >= lastLZOBlockStartOffset)
          outputValue.setSecond(fileSize);
        else
          outputValue.setSecond(lineOffset);

        if (LOG.isDebugEnabled()) {
          LOG.debug("outputValue:" + outputValue);
        }

        if (outputValue.getSecond() <= outputValue.getFirst()) {
          throw new RuntimeException(
              "Index block end offset is not greater than start offset: " + outputValue);
        }
      }
    }
    if (!map.containsKey(columnValue)) {
      map.put(columnValue, new LongPairWritable(outputValue));
    } else {

      LongPairWritable prevPair = map.get(columnValue);

      if (prevPair.getSecond() > outputValue.getFirst() &&
          outputValue.getFirst() < prevPair.getFirst()) {
        throw new RuntimeException("error: overalapping index blocks at offset: " + key);
      }

      /*
       * If we can combine the two blocks, then combine them; otherwise send
       * the previously stored index entry to the reducer and store the new
       * index entry. Two conditions must be met to combine:
       *  a) the blocks are "adjacent" (within gapsize of each other);
       *  b) the combined size cannot be more than the threshold (maxBlockSize).
       */
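      /*
       * Illustrative example (hypothetical numbers, not from the original
       * source): with gapsize = 512KB and maxBlockSize = 256MB, a stored
       * entry [0, 1MB] and a new entry [1.2MB, 2MB] are combined into
       * [0, 2MB], since the gap (1.2MB - 1MB) is within gapsize and the
       * combined span (2MB - 0) is under maxBlockSize. A new entry
       * [2MB, 2.6MB] arriving after a stored entry [0, 1MB] would instead
       * flush [0, 1MB] to the reducer, since the gap (2MB - 1MB) exceeds
       * gapsize.
       */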
      if (prevPair.getSecond() + gapsize < outputValue.getFirst() ||
          outputValue.getSecond() - prevPair.getFirst() > maxBlockSize) {
        context.write(new TextLongPairWritable(new Text(columnValue), prevPair.getFirst()), prevPair);
        if (LOG.isDebugEnabled()) {
          LOG.debug("write to reducer: " + prevPair);
        }

        map.put(columnValue, new LongPairWritable(outputValue));
      } else {
        prevPair.setSecond(outputValue.getSecond());
        map.put(columnValue, prevPair);
      }
    }

    previousRowLineOffset = lineOffset;
  }

  @Override
  protected void cleanup(Context context) throws IOException,
      InterruptedException {

    if (LOG.isDebugEnabled()) {
      LOG.debug("totalRowsInABlock is:" + totalRowsInABlock + " for the last " +
          "lzoblock ended at"  +  previousRowLineOffset);
    }
    for (Entry<String, LongPairWritable> e : map.entrySet()) {
      LongPairWritable lp = e.getValue();
      context.write(new TextLongPairWritable(new Text(e.getKey()), lp.getFirst()), lp);
    }
    map.clear();
  }
}
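
The class above is only the mapper; wiring it into a job is done elsewhere in elephant-twin. The following is a minimal, hypothetical driver sketch (not part of the original file) showing how the configuration keys defined by LZOBlockOffsetMapper might be set. The class name LzoIndexingJobSketch, the value class com.example.thrift.Status, the column name user_id, and the input/output paths are placeholders; the real indexing job also configures an input format that decodes LZO blocks into BinaryWritable records and a reducer that writes the index entries, both omitted here.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import com.twitter.elephanttwin.io.LongPairWritable;
import com.twitter.elephanttwin.io.TextLongPairWritable;
import com.twitter.elephanttwin.lzo.retrieval.LZOBlockOffsetMapper;

public class LzoIndexingJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();

    // Value class the LZO records decode to and the column (getter) to index;
    // both values below are placeholders.
    conf.set(LZOBlockOffsetMapper.CLASSNAME_CONF, "com.example.thrift.Status");
    conf.set(LZOBlockOffsetMapper.COLUMNNAME_CONF, "user_id");
    // Optional: allow combining non-consecutive lzo blocks up to 1MB apart.
    conf.setInt(LZOBlockOffsetMapper.GAPSIZE_CONF, 1024 * 1024);

    Job job = Job.getInstance(conf, "lzo block index: user_id");
    job.setJarByClass(LZOBlockOffsetMapper.class);
    job.setMapperClass(LZOBlockOffsetMapper.class);
    job.setMapOutputKeyClass(TextLongPairWritable.class);
    job.setMapOutputValueClass(LongPairWritable.class);
    // The input format (decoding lzo blocks into BinaryWritable records) and
    // the reducer that writes the index are part of elephant-twin's own job
    // wiring and are omitted from this sketch.

    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}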