Package mil.nga.giat.geowave.accumulo.mapreduce.input

Source Code of mil.nga.giat.geowave.accumulo.mapreduce.input.GeoWaveInputFormat$IntermediateSplitInfo$IndexRangeLocation

package mil.nga.giat.geowave.accumulo.mapreduce.input;

import java.io.IOException;
import java.math.BigInteger;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeSet;

import mil.nga.giat.geowave.accumulo.AccumuloOperations;
import mil.nga.giat.geowave.accumulo.mapreduce.GeoWaveConfiguratorBase;
import mil.nga.giat.geowave.accumulo.mapreduce.JobContextAdapterStore;
import mil.nga.giat.geowave.accumulo.mapreduce.JobContextIndexStore;
import mil.nga.giat.geowave.accumulo.mapreduce.input.GeoWaveInputFormat.IntermediateSplitInfo.RangeLocationPair;
import mil.nga.giat.geowave.accumulo.metadata.AccumuloIndexStore;
import mil.nga.giat.geowave.accumulo.util.AccumuloUtils;
import mil.nga.giat.geowave.index.NumericIndexStrategy;
import mil.nga.giat.geowave.index.sfc.data.MultiDimensionalNumericData;
import mil.nga.giat.geowave.store.adapter.DataAdapter;
import mil.nga.giat.geowave.store.index.Index;
import mil.nga.giat.geowave.store.query.DistributableQuery;

import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Instance;
import org.apache.accumulo.core.client.TableDeletedException;
import org.apache.accumulo.core.client.TableNotFoundException;
import org.apache.accumulo.core.client.TableOfflineException;
import org.apache.accumulo.core.client.impl.Tables;
import org.apache.accumulo.core.client.impl.TabletLocator;
import org.apache.accumulo.core.client.mapreduce.lib.util.ConfiguratorBase;
import org.apache.accumulo.core.client.mock.MockInstance;
import org.apache.accumulo.core.client.security.tokens.PasswordToken;
import org.apache.accumulo.core.data.ArrayByteSequence;
import org.apache.accumulo.core.data.ByteSequence;
import org.apache.accumulo.core.data.Key;
import org.apache.accumulo.core.data.KeyExtent;
import org.apache.accumulo.core.data.Range;
import org.apache.accumulo.core.master.state.tables.TableState;
import org.apache.accumulo.core.security.Credentials;
import org.apache.accumulo.core.util.UtilWaitThread;
import org.apache.commons.collections.IteratorUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

// @formatter:off
/*if[ACCUMULO_1.5.1]
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import org.apache.accumulo.core.security.thrift.TCredentials;
end[ACCUMULO_1.5.1]*/
// @formatter:on

public class GeoWaveInputFormat extends
    InputFormat<GeoWaveInputKey, Object>
{
  private static final Class<?> CLASS = GeoWaveInputFormat.class;
  protected static final Logger LOGGER = Logger.getLogger(CLASS);
  private static final BigInteger TWO = BigInteger.valueOf(2L);

  /**
   * Configures a {@link AccumuloOperations} for this job.
   *
   * @param job
   *            the Hadoop job instance to be configured
   * @param zooKeepers
   *            a comma-separated list of zookeeper servers
   * @param instanceName
   *            the Accumulo instance name
   * @param userName
   *            the Accumulo user name
   * @param password
   *            the Accumulo password
   * @param geowaveTableNamespace
   *            the GeoWave table namespace
   */
  public static void setAccumuloOperationsInfo(
      final Job job,
      final String zooKeepers,
      final String instanceName,
      final String userName,
      final String password,
      final String geowaveTableNamespace ) {
    GeoWaveConfiguratorBase.setZookeeperUrl(
        CLASS,
        job,
        zooKeepers);
    GeoWaveConfiguratorBase.setInstanceName(
        CLASS,
        job,
        instanceName);
    GeoWaveConfiguratorBase.setUserName(
        CLASS,
        job,
        userName);
    GeoWaveConfiguratorBase.setPassword(
        CLASS,
        job,
        password);
    GeoWaveConfiguratorBase.setTableNamespace(
        CLASS,
        job,
        geowaveTableNamespace);
  }

  public static void addDataAdapter(
      final Job job,
      final DataAdapter<?> adapter ) {
    JobContextAdapterStore.addDataAdapter(
        job,
        adapter);
  }

  public static void addIndex(
      final Job job,
      final Index index ) {
    JobContextIndexStore.addIndex(
        job,
        index);
  }

  public static void setMinimumSplitCount(
      final Job job,
      final Integer minSplits ) {
    GeoWaveInputConfigurator.setMinimumSplitCount(
        CLASS,
        job,
        minSplits);
  }

  public static void setMaximumSplitCount(
      final Job job,
      final Integer maxSplits ) {
    GeoWaveInputConfigurator.setMaximumSplitCount(
        CLASS,
        job,
        maxSplits);
  }

  public static void setQuery(
      final Job job,
      final DistributableQuery query ) {
    GeoWaveInputConfigurator.setQuery(
        CLASS,
        job,
        query);
  }

  protected static DistributableQuery getQuery(
      final JobContext context ) {
    return GeoWaveInputConfigurator.getQuery(
        CLASS,
        context);
  }

  protected static Index[] getIndices(
      final JobContext context ) {
    final Index[] userIndices = JobContextIndexStore.getIndices(context);
    if ((userIndices == null) || (userIndices.length <= 0)) {
      try {
        // if there are no indices, assume we are searching all indices
        // in the metadata store
        return (Index[]) IteratorUtils.toArray(
            new AccumuloIndexStore(
                getAccumuloOperations(context)).getIndices(),
            Index.class);
      }
      catch (AccumuloException | AccumuloSecurityException e) {
        LOGGER.warn(
            "Unable to lookup indices from GeoWave metadata store",
            e);
      }
    }
    return userIndices;
  }

  protected static String getTableNamespace(
      final JobContext context ) {
    return GeoWaveConfiguratorBase.getTableNamespace(
        CLASS,
        context);
  }

  protected static String getUserName(
      final JobContext context ) {
    return GeoWaveConfiguratorBase.getUserName(
        CLASS,
        context);
  }

  protected static String getPassword(
      final JobContext context ) {
    return GeoWaveConfiguratorBase.getPassword(
        CLASS,
        context);
  }

  /**
   * Initializes an Accumulo {@link TabletLocator} based on the configuration.
   *
   * @param instance
   *            the accumulo instance
   * @param tableName
   *            the accumulo table name
   * @return an Accumulo tablet locator
   * @throws TableNotFoundException
   *             if the table name set on the configuration doesn't exist
   * @since 1.5.0
   */
  protected static TabletLocator getTabletLocator(
      final Instance instance,
      final String tableName,
      final String tableId )
      throws TableNotFoundException {
    TabletLocator tabletLocator;
    // @formatter:off
    /*if[ACCUMULO_1.5.1]
    tabletLocator = TabletLocator.getInstance(
        instance,
        new Text(
            Tables.getTableId(
                instance,
                tableName)));

      else[ACCUMULO_1.5.1]*/
    tabletLocator = TabletLocator.getLocator(instance, new Text(tableId));
    /*end[ACCUMULO_1.5.1]*/
    // @formatter:on
    return tabletLocator;
  }

  protected static boolean binRanges(
      final List<Range> rangeList,
      final String userName,
      final String password,
      final Map<String, Map<KeyExtent, List<Range>>> tserverBinnedRanges,
      final TabletLocator tabletLocator,
      final String instanceId )
      throws AccumuloException,
      AccumuloSecurityException,
      TableNotFoundException,
      IOException {
    // @formatter:off
    /*if[ACCUMULO_1.5.1]
    final ByteArrayOutputStream backingByteArray = new ByteArrayOutputStream();
    final DataOutputStream output = new DataOutputStream(
        backingByteArray);
    new PasswordToken(
        password).write(output);
    output.close();
    final ByteBuffer buffer = ByteBuffer.wrap(backingByteArray.toByteArray());
    final TCredentials credentials = new TCredentials(
        userName,
        PasswordToken.class.getCanonicalName(),
        buffer,
        instanceId);
    return tabletLocator.binRanges(
        rangeList,
        tserverBinnedRanges,
        credentials).isEmpty();
      else[ACCUMULO_1.5.1]*/
    return tabletLocator.binRanges(
        new Credentials(
            userName,
            new PasswordToken(
                password)),
        rangeList,
        tserverBinnedRanges).isEmpty();
      /*end[ACCUMULO_1.5.1]*/
    // @formatter:on
  }

  protected static String getInstanceName(
      final JobContext context ) {
    return GeoWaveConfiguratorBase.getInstanceName(
        CLASS,
        context);
  }

  protected static Integer getMinimumSplitCount(
      final JobContext context ) {
    return GeoWaveInputConfigurator.getMinimumSplitCount(
        CLASS,
        context);
  }

  protected static Integer getMaximumSplitCount(
      final JobContext context ) {
    return GeoWaveInputConfigurator.getMaximumSplitCount(
        CLASS,
        context);
  }

  protected static Instance getInstance(
      final JobContext context ) {
    return GeoWaveInputConfigurator.getInstance(
        CLASS,
        context);
  }

  /**
   * Read the metadata table to get tablets and match up ranges to them.
   */
  @Override
  public List<InputSplit> getSplits(
      final JobContext context )
      throws IOException,
      InterruptedException {
    LOGGER.setLevel(getLogLevel(context));
    validateOptions(context);
    final Integer minSplits = getMinimumSplitCount(context);
    final Integer maxSplits = getMaximumSplitCount(context);
    final TreeSet<IntermediateSplitInfo> splits = getIntermediateSplits(
        context,
        maxSplits);
    // this is an incremental algorithm, it may be better use the target
    // split count to drive it (ie. to get 3 splits this will split 1 large
    // range into two down the middle and then split one of those ranges
    // down the middle to get 3, rather than splitting one range into
    // thirds)
    if ((minSplits != null) && (splits.size() < minSplits)) {
      // set the ranges to at least min splits
      do {
        // remove the highest range, split it into 2 and add both back,
        // increasing the size by 1
        final IntermediateSplitInfo highestSplit = splits.pollLast();
        final IntermediateSplitInfo otherSplit = highestSplit.split();
        splits.add(highestSplit);
        splits.add(otherSplit);
      }
      while (splits.size() < minSplits);
    }
    else if (((maxSplits != null) && (maxSplits > 0)) && (splits.size() > maxSplits)) {
      // merge splits to fit within max splits
      do {
        // this is the naive approach, remove the lowest two ranges and
        // merge them, decreasing the size by 1

        // TODO Ideally merge takes into account locations (as well as
        // possibly the index as a secondary criteria) to limit the
        // number of locations/indices
        final IntermediateSplitInfo lowestSplit = splits.pollFirst();
        final IntermediateSplitInfo nextLowestSplit = splits.pollFirst();
        lowestSplit.merge(nextLowestSplit);
        splits.add(lowestSplit);
      }
      while (splits.size() > maxSplits);
    }
    final List<InputSplit> retVal = new ArrayList<InputSplit>();
    for (final IntermediateSplitInfo split : splits) {
      retVal.add(split.toFinalSplit());
    }

    return retVal;
  }

  private TreeSet<IntermediateSplitInfo> getIntermediateSplits(
      final JobContext context,
      final Integer maxSplits )
      throws IOException {
    final Index[] indices = getIndices(context);
    final DistributableQuery query = getQuery(context);
    final String tableNamespace = getTableNamespace(context);

    final TreeSet<IntermediateSplitInfo> splits = new TreeSet<IntermediateSplitInfo>();
    for (final Index index : indices) {
      if ((query != null) && !query.isSupported(index)) {
        continue;
      }
      final String tableName = AccumuloUtils.getQualifiedTableName(
          tableNamespace,
          index.getId().getString());
      final NumericIndexStrategy indexStrategy = index.getIndexStrategy();
      final TreeSet<Range> ranges;
      if (query != null) {
        final MultiDimensionalNumericData indexConstraints = query.getIndexConstraints(indexStrategy);
        if ((maxSplits != null) && (maxSplits > 0)) {
          ranges = AccumuloUtils.byteArrayRangesToAccumuloRanges(AccumuloUtils.constraintsToByteArrayRanges(
              indexConstraints,
              indexStrategy,
              maxSplits));
        }
        else {
          ranges = AccumuloUtils.byteArrayRangesToAccumuloRanges(AccumuloUtils.constraintsToByteArrayRanges(
              indexConstraints,
              indexStrategy));
        }
      }
      else {
        ranges = new TreeSet<Range>();
        ranges.add(new Range());
      }
      // get the metadata information for these ranges
      final Map<String, Map<KeyExtent, List<Range>>> tserverBinnedRanges = new HashMap<String, Map<KeyExtent, List<Range>>>();
      TabletLocator tl;
      try {

        final Instance instance = getInstance(context);
        final String tableId = Tables.getTableId(
            instance,
            tableName);
        tl = getTabletLocator(
            instance,
            tableName,
            tableId);
        // its possible that the cache could contain complete, but
        // old information about a tables tablets... so clear it
        tl.invalidateCache();
        final String instanceId = instance.getInstanceID();
        final List<Range> rangeList = new ArrayList<Range>(
            ranges);
        int count = 0;
        while (!binRanges(
            rangeList,
            getUserName(context),
            getPassword(context),
            tserverBinnedRanges,
            tl,
            instanceId) && (count < 8)) {
          if (!(instance instanceof MockInstance)) {
            if (!Tables.exists(
                instance,
                tableId)) {
              throw new TableDeletedException(
                  tableId);
            }
            if (Tables.getTableState(
                instance,
                tableId) == TableState.OFFLINE) {
              throw new TableOfflineException(
                  instance,
                  tableId);
            }
          }
          count++;
          tserverBinnedRanges.clear();
          LOGGER.warn("Unable to locate bins for specified ranges. Retrying.");
          UtilWaitThread.sleep(100 + (int) (Math.random() * 100));
          // sleep randomly between 100 and 200 ms
          tl.invalidateCache();
        }
      }
      catch (final Exception e) {
        throw new IOException(
            e);
      }
      final HashMap<String, String> hostNameCache = new HashMap<String, String>();
      for (final Entry<String, Map<KeyExtent, List<Range>>> tserverBin : tserverBinnedRanges.entrySet()) {
        final String tabletServer = tserverBin.getKey();
        final String ipAddress = tabletServer.split(
            ":",
            2)[0];

        String location = hostNameCache.get(ipAddress);
        if (location == null) {
          final InetAddress inetAddress = InetAddress.getByName(ipAddress);
          location = inetAddress.getHostName();
          hostNameCache.put(
              ipAddress,
              location);
        }
        for (final Entry<KeyExtent, List<Range>> extentRanges : tserverBin.getValue().entrySet()) {
          final Range keyExtent = extentRanges.getKey().toDataRange();
          final Map<Index, List<RangeLocationPair>> splitInfo = new HashMap<Index, List<RangeLocationPair>>();
          final List<RangeLocationPair> rangeList = new ArrayList<RangeLocationPair>();
          for (final Range range : extentRanges.getValue()) {
            rangeList.add(new RangeLocationPair(
                keyExtent.clip(range),
                location));
          }
          splitInfo.put(
              index,
              rangeList);
          splits.add(new IntermediateSplitInfo(
              splitInfo));
        }
      }
    }
    return splits;
  }

  protected static class IntermediateSplitInfo implements
      Comparable<IntermediateSplitInfo>
  {
    protected static class IndexRangeLocation
    {
      private final RangeLocationPair rangeLocationPair;
      private final Index index;

      public IndexRangeLocation(
          final RangeLocationPair rangeLocationPair,
          final Index index ) {
        this.rangeLocationPair = rangeLocationPair;
        this.index = index;
      }
    }

    protected static class RangeLocationPair
    {
      private final Range range;
      private final String location;
      private final Map<Integer, BigInteger> rangePerCardinalityCache = new HashMap<Integer, BigInteger>();

      public RangeLocationPair(
          final Range range,
          final String location ) {
        this.location = location;
        this.range = range;
      }

      protected BigInteger getRangeAtCardinality(
          final int cardinality ) {
        final BigInteger rangeAtCardinality = rangePerCardinalityCache.get(cardinality);
        if (rangeAtCardinality != null) {
          return rangeAtCardinality;
        }
        return calcRange(cardinality);

      }

      private BigInteger calcRange(
          final int cardinality ) {
        final BigInteger r = getRange(
            range,
            cardinality);
        rangePerCardinalityCache.put(
            cardinality,
            r);
        return r;
      }
    }

    private final Map<Index, List<RangeLocationPair>> splitInfo;
    private final Map<Integer, BigInteger> totalRangePerCardinalityCache = new HashMap<Integer, BigInteger>();

    public IntermediateSplitInfo(
        final Map<Index, List<RangeLocationPair>> splitInfo ) {
      this.splitInfo = splitInfo;
    }

    private synchronized void merge(
        final IntermediateSplitInfo split ) {
      clearCache();
      for (final Entry<Index, List<RangeLocationPair>> e : split.splitInfo.entrySet()) {
        List<RangeLocationPair> thisList = splitInfo.get(e.getKey());
        if (thisList == null) {
          thisList = new ArrayList<RangeLocationPair>();
          splitInfo.put(
              e.getKey(),
              thisList);
        }
        thisList.addAll(e.getValue());
      }
    }

    private synchronized IntermediateSplitInfo split() {
      final int maxCardinality = getMaxCardinality();
      final BigInteger totalRange = getTotalRangeAtCardinality(maxCardinality);

      // generically you'd want the split to be as limiting to total
      // locations as possible and then as limiting as possible to total
      // indices, but in this case split() is only called when all ranges
      // are in the same location and the same index

      // and you want it to split the ranges into two by total range
      final TreeSet<IndexRangeLocation> orderedSplits = new TreeSet<IndexRangeLocation>(
          new Comparator<IndexRangeLocation>() {

            @Override
            public int compare(
                final IndexRangeLocation o1,
                final IndexRangeLocation o2 ) {
              final BigInteger range1 = o1.rangeLocationPair.getRangeAtCardinality(maxCardinality);
              final BigInteger range2 = o2.rangeLocationPair.getRangeAtCardinality(maxCardinality);
              int retVal = range1.compareTo(range2);
              if (retVal == 0) {
                // we really want to avoid equality because
                retVal = Long.compare(
                    o1.hashCode(),
                    o2.hashCode());
                if (retVal == 0) {
                  // what the heck, give it one last insurance
                  // that they're not equal even though its
                  // extremely unlikely
                  retVal = Long.compare(
                      o1.rangeLocationPair.rangePerCardinalityCache.hashCode(),
                      o2.rangeLocationPair.rangePerCardinalityCache.hashCode());
                }
              }
              return retVal;
            }
          });
      for (final Entry<Index, List<RangeLocationPair>> ranges : splitInfo.entrySet()) {
        for (final RangeLocationPair p : ranges.getValue()) {
          orderedSplits.add(new IndexRangeLocation(
              p,
              ranges.getKey()));
        }
      }
      IndexRangeLocation pairToSplit;
      BigInteger targetRange = totalRange.divide(TWO);
      final Map<Index, List<RangeLocationPair>> otherSplitInfo = new HashMap<Index, List<RangeLocationPair>>();
      do {
        // this will get the least value at or above the target range
        final BigInteger compareRange = targetRange;
        pairToSplit = orderedSplits.ceiling(new IndexRangeLocation(
            new RangeLocationPair(
                null,
                null) {

              @Override
              protected BigInteger getRangeAtCardinality(
                  final int cardinality ) {
                return compareRange;
              }

            },
            null));
        // there are no elements greater than the target, so take the
        // largest element and adjust the target
        if (pairToSplit == null) {
          final IndexRangeLocation highestRange = orderedSplits.pollLast();
          List<RangeLocationPair> rangeList = otherSplitInfo.get(highestRange.index);
          if (rangeList == null) {
            rangeList = new ArrayList<RangeLocationPair>();
            otherSplitInfo.put(
                highestRange.index,
                rangeList);
          }
          rangeList.add(highestRange.rangeLocationPair);
          targetRange = targetRange.subtract(highestRange.rangeLocationPair.getRangeAtCardinality(maxCardinality));
        }
      }
      while ((pairToSplit == null) && !orderedSplits.isEmpty());

      if (pairToSplit == null) {
        // this should never happen!
        LOGGER.error("Unable to identify splits");
        // but if it does, just take the first range off of this and
        // split it in half if this is left as empty
        clearCache();
        return splitSingleRange(maxCardinality);
      }

      // now we just carve the pair to split by the amount we are over
      // the target range
      final BigInteger currentRange = pairToSplit.rangeLocationPair.getRangeAtCardinality(maxCardinality);
      final BigInteger rangeExceeded = currentRange.subtract(targetRange);
      if (rangeExceeded.compareTo(BigInteger.ZERO) > 0) {
        // remove pair to split from ordered splits and split it to
        // attempt to match the target range, adding the appropriate
        // sides of the range to this info's ordered splits and the
        // other's splits
        orderedSplits.remove(pairToSplit);
        final BigInteger end = getEnd(
            pairToSplit.rangeLocationPair.range,
            maxCardinality);
        final byte[] splitKey = getKeyFromBigInteger(
            end.subtract(rangeExceeded),
            maxCardinality);
        List<RangeLocationPair> rangeList = otherSplitInfo.get(pairToSplit.index);
        if (rangeList == null) {
          rangeList = new ArrayList<RangeLocationPair>();
          otherSplitInfo.put(
              pairToSplit.index,
              rangeList);
        }
        rangeList.add(new RangeLocationPair(
            new Range(
                pairToSplit.rangeLocationPair.range.getStartKey(),
                pairToSplit.rangeLocationPair.range.isStartKeyInclusive(),
                new Key(
                    new Text(
                        splitKey)),
                false),
            pairToSplit.rangeLocationPair.location));
        orderedSplits.add(new IndexRangeLocation(
            new RangeLocationPair(
                new Range(
                    new Key(
                        new Text(
                            splitKey)),
                    true,
                    pairToSplit.rangeLocationPair.range.getEndKey(),
                    pairToSplit.rangeLocationPair.range.isEndKeyInclusive()),
                pairToSplit.rangeLocationPair.location),
            pairToSplit.index));
      }
      else if (orderedSplits.size() > 1) {
        // add pair to split to other split and remove it from
        // orderedSplits
        orderedSplits.remove(pairToSplit);
        List<RangeLocationPair> rangeList = otherSplitInfo.get(pairToSplit.index);
        if (rangeList == null) {
          rangeList = new ArrayList<RangeLocationPair>();
          otherSplitInfo.put(
              pairToSplit.index,
              rangeList);
        }
        rangeList.add(pairToSplit.rangeLocationPair);
      }

      // clear splitinfo and set it to ordered splits (what is left of the
      // splits that haven't been placed in the other split info)
      splitInfo.clear();
      for (final IndexRangeLocation split : orderedSplits) {
        List<RangeLocationPair> rangeList = splitInfo.get(split.index);
        if (rangeList == null) {
          rangeList = new ArrayList<RangeLocationPair>();
          splitInfo.put(
              split.index,
              rangeList);
        }
        rangeList.add(split.rangeLocationPair);
      }
      clearCache();
      return new IntermediateSplitInfo(
          otherSplitInfo);
    }

    private IntermediateSplitInfo splitSingleRange(
        final int maxCardinality ) {
      final Map<Index, List<RangeLocationPair>> otherSplitInfo = new HashMap<Index, List<RangeLocationPair>>();
      final List<RangeLocationPair> otherRangeList = new ArrayList<RangeLocationPair>();
      final Iterator<Entry<Index, List<RangeLocationPair>>> it = splitInfo.entrySet().iterator();
      while (it.hasNext()) {
        final Entry<Index, List<RangeLocationPair>> e = it.next();
        final List<RangeLocationPair> rangeList = e.getValue();
        if (!rangeList.isEmpty()) {
          final RangeLocationPair p = rangeList.remove(0);
          if (rangeList.isEmpty()) {
            if (!it.hasNext()) {
              // if this is empty now, divide the split in
              // half
              final BigInteger range = p.getRangeAtCardinality(maxCardinality);
              final BigInteger start = getStart(
                  p.range,
                  maxCardinality);
              final byte[] splitKey = getKeyFromBigInteger(
                  start.add(range.divide(TWO)),
                  maxCardinality);
              rangeList.add(new RangeLocationPair(
                  new Range(
                      p.range.getStartKey(),
                      p.range.isStartKeyInclusive(),
                      new Key(
                          new Text(
                              splitKey)),
                      false),
                  p.location));
              otherRangeList.add(new RangeLocationPair(
                  new Range(
                      new Key(
                          new Text(
                              splitKey)),
                      true,
                      p.range.getEndKey(),
                      p.range.isEndKeyInclusive()),
                  p.location));
              otherSplitInfo.put(
                  e.getKey(),
                  otherRangeList);
              return new IntermediateSplitInfo(
                  otherSplitInfo);
            }
            else {
              // otherwise remove this entry
              it.remove();
            }
          }
          otherRangeList.add(p);
          otherSplitInfo.put(
              e.getKey(),
              otherRangeList);
          return new IntermediateSplitInfo(
              otherSplitInfo);
        }
      }
      // this can only mean there are no ranges
      LOGGER.error("Attempting to split ranges on empty range");
      return new IntermediateSplitInfo(
          otherSplitInfo);
    }

    private GeoWaveInputSplit toFinalSplit() {
      final Map<Index, List<Range>> rangesPerIndex = new HashMap<Index, List<Range>>();
      final Set<String> locations = new HashSet<String>();
      for (final Entry<Index, List<RangeLocationPair>> entry : splitInfo.entrySet()) {
        final List<Range> ranges = new ArrayList<Range>(
            entry.getValue().size());
        for (final RangeLocationPair pair : entry.getValue()) {
          locations.add(pair.location);
          ranges.add(pair.range);
        }
        rangesPerIndex.put(
            entry.getKey(),
            ranges);
      }
      return new GeoWaveInputSplit(
          rangesPerIndex,
          locations.toArray(new String[] {}));
    }

    private int getMaxCardinality() {
      int maxCardinality = 1;
      for (final List<RangeLocationPair> pList : splitInfo.values()) {
        for (final RangeLocationPair p : pList) {
          maxCardinality = Math.max(
              maxCardinality,
              getMaxCardinalityFromRange(p.range));
        }
      }
      return maxCardinality;
    }

    @Override
    public int compareTo(
        final IntermediateSplitInfo o ) {
      final int maxCardinality = Math.max(
          getMaxCardinality(),
          o.getMaxCardinality());
      final BigInteger thisTotal = getTotalRangeAtCardinality(maxCardinality);
      final BigInteger otherTotal = o.getTotalRangeAtCardinality(maxCardinality);
      int retVal = thisTotal.compareTo(otherTotal);
      if (retVal == 0) {
        // because this is used by the treeset, we really want to avoid
        // equality
        retVal = Long.compare(
            hashCode(),
            o.hashCode());
        // what the heck, give it one last insurance
        // that they're not equal even though its
        // extremely unlikely
        if (retVal == 0) {
          retVal = Long.compare(
              totalRangePerCardinalityCache.hashCode(),
              o.totalRangePerCardinalityCache.hashCode());
        }
      }
      return retVal;
    }

    private synchronized BigInteger getTotalRangeAtCardinality(
        final int cardinality ) {
      final BigInteger totalRange = totalRangePerCardinalityCache.get(cardinality);
      if (totalRange != null) {
        return totalRange;
      }
      return calculateTotalRangeForCardinality(cardinality);
    }

    private synchronized BigInteger calculateTotalRangeForCardinality(
        final int cardinality ) {
      BigInteger sum = BigInteger.ZERO;
      for (final List<RangeLocationPair> pairList : splitInfo.values()) {
        for (final RangeLocationPair pair : pairList) {
          sum = sum.add(pair.getRangeAtCardinality(cardinality));
        }
      }
      totalRangePerCardinalityCache.put(
          cardinality,
          sum);
      return sum;
    }

    private synchronized void clearCache() {
      totalRangePerCardinalityCache.clear();
    }
  }

  protected static int getMaxCardinalityFromRange(
      final Range range ) {
    int maxCardinality = 0;
    final Key start = range.getStartKey();
    if (start != null) {
      maxCardinality = Math.max(
          maxCardinality,
          start.getRowData().length());
    }
    final Key end = range.getEndKey();
    if (end != null) {
      maxCardinality = Math.max(
          maxCardinality,
          end.getRowData().length());
    }
    return maxCardinality;
  }

  protected static byte[] getKeyFromBigInteger(
      final BigInteger value,
      final int numBytes ) {
    final byte[] valueBytes = value.toByteArray();
    final byte[] bytes = new byte[numBytes];
    for (int i = 0; i < numBytes; i++) {
      // start from the right
      if (i < valueBytes.length) {
        bytes[bytes.length - i - 1] = valueBytes[valueBytes.length - i - 1];
      }
      else {
        // prepend anything outside of the BigInteger value with 0
        bytes[bytes.length - i - 1] = 0;
      }
    }
    return bytes;
  }

  protected static byte[] extractBytes(
      final ByteSequence seq,
      final int numBytes ) {
    return extractBytes(
        seq,
        numBytes,
        false);
  }

  protected static byte[] extractBytes(
      final ByteSequence seq,
      final int numBytes,
      final boolean infiniteEndKey ) {
    final byte[] bytes = new byte[numBytes + 2];
    bytes[0] = 1;
    bytes[1] = 0;
    for (int i = 0; i < numBytes; i++) {
      if (i >= seq.length()) {
        if (infiniteEndKey) {
          // -1 is 0xff
          bytes[i + 2] = -1;
        }
        else {
          bytes[i + 2] = 0;
        }
      }
      else {
        bytes[i + 2] = seq.byteAt(i);
      }
    }
    return bytes;
  }

  protected static BigInteger getRange(
      final Range range,
      final int cardinality ) {
    return getEnd(
        range,
        cardinality).subtract(
        getStart(
            range,
            cardinality));
  }

  protected static BigInteger getStart(
      final Range range,
      final int cardinality ) {
    final Key start = range.getStartKey();
    byte[] startBytes;
    if (!range.isInfiniteStartKey() && (start != null)) {
      startBytes = extractBytes(
          start.getRowData(),
          cardinality);
    }
    else {
      startBytes = extractBytes(
          new ArrayByteSequence(
              new byte[] {}),
          cardinality);
    }
    return new BigInteger(
        startBytes);
  }

  protected static BigInteger getEnd(
      final Range range,
      final int cardinality ) {
    final Key end = range.getEndKey();
    byte[] endBytes;
    if (!range.isInfiniteStopKey() && (end != null)) {
      endBytes = extractBytes(
          end.getRowData(),
          cardinality);
    }
    else {
      endBytes = extractBytes(
          new ArrayByteSequence(
              new byte[] {}),
          cardinality,
          true);
    }

    return new BigInteger(
        endBytes);
  }

  @Override
  public RecordReader<GeoWaveInputKey, Object> createRecordReader(
      final InputSplit split,
      final TaskAttemptContext context )
      throws IOException,
      InterruptedException {
    LOGGER.setLevel(getLogLevel(context));
    return new GeoWaveRecordReader<Object>();
  }

  /**
   * Sets the log level for this job.
   *
   * @param job
   *            the Hadoop job instance to be configured
   * @param level
   *            the logging level
   * @since 1.5.0
   */
  public static void setLogLevel(
      final Job job,
      final Level level ) {
    ConfiguratorBase.setLogLevel(
        CLASS,
        job.getConfiguration(),
        level);
  }

  /**
   * Gets the log level from this configuration.
   *
   * @param context
   *            the Hadoop context for the configured job
   * @return the log level
   * @since 1.5.0
   * @see #setLogLevel(Job, Level)
   */
  protected static Level getLogLevel(
      final JobContext context ) {
    return ConfiguratorBase.getLogLevel(
        CLASS,
        GeoWaveConfiguratorBase.getConfiguration(context));
  }

  /**
   * Check whether a configuration is fully configured to be used with an
   * Accumulo {@link org.apache.hadoop.mapreduce.InputFormat}.
   *
   * @param context
   *            the Hadoop context for the configured job
   * @throws IOException
   *             if the context is improperly configured
   * @since 1.5.0
   */
  protected static void validateOptions(
      final JobContext context )
      throws IOException {
    // the only required element is the AccumuloOperations info
    try {
      // this should attempt to use the connection info to successfully
      // connect
      if (getAccumuloOperations(context) == null) {
        LOGGER.warn("Zookeeper connection for accumulo is null");
        throw new IOException(
            "Zookeeper connection for accumulo is null");
      }
    }
    catch (final AccumuloException e) {
      LOGGER.warn(
          "Error establishing zookeeper connection for accumulo",
          e);
      throw new IOException(
          e);
    }
    catch (final AccumuloSecurityException e) {
      LOGGER.warn(
          "Security error while establishing connection to accumulo",
          e);
      throw new IOException(
          e);
    }
  }

  public static AccumuloOperations getAccumuloOperations(
      final JobContext context )
      throws AccumuloException,
      AccumuloSecurityException {
    return GeoWaveConfiguratorBase.getAccumuloOperations(
        CLASS,
        context);
  }

  protected static String[] getAuthorizations(
      final JobContext context )
      throws AccumuloException,
      AccumuloSecurityException {
    return GeoWaveInputConfigurator.getAuthorizations(
        CLASS,
        context);
  }

  protected static JobContextAdapterStore getDataAdapterStore(
      final JobContext context,
      final AccumuloOperations accumuloOperations ) {
    return new JobContextAdapterStore(
        context,
        accumuloOperations);
  }
}
TOP

Related Classes of mil.nga.giat.geowave.accumulo.mapreduce.input.GeoWaveInputFormat$IntermediateSplitInfo$IndexRangeLocation

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.