Package backtype.storm.contrib.hbase.utils

Source Code of backtype.storm.contrib.hbase.utils.TridentConfig

package backtype.storm.contrib.hbase.utils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import storm.trident.state.JSONNonTransactionalSerializer;
import storm.trident.state.JSONOpaqueSerializer;
import storm.trident.state.JSONTransactionalSerializer;
import storm.trident.state.Serializer;
import storm.trident.state.StateType;
import storm.trident.tuple.TridentTuple;

import com.esotericsoftware.minlog.Log;

/**
* Configuration for Storm Trident state persistence in HBase
* @param <T>
*/
@SuppressWarnings("serial")
public class TridentConfig<T> extends TupleTableConfig {
  @SuppressWarnings("rawtypes")
  public static final Map<StateType, Serializer> DEFAULT_SERIALZERS =
      new HashMap<StateType, Serializer>() {
        {
          put(StateType.NON_TRANSACTIONAL, new JSONNonTransactionalSerializer());
          put(StateType.TRANSACTIONAL, new JSONTransactionalSerializer());
          put(StateType.OPAQUE, new JSONOpaqueSerializer());
        }
      };

  private int stateCacheSize = 1000;
  private Serializer<T> stateSerializer;

  public TridentConfig(String table, String rowKeyField) {
    super(table, rowKeyField);
  }

  public TridentConfig(final String table, final String rowKeyField, final String timestampField) {
    super(table, rowKeyField, timestampField);
  }

  /**
   * Creates a HBase {@link Put} from a Storm {@link TridentTuple}
   * @param tuple The {@link TridentTuple}
   * @return {@link Put}
   */
  public Put getPutFromTridentTuple(final TridentTuple tuple) {
    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));

    long ts = 0;
    if (!tupleTimestampField.equals("")) {
      ts = tuple.getLongByField(tupleTimestampField);
    }

    Put p = new Put(rowKey);
    p.setWriteToWAL(writeToWAL);

    if (columnFamilies.size() > 0) {
      for (String cf : columnFamilies.keySet()) {
        byte[] cfBytes = Bytes.toBytes(cf);
        for (String cq : columnFamilies.get(cf)) {
          byte[] cqBytes = Bytes.toBytes(cq);
          byte[] val = Bytes.toBytes(tuple.getStringByField(cq));

          if (ts > 0) {
            p.add(cfBytes, cqBytes, ts, val);
          } else {
            p.add(cfBytes, cqBytes, val);
          }
        }
      }
    }

    return p;
  }

  /**
   * Creates a HBase {@link Get} from a Storm {@link TridentTuple}
   * @param tuple The {@link TridentTuple}
   * @return {@link Get}
   */
  public Get getGetFromTridentTuple(final TridentTuple tuple) {
    byte[] rowKey = Bytes.toBytes(tuple.getStringByField(tupleRowKeyField));

    long ts = 0;
    if (!tupleTimestampField.equals("")) {
      ts = tuple.getLongByField(tupleTimestampField);
    }

    Get g = new Get(rowKey);

    if (columnFamilies.size() > 0) {
      for (String cf : columnFamilies.keySet()) {
        byte[] cfBytes = Bytes.toBytes(cf);
        for (String cq : columnFamilies.get(cf)) {
          byte[] cqBytes = Bytes.toBytes(cq);
          g.addColumn(cfBytes, cqBytes);
          try {
            g.setMaxVersions(1);
          } catch (IOException e) {
            Log.error("Invalid number of versions", e);
          }
          if (ts > 0) {
            g.setTimeStamp(ts);
          }
        }
      }
    }

    return g;
  }

  /**
   * @return The size of the least-recently-used (LRU) cache. <b>Default is 1000
   */
  public int getStateCacheSize() {
    return stateCacheSize;
  }

  /**
   * @param stateCacheSize Sets the size of the least-recently-used (LRU) cache. <b>Default is 1000
   */
  public void setStateCacheSize(int stateCacheSize) {
    this.stateCacheSize = stateCacheSize;
  }

  /**
   * @return The {@link Serializer} used for persisting Trident state to HBase
   */
  public Serializer<T> getStateSerializer() {
    return stateSerializer;
  }

  /**
   * @param stateSerializer Set the {@link Serializer} to use for persisting Trident state to HBase
   */
  public void setStateSerializer(Serializer<T> stateSerializer) {
    this.stateSerializer = stateSerializer;
  }
}
TOP

Related Classes of backtype.storm.contrib.hbase.utils.TridentConfig

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.