Source Code of parallelai.spyglass.hbase.HBaseRawScheme$ValueCopier

/*
* Copyright (c) 2009 Concurrent, Inc.
*
* This work has been released into the public domain
* by the copyright holder. This applies worldwide.
*
* In case this is not legally possible:
* The copyright holder grants any entity the right
* to use this work for any purpose, without any
* conditions, unless such conditions are required by law.
*/

package parallelai.spyglass.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.RecordReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatValueCopier;
import com.twitter.elephantbird.mapred.input.DeprecatedInputFormatWrapper;

import cascading.flow.FlowProcess;
import cascading.scheme.Scheme;
import cascading.scheme.SinkCall;
import cascading.scheme.SourceCall;
import cascading.tap.Tap;
import cascading.tuple.Fields;
import cascading.tuple.Tuple;
import cascading.tuple.TupleEntry;
import cascading.util.Util;

/**
* Provides the wiring between Cascading Fields and HBase column families and
* columns, so that in effect a value can be written to cf:column.
*
* For example, to populate
*
*   data:name   data:surname   data:address
*   name1       surname1       address1
*
* we would initialize the HBaseSource with
*   ("data", "data", "data")
*   ("name", "surname", "address")
* and the data:
*   ("name1", "surname1", "address1")
*   ...
*
* The HBaseRawScheme class is a {@link Scheme} subclass. It is used in conjunction
* with the {@link HBaseRawTap} to allow for the reading and writing of data
* to and from an HBase cluster.
*
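* <p>
* A minimal usage sketch (the HBaseRawTap constructor shown below is assumed
* for illustration and is not defined in this file):
* <pre>
* // Scan the "data" column family; each sourced tuple carries the fields
* // "rowkey" (ImmutableBytesWritable) and "row" (Result).
* HBaseRawScheme scheme = new HBaseRawScheme("data");
* Tap hbaseTap = new HBaseRawTap("quorum-host", "my_table", scheme);
* </pre>
*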
* @see HBaseRawTap
*/
@SuppressWarnings({ "rawtypes", "deprecation" })
public class HBaseRawScheme extends Scheme<JobConf, RecordReader, OutputCollector, Object[], Object[]> {
  /**
   *
   */
  private static final long serialVersionUID = 6248976486883281356L;

  /** Field LOG */
  private static final Logger LOG = LoggerFactory.getLogger(HBaseRawScheme.class);

  public final Fields RowKeyField = new Fields("rowkey");
  public final Fields RowField = new Fields("row");

  /** String familyNames */
  private String[] familyNames;

  private boolean writeNulls = true;

  /**
   * Constructor HBaseRawScheme creates a new HBaseRawScheme instance.
   *
   * @param familyName
   *            of type String
   */
  public HBaseRawScheme(String familyName) {
    this(new String[] { familyName });
  }

  public HBaseRawScheme(String[] familyNames) {
    this.familyNames = familyNames;
    setSourceFields();
  }

  public HBaseRawScheme(String familyName, boolean writeNulls) {
    this(new String[] { familyName }, writeNulls);
  }

  public HBaseRawScheme(String[] familyNames, boolean writeNulls) {
    this.familyNames = familyNames;
    this.writeNulls = writeNulls;
    setSourceFields();
  }

  private void setSourceFields() {
    Fields sourceFields = Fields.join(RowKeyField, RowField);
    setSourceFields(sourceFields);
  }

  /**
   * Method getFamilyNames returns the distinct family names of this
   * HBaseRawScheme object.
   *
   * @return the familyNames (type String[]) of this HBaseRawScheme object.
   */
  public String[] getFamilyNames() {
    HashSet<String> familyNameSet = new HashSet<String>();
    if (familyNames != null) {
      for (String familyName : familyNames) {
        familyNameSet.add(familyName);
      }
    }
    return familyNameSet.toArray(new String[0]);
  }
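
  /**
   * Creates the reusable key and value objects from the RecordReader and
   * stores them in the source call context, so that {@link #source} can
   * reuse the same pair on every call to next().
   */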

  @Override
  public void sourcePrepare(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
    Object[] pair = new Object[] { sourceCall.getInput().createKey(), sourceCall.getInput().createValue() };

    sourceCall.setContext(pair);
  }

  @Override
  public void sourceCleanup(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall) {
    sourceCall.setContext(null);
  }
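
  /**
   * Reads the next key/value pair from the RecordReader and emits a tuple of
   * the row key (as {@link ImmutableBytesWritable}) followed by the full row
   * (as {@link Result}); returns false once the input is exhausted.
   */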

  @SuppressWarnings("unchecked")
  @Override
  public boolean source(FlowProcess<JobConf> flowProcess, SourceCall<Object[], RecordReader> sourceCall)
      throws IOException {
    Tuple result = new Tuple();

    Object key = sourceCall.getContext()[0];
    Object value = sourceCall.getContext()[1];
    boolean hasNext = sourceCall.getInput().next(key, value);
    if (!hasNext) {
      return false;
    }

    // Skip nulls
    if (key == null || value == null) {
      return true;
    }

    ImmutableBytesWritable keyWritable = (ImmutableBytesWritable) key;
    Result row = (Result) value;
    result.add(keyWritable);
    result.add(row);
    sourceCall.getIncomingEntry().setTuple(result);
    return true;
  }
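
  /**
   * Converts the outgoing {@link TupleEntry} into an HBase {@link Put}: the
   * "rowkey" field supplies the row key, and each remaining field is written
   * as family:qualifier, falling back to the configured family names when a
   * field name carries no explicit "family:" prefix.
   */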

  @SuppressWarnings("unchecked")
  @Override
  public void sink(FlowProcess<JobConf> flowProcess, SinkCall<Object[], OutputCollector> sinkCall) throws IOException {
    TupleEntry tupleEntry = sinkCall.getOutgoingEntry();
    OutputCollector outputCollector = sinkCall.getOutput();
    Tuple key = tupleEntry.selectTuple(RowKeyField);
    Object okey = key.getObject(0);
    ImmutableBytesWritable keyBytes = getBytes(okey);
    Put put = new Put(keyBytes.get());
    Fields outFields = tupleEntry.getFields().subtract(RowKeyField);
    if (null != outFields) {
      TupleEntry values = tupleEntry.selectEntry(outFields);
      for (int n = 0; n < values.getFields().size(); n++) {
        Object o = values.get(n);
        ImmutableBytesWritable valueBytes = getBytes(o);
        Comparable field = outFields.get(n);
        ColumnName cn = parseColumn((String) field);
        if (null == cn.family) {
          if (n >= familyNames.length)
            cn.family = familyNames[familyNames.length - 1];
          else
            cn.family = familyNames[n];
        }
        if (null != o || writeNulls)
          put.add(Bytes.toBytes(cn.family), Bytes.toBytes(cn.name), valueBytes.get());
      }
    }
    outputCollector.collect(null, put);
  }
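
  /**
   * Converts a tuple value to an {@link ImmutableBytesWritable}. Supports
   * ImmutableBytesWritable, String, Long, Integer, Short, Boolean and Double
   * values; null becomes an empty byte array, and any other type is rejected
   * with an {@link IllegalArgumentException}.
   */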

  private ImmutableBytesWritable getBytes(Object obj) {
    if (null == obj)
      return new ImmutableBytesWritable(new byte[0]);
    if (obj instanceof ImmutableBytesWritable)
      return (ImmutableBytesWritable) obj;
    else if (obj instanceof String)
      return new ImmutableBytesWritable(Bytes.toBytes((String) obj));
    else if (obj instanceof Long)
      return new ImmutableBytesWritable(Bytes.toBytes((Long) obj));
    else if (obj instanceof Integer)
      return new ImmutableBytesWritable(Bytes.toBytes((Integer) obj));
    else if (obj instanceof Short)
      return new ImmutableBytesWritable(Bytes.toBytes((Short) obj));
    else if (obj instanceof Boolean)
      return new ImmutableBytesWritable(Bytes.toBytes((Boolean) obj));
    else if (obj instanceof Double)
      return new ImmutableBytesWritable(Bytes.toBytes((Double) obj));
    else
      throw new IllegalArgumentException("cannot convert object to ImmutableBytesWritable, class="
          + obj.getClass().getName());
  }
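
  /**
   * Splits a field name of the form "family:qualifier" into its parts; when
   * no family prefix is present, only the qualifier is set and the family is
   * resolved later from the configured family names.
   */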

  private ColumnName parseColumn(String column) {
    ColumnName ret = new ColumnName();
    int pos = column.indexOf(":");
    if (pos > 0) {
      ret.name = column.substring(pos + 1);
      ret.family = column.substring(0, pos);
    } else {
      ret.name = column;
    }
    return ret;
  }

  private class ColumnName {
    String family;
    String name;

    ColumnName() {
    }
  }
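
  /**
   * Configures the job for sinking: emits {@link Put} mutations through
   * HBase's {@link TableOutputFormat}.
   */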

  @Override
  public void sinkConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap, JobConf conf) {
    conf.setOutputFormat(TableOutputFormat.class);
    conf.setOutputKeyClass(ImmutableBytesWritable.class);
    conf.setOutputValueClass(Put.class);
  }
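
  /**
   * Configures the job for sourcing: wraps the new-API
   * {@link org.apache.hadoop.hbase.mapreduce.TableInputFormat} with
   * elephant-bird's {@link DeprecatedInputFormatWrapper} and, when family
   * names are configured, restricts the scan to those column families via
   * SCAN_COLUMNS.
   */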

  @Override
  public void sourceConfInit(FlowProcess<JobConf> process, Tap<JobConf, RecordReader, OutputCollector> tap,
      JobConf conf) {

    DeprecatedInputFormatWrapper.setInputFormat(org.apache.hadoop.hbase.mapreduce.TableInputFormat.class, conf,
        ValueCopier.class);
    if (null != familyNames) {
      String columns = Util.join(this.familyNames, " ");
      LOG.debug("sourcing from column families: {}", columns);
      conf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.SCAN_COLUMNS, columns);
    }
  }

  @Override
  public boolean equals(Object object) {
    if (this == object) {
      return true;
    }
    if (object == null || getClass() != object.getClass()) {
      return false;
    }
    if (!super.equals(object)) {
      return false;
    }

    HBaseRawScheme that = (HBaseRawScheme) object;

    if (!Arrays.equals(familyNames, that.familyNames)) {
      return false;
    }
    return true;
  }

  @Override
  public int hashCode() {
    int result = super.hashCode();
    result = 31 * result + (familyNames != null ? Arrays.hashCode(familyNames) : 0);
    return result;
  }
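
  /**
   * Used by {@link DeprecatedInputFormatWrapper} to copy a freshly read
   * {@link Result} into the value object being reused by the record reader.
   */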

  public static class ValueCopier implements DeprecatedInputFormatValueCopier<Result> {

    public ValueCopier() {
    }

    public void copyValue(Result oldValue, Result newValue) {
      if (null != oldValue && null != newValue) {
        oldValue.copyFrom(newValue);
      }
    }

  }
}