Package com.netflix.aegisthus.input

Source Code of com.netflix.aegisthus.input.AegisthusInputFormat

/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.netflix.aegisthus.input;

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.IOException;
import java.util.List;
import java.util.Map;

import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.utils.Pair;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.netflix.aegisthus.input.AegSplit.Type;
import com.netflix.aegisthus.input.readers.CombineSSTableReader;
import com.netflix.aegisthus.input.readers.CommitLogRecordReader;
import com.netflix.aegisthus.input.readers.JsonRecordReader;
import com.netflix.aegisthus.input.readers.SSTableRecordReader;
import com.netflix.aegisthus.io.sstable.OffsetScanner;
import com.netflix.aegisthus.io.sstable.SSTableScanner;

/**
* The AegisthusInputFormat class handles creating splits and reading sstables,
* commitlogs and json.
*/
public class AegisthusInputFormat extends FileInputFormat<Text, Text> {
  private static final Log LOG = LogFactory.getLog(AegisthusInputFormat.class);
  public static final String COLUMN_TYPE = "aegisthus.columntype";
  public static final String KEY_TYPE = "aegisthus.keytype";
  @SuppressWarnings("rawtypes")
  protected Map<String, AbstractType> convertors;
  private List<FileStatus> overrideListStatus = null;

  @Override
  public RecordReader<Text, Text> createRecordReader(InputSplit inputSplit, TaskAttemptContext context) {
    AegSplit split = null;
    if (inputSplit instanceof AegCombinedSplit) {
      return new CombineSSTableReader();
    }
    split = (AegSplit) inputSplit;
    RecordReader<Text, Text> reader = null;
    switch (split.getType()) {
    case json:
      reader = new JsonRecordReader();
      break;
    case sstable:
      reader = new SSTableRecordReader();
      break;
    case commitlog:
      reader = new CommitLogRecordReader();
      break;
    }
    return reader;
  }

  @SuppressWarnings("rawtypes")
  private Map<String, AbstractType> initConvertors(JobContext job) throws IOException {
    Map<String, AbstractType> convertors = Maps.newHashMap();
    String conversion = job.getConfiguration().get(KEY_TYPE);
    LOG.info(KEY_TYPE + ": " + conversion);
    if (conversion != null) {
      try {
        convertors.put(SSTableScanner.KEY, TypeParser.parse(conversion));
      } catch (ConfigurationException e) {
        throw new IOException(e);
      } catch (SyntaxException e) {
        throw new IOException(e);
            }
    }
    conversion = job.getConfiguration().get(COLUMN_TYPE);
    LOG.info(COLUMN_TYPE + ": " + conversion);
    if (conversion != null) {
      try {
        convertors.put(SSTableScanner.COLUMN_NAME_KEY, TypeParser.parse(conversion));
      } catch (ConfigurationException e) {
        throw new IOException(e);
      } catch (SyntaxException e) {
        throw new IOException(e);
            }
    }

    if (convertors.size() == 0) {
      return null;
    }
    return convertors;
  }

  /**
   * The main thing that the addSSTableSplit handles is to split SSTables
   * using their index if available. The general algorithm is that if the file
   * is large than the blocksize plus some fuzzy factor to
   */
  public void addSSTableSplit(List<InputSplit> splits, JobContext job, FileStatus file) throws IOException {
    Path path = file.getPath();
    FileSystem fs = path.getFileSystem(job.getConfiguration());
    long length = file.getLen();
    BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0, length);
    if (length != 0) {
      long blockSize = file.getBlockSize();
      long maxSplitSize = (long) (blockSize * .99);
      long fuzzySplit = (long) (blockSize * 1.2);

      long bytesRemaining = length;

      Path compressionPath = new Path(path.getParent(), path.getName().replaceAll("-Data.db",
                                            "-CompressionInfo.db"));
      if (!fs.exists(compressionPath)) {
        OffsetScanner scanner = null;
        // Only initialize if we are going to have more than a single
        // split
        if (fuzzySplit < length) {
          Path indexPath = new Path(path.getParent(), path.getName().replaceAll("-Data.db", "-Index.db"));
          if (!fs.exists(indexPath)) {
            fuzzySplit = length;
          } else {
            FSDataInputStream fileIn = fs.open(indexPath);
            scanner = new OffsetScanner(new DataInputStream(new BufferedInputStream(fileIn)), indexPath.getName());
          }
        }
        long splitStart = 0;
        while (splitStart + fuzzySplit < length && scanner.hasNext()) {
          long splitSize = 0;
          // The scanner returns an offset from the start of the file.
          while (splitSize < maxSplitSize && scanner.hasNext()) {
                        Pair<Long, Long> pair = scanner.next();
                        splitSize = pair.left - splitStart;
          }
          int blkIndex = getBlockIndex(blkLocations, splitStart + (splitSize / 2));
          LOG.info("split path: " + path.getName() + ":" + splitStart + ":" + splitSize);
          splits
              .add(new AegSplitpath,
                        splitStart,
                        splitSize,
                        blkLocations[blkIndex].getHosts(),
                        convertors));
          bytesRemaining -= splitSize;
          splitStart += splitSize;
        }
        if (scanner != null ) {
          scanner.close();
        }
      }

      if (bytesRemaining != 0) {
        LOG.info("end path: " + path.getName() + ":" + (length - bytesRemaining) + ":" + bytesRemaining);
        splits.add(new AegSplit(path,
                    length - bytesRemaining,
                    bytesRemaining,
                    blkLocations[blkLocations.length - 1].getHosts(),
                    convertors,
                    fs.exists(compressionPath),
                    compressionPath));
      }
    } else {
      LOG.info("skipping zero length file: " + path.toString());
    }
  }

  @Override
  public List<InputSplit> getSplits(JobContext job) throws IOException {
    List<InputSplit> splits = Lists.newArrayList();
    List<FileStatus> files = listStatus(job);
    convertors = initConvertors(job);
    for (FileStatus file : files) {
      String name = file.getPath().getName();
      if (name.endsWith("-Data.db")) {
        addSSTableSplit(splits, job, file);
      } else if (name.startsWith("CommitLog")) {
        LOG.info(String.format("adding %s as a CommitLog split", file.getPath().toUri().toString()));
        BlockLocation[] blkLocations = file
            .getPath()
            .getFileSystem(job.getConfiguration())
            .getFileBlockLocations(file, 0, file.getLen());
        splits.add(new AegSplit(file.getPath(),
                    0,
                    file.getLen(),
                    blkLocations[0].getHosts(),
                    Type.commitlog,
                    convertors));
      } else {
        LOG.info(String.format("adding %s as a json split", file.getPath().toUri().toString()));
        BlockLocation[] blkLocations = file
            .getPath()
            .getFileSystem(job.getConfiguration())
            .getFileBlockLocations(file, 0, file.getLen());
        splits.add(new AegSplit(file.getPath(), 0, file.getLen(), blkLocations[0].getHosts(), Type.json));
      }
    }
    return splits;
  }

  /**
   * If we have been given a list of statuses to operate on, use those instead of falling back to the base method.
   */
  @Override
  protected List<FileStatus> listStatus(JobContext job) throws IOException {
    if (overrideListStatus == null) {
      return super.listStatus(job);
    } else {
      return overrideListStatus;
    }
  }

  /**
   * Instead of using the base listStatus, return this list of file statuses
   *
   * @param overrideListStatus the list of statuses to be returned for all future calls to this.listStatus()
   */
  public void setOverrideListStatus(List<FileStatus> overrideListStatus) {
    this.overrideListStatus = overrideListStatus;
  }
}
TOP

Related Classes of com.netflix.aegisthus.input.AegisthusInputFormat

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.