Source Code of org.commoncrawl.hadoop.mapred.ArcRecordReader

package org.commoncrawl.hadoop.mapred;

import java.io.EOFException;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;

import org.apache.log4j.Logger;

import org.commoncrawl.compressors.gzip.GzipCompressorInputStream;

/**
* Reads ARC records.
*
* Set "io.file.buffer.size" to define the amount of data that should be
* buffered from S3.
*/
public class ArcRecordReader
    implements RecordReader<Text, ArcRecord> {

  private static final Logger LOG = Logger.getLogger(ArcRecordReader.class);

  private FSDataInputStream         _fsin;        // raw stream from the underlying filesystem
  private GzipCompressorInputStream _gzip;        // member-at-a-time GZIP decoder over _fsin
  private long                      _fileLength;  // compressed file length, used for progress

  /**
   * Opens the file for the given split and advances past the ARC file
   * header record, leaving the stream positioned at the first content
   * record.
   */
  public ArcRecordReader(Configuration job, FileSplit split)
      throws IOException {

    if (split.getStart() != 0) {
      IOException ex = new IOException("Invalid ARC file split start " + split.getStart() + ": ARC files are not splittable");
      LOG.error(ex.getMessage());
      throw ex;
    }

    // open the file and seek to the start of the split
    final Path file = split.getPath();

    FileSystem fs = file.getFileSystem(job);

    this._fsin = fs.open(file);

    // create a GZIP stream that *does not* automatically read through members
    this._gzip = new GzipCompressorInputStream(this._fsin, false);

    this._fileLength = fs.getFileStatus(file).getLen();

    // First record should be an ARC file header record.  Skip it.
    this._skipRecord();
  }

  /**
   * Skips the current record, and advances to the next GZIP member.
   */
  private void _skipRecord()
      throws IOException {

    long n = 0;

    // drain the remainder of the current member; because automatic
    // member-advance is disabled, skip() returns 0 once the member is
    // exhausted
    do {
      n = this._gzip.skip(Long.MAX_VALUE);
    }
    while (n > 0);

    this._gzip.nextMember();
  }
 
  /**
   * {@inheritDoc}
   */
  public Text createKey() {
    return new Text();
  }
 
  /**
   * {@inheritDoc}
   */
  public ArcRecord createValue() {
    return new ArcRecord();
  }

  // scratch buffer used only to detect trailing bytes after a record; an
  // instance field so that separate readers do not share state
  private final byte[] _checkBuffer = new byte[64];

  /**
   * Reads the next ARC record from the stream, setting the key to the
   * record URL.  Invalid records are logged and skipped.
   */
  public synchronized boolean next(Text key, ArcRecord value)
      throws IOException {

    boolean isValid = true;
   
    // try reading an ARC record from the stream
    try {
      isValid = value.readFrom(this._gzip);
    }
    catch (EOFException ex) {
      return false;
    }

    // if the record is not valid, skip it
    if (!isValid) {
      LOG.error("Invalid ARC record found at GZIP position " + this._gzip.getBytesRead() + ".  Skipping ...");
      this._skipRecord();
      return true;
    }

    if (value.getURL() != null)
      key.set(value.getURL());

    // check to make sure we've reached the end of the GZIP member
    int n = this._gzip.read(_checkBuffer, 0, _checkBuffer.length);

    if (n != -1) {
      LOG.error(n + " bytes of unexpected content found at end of ARC record.  Skipping ...");
      this._skipRecord();
    }
    else {
      this._gzip.nextMember();
    }
  
    return true;
  }

  /**
   * {@inheritDoc}
   */
  public float getProgress()
      throws IOException {
    if (this._fileLength == 0)
      return 1.0f;

    return Math.min(1.0f, this._gzip.getBytesRead() / (float) this._fileLength);
  }
 
  /**
   * {@inheritDoc}
   */
  public synchronized long getPos()
      throws IOException {
    return this._gzip.getBytesRead();
  }

  /**
   * {@inheritDoc}
   */
  public synchronized void close()
      throws IOException {

    if (this._gzip != null)
      this._gzip.close();
  }

}
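
For context, here is a minimal sketch of how a reader like this plugs into Hadoop's old mapred API. The ArcInputFormat below is a hypothetical pairing, not the actual Common Crawl implementation; it illustrates the one hard requirement implied by the constructor's split-start check: ARC files must be marked non-splittable so that every split starts at byte 0.

// A hypothetical InputFormat pairing for ArcRecordReader; the name and
// structure are illustrative assumptions, not Common Crawl's own code.
package org.commoncrawl.hadoop.mapred;

import java.io.IOException;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class ArcInputFormat
    extends FileInputFormat<Text, ArcRecord> {

  // ARC files are a sequence of GZIP members and cannot be split at
  // arbitrary byte offsets, so each file becomes exactly one split
  @Override
  protected boolean isSplitable(FileSystem fs, Path filename) {
    return false;
  }

  @Override
  public RecordReader<Text, ArcRecord> getRecordReader(InputSplit split,
      JobConf job, Reporter reporter)
      throws IOException {
    reporter.setStatus(split.toString());
    return new ArcRecordReader(job, (FileSplit) split);
  }
}

A driver would then select this format and, per the class Javadoc above, may tune the read-ahead buffer:

JobConf job = new JobConf();
job.setInt("io.file.buffer.size", 4 * 1024 * 1024);  // example value only
job.setInputFormat(ArcInputFormat.class);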