
Source Code of org.apache.hadoop.hive.ql.exec.vector.VectorReduceSinkOperator
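
VectorReduceSinkOperator is the vectorized counterpart of ReduceSinkOperator: its processOp() receives a whole VectorizedRowBatch rather than a single row, evaluates the key, value, partition and bucket columns as vector expressions, serializes each row into a HiveKey / BytesWritable pair for the shuffle, and can filter rows through TopNHash when a map-side top-N (LIMIT) optimization is in effect.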

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hive.ql.exec.vector;

import java.io.IOException;
import java.util.Random;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.exec.ReduceSinkOperator;
import org.apache.hadoop.hive.ql.exec.TopNHash;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpression;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriter;
import org.apache.hadoop.hive.ql.exec.vector.expressions.VectorExpressionWriterFactory;
import org.apache.hadoop.hive.ql.io.HiveKey;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.plan.OperatorDesc;
import org.apache.hadoop.hive.ql.plan.ReduceSinkDesc;
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.Serializer;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
// import org.apache.hadoop.util.StringUtils;

public class VectorReduceSinkOperator extends ReduceSinkOperator {

  private static final Log LOG = LogFactory.getLog(
      VectorReduceSinkOperator.class.getName());

  private static final long serialVersionUID = 1L;

  /**
   * The evaluators for the key columns. Key columns decide the sort order on
   * the reducer side. Key columns are passed to the reducer in the "key".
   */
  private VectorExpression[] keyEval;

  /**
   * The key value writers. These know how to write the necessary writable type
   * based on key column metadata, from the primitive vector type.
   */
  private transient VectorExpressionWriter[] keyWriters;

  /**
   * The evaluators for the value columns. Value columns are passed to reducer
   * in the "value".
   */
  private VectorExpression[] valueEval;

  /**
   * The output value writers. These know how to write the necessary writable type
   * based on value column metadata, from the primitive vector type.
   */
  private transient VectorExpressionWriter[] valueWriters;

  /**
   * The evaluators for the partition columns (CLUSTER BY or DISTRIBUTE BY in
   * Hive language). Partition columns decide the reducer that the current row
   * goes to. Partition columns are not passed to reducer.
   */
  private VectorExpression[] partitionEval;

  /**
   * Evaluators for the bucketing columns. These are used to compute the bucket number.
   */
  private VectorExpression[] bucketEval;
  private int buckColIdxInKey;

  /**
   * The partition value writers. These know how to write the necessary writable type
   * based on partition column metadata, from the primitive vector type. The bucket
   * writers play the same role for the bucketing columns.
   */
  private transient VectorExpressionWriter[] partitionWriters;
  private transient VectorExpressionWriter[] bucketWriters = null;

  public VectorReduceSinkOperator(VectorizationContext vContext, OperatorDesc conf)
      throws HiveException {
    this();
    ReduceSinkDesc desc = (ReduceSinkDesc) conf;
    this.conf = desc;
    keyEval = vContext.getVectorExpressions(desc.getKeyCols());
    valueEval = vContext.getVectorExpressions(desc.getValueCols());
    partitionEval = vContext.getVectorExpressions(desc.getPartitionCols());
    bucketEval = null;
    if (desc.getBucketCols() != null && !desc.getBucketCols().isEmpty()) {
      bucketEval = vContext.getVectorExpressions(desc.getBucketCols());
      buckColIdxInKey = desc.getPartitionCols().size();
    }
  }

  public VectorReduceSinkOperator() {
    super();
  }

  @Override
  protected void initializeOp(Configuration hconf) throws HiveException {
    try {
      numDistributionKeys = conf.getNumDistributionKeys();
      distinctColIndices = conf.getDistinctColumnIndices();
      numDistinctExprs = distinctColIndices.size();

      TableDesc keyTableDesc = conf.getKeySerializeInfo();
      keySerializer = (Serializer) keyTableDesc.getDeserializerClass()
          .newInstance();
      keySerializer.initialize(null, keyTableDesc.getProperties());
      keyIsText = keySerializer.getSerializedClass().equals(Text.class);

      /*
       * Compute and assign the key writers and the key object inspector
       */
      VectorExpressionWriterFactory.processVectorExpressions(
          conf.getKeyCols(),
          conf.getOutputKeyColumnNames(),
          new VectorExpressionWriterFactory.SingleOIDClosure() {
            @Override
            public void assign(VectorExpressionWriter[] writers,
              ObjectInspector objectInspector) {
              keyWriters = writers;
              keyObjectInspector = objectInspector;
            }
          });

      String colNames = "";
      for(String colName : conf.getOutputKeyColumnNames()) {
        colNames = String.format("%s %s", colNames, colName);
      }

      LOG.debug(String.format("keyObjectInspector [%s]%s => %s",
          keyObjectInspector.getClass(),
          keyObjectInspector,
          colNames));

      partitionWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getPartitionCols());
      if (conf.getBucketCols() != null && !conf.getBucketCols().isEmpty()) {
        bucketWriters = VectorExpressionWriterFactory.getExpressionWriters(conf.getBucketCols());
      }

      TableDesc valueTableDesc = conf.getValueSerializeInfo();
      valueSerializer = (Serializer) valueTableDesc.getDeserializerClass()
          .newInstance();
      valueSerializer.initialize(null, valueTableDesc.getProperties());

      /*
       * Compute and assign the value writers and the value object inspector
       */
      VectorExpressionWriterFactory.processVectorExpressions(
          conf.getValueCols(),
          conf.getOutputValueColumnNames(),
          new VectorExpressionWriterFactory.SingleOIDClosure() {
            @Override
            public void assign(VectorExpressionWriter[] writers,
                ObjectInspector objectInspector) {
              valueWriters = writers;
              valueObjectInspector = objectInspector;
            }
          });

      colNames = "";
      for(String colName : conf.getOutputValueColumnNames()) {
        colNames = String.format("%s %s", colNames, colName);
      }

      LOG.debug(String.format("valueObjectInspector [%s]%s => %s",
          valueObjectInspector.getClass(),
          valueObjectInspector,
          colNames));

      int numKeys = numDistinctExprs > 0 ? numDistinctExprs : 1;
      int keyLen = numDistinctExprs > 0 ? numDistributionKeys + 1 :
        numDistributionKeys;
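      // cachedKeys holds one key template per distinct expression (a single template when there
      // are no distincts). Each template contains the distribution key columns plus, when
      // distincts are present, one trailing slot for the StandardUnion carrying the distinct
      // values of the current row.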
      cachedKeys = new Object[numKeys][keyLen];
      cachedValues = new Object[valueEval.length];

      int tag = conf.getTag();
      tagByte[0] = (byte) tag;
      LOG.info("Using tag = " + tag);

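      // Top-N (map-side LIMIT) support: when a limit and a memory budget are configured,
      // reducerHash buffers candidate keys so the operator can drop rows that cannot make
      // it into the top N instead of forwarding every row to the shuffle.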
      int limit = conf.getTopN();
      float memUsage = conf.getTopNMemoryUsage();
      if (limit >= 0 && memUsage > 0) {
        reducerHash.initialize(limit, memUsage, conf.isMapGroupBy(), this);
      }
    } catch(Exception e) {
      throw new HiveException(e);
    }
  }

  @Override
  public void processOp(Object row, int tag) throws HiveException {
    VectorizedRowBatch vrg = (VectorizedRowBatch) row;

    if (LOG.isDebugEnabled()) {
      LOG.debug(String.format("sinking %d rows, %d values, %d keys, %d parts",
          vrg.size,
          valueEval.length,
          keyEval.length,
          partitionEval.length));
    }

    try {
      // Evaluate the keys
      for (int i = 0; i < keyEval.length; i++) {
        keyEval[i].evaluate(vrg);
      }

      // Determine which rows we need to emit based on topN optimization
      int startResult = reducerHash.startVectorizedBatch(vrg.size);
      if (startResult == TopNHash.EXCLUDE) {
        return; // TopN wants us to exclude all rows.
      }
      // TODO: can we do this later/only for the keys that are needed? E.g. update vrg.selected.
      for (int i = 0; i < partitionEval.length; i++) {
        partitionEval[i].evaluate(vrg);
      }
      if (bucketEval != null) {
        for (int i = 0; i < bucketEval.length; i++) {
          bucketEval[i].evaluate(vrg);
        }
      }
      // Evaluate the value columns
      for (int i = 0; i < valueEval.length; i++) {
         valueEval[i].evaluate(vrg);
      }

      boolean useTopN = startResult != TopNHash.FORWARD;
      // Go through the batch once. If we are not using TopN, we forward everything and are done.
      // If we are using TopN, we build the first key for each row and store/forward it.
      // Values, hash codes and additional distinct rows are handled in the second pass in that case.
      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
        int rowIndex = batchIndex;
        if (vrg.selectedInUse) {
          rowIndex = vrg.selected[batchIndex];
        }
        // First, build the distribution key components for this row and determine distKeyLength.
        populateCachedDistributionKeys(vrg, rowIndex, 0);

        // replace bucketing columns with hashcode % numBuckets
        int buckNum = 0;
        if (bucketEval != null && bucketEval.length != 0) {
          buckNum = computeBucketNumber(vrg, rowIndex, conf.getNumBuckets());
          cachedKeys[0][buckColIdxInKey] = new IntWritable(buckNum);
        }
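        // toHiveKey serializes the cached key columns into a HiveKey; distKeyLength records the
        // serialized length of the distribution-key prefix, i.e. the part of the key shared by
        // all distinct-expression variants of the same input row.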
        HiveKey firstKey = toHiveKey(cachedKeys[0], tag, null);
        int distKeyLength = firstKey.getDistKeyLength();
        // Add first distinct expression, if any.
        if (numDistinctExprs > 0) {
          populateCachedDistinctKeys(vrg, rowIndex, 0);
          firstKey = toHiveKey(cachedKeys[0], tag, distKeyLength);
        }

        if (useTopN) {
          reducerHash.tryStoreVectorizedKey(firstKey, batchIndex);
        } else {
          // No TopN: forward the first key and value now, plus any extra distinct keys.
          int hashCode = 0;
          if (bucketEval != null && bucketEval.length != 0) {
            hashCode = computeHashCode(vrg, rowIndex, buckNum);
          } else {
            hashCode = computeHashCode(vrg, rowIndex);
          }
          firstKey.setHashCode(hashCode);
          BytesWritable value = makeValueWritable(vrg, rowIndex);
          collect(firstKey, value);
          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 0);
        }
      }

      if (!useTopN) return; // All done.

      // If we use topN, we have called tryStore on every key now. We can process the results.
      for (int batchIndex = 0 ; batchIndex < vrg.size; ++batchIndex) {
        int result = reducerHash.getVectorizedBatchResult(batchIndex);
        if (result == TopNHash.EXCLUDE) continue;
        int rowIndex = batchIndex;
        if (vrg.selectedInUse) {
          rowIndex = vrg.selected[batchIndex];
        }
        // Compute value and hashcode - we'd either store or forward them.
        int hashCode = computeHashCode(vrg, rowIndex);
        BytesWritable value = makeValueWritable(vrg, rowIndex);
        int distKeyLength = -1;
        if (result == TopNHash.FORWARD) {
          HiveKey firstKey = reducerHash.getVectorizedKeyToForward(batchIndex);
          firstKey.setHashCode(hashCode);
          distKeyLength = firstKey.getDistKeyLength();
          collect(firstKey, value);
        } else {
          reducerHash.storeValue(result, value, hashCode, true);
          distKeyLength = reducerHash.getVectorizedKeyDistLength(batchIndex);
        }
        // Now forward the other rows if there are multiple distincts (but see TODO in forward...).
        // Unfortunately, that means we have to rebuild the cachedKeys. Start at 1.
        if (numDistinctExprs > 1) {
          populateCachedDistributionKeys(vrg, rowIndex, 1);
          forwardExtraDistinctRows(vrg, rowIndex, hashCode, value, distKeyLength, tag, 1);
        }
      }
    } catch (SerDeException e) {
      throw new HiveException(e);
    } catch (IOException e) {
      throw new HiveException(e);
    }
  }

  /**
   * This function creates and forwards all the additional KVs for the multi-distinct case,
   * after the first (0th) KV pertaining to the row has already been stored or forwarded.
   * @param vrg the batch
   * @param rowIndex the row index in the batch
   * @param hashCode the partitioning hash code to use; same as for the first KV
   * @param value the value to use; same as for the first KV
   * @param distKeyLength the distribution key length of the first key; TODO probably extraneous
   * @param tag the tag
   * @param baseIndex the index in cachedKeys where the pre-evaluated distribution keys are stored
   */
  private void forwardExtraDistinctRows(VectorizedRowBatch vrg, int rowIndex, int hashCode,
      BytesWritable value, int distKeyLength, int tag, int baseIndex)
          throws HiveException, SerDeException, IOException {
    // TODO: We don't have to forward extra distinct rows immediately (same in non-vector) if
    //       the first key has already been stored. There's few bytes difference between keys
    //       for different distincts, and the value/etc. are all the same.
    //       We could store deltas to re-gen extra rows when flushing TopN.
    for (int i = 1; i < numDistinctExprs; i++) {
      if (i != baseIndex) {
        System.arraycopy(cachedKeys[baseIndex], 0, cachedKeys[i], 0, numDistributionKeys);
      }
      populateCachedDistinctKeys(vrg, rowIndex, i);
      HiveKey hiveKey = toHiveKey(cachedKeys[i], tag, distKeyLength);
      hiveKey.setHashCode(hashCode);
      collect(hiveKey, value);
    }
  }

  /**
   * Populate distribution keys part of cachedKeys for a particular row from the batch.
   * @param vrg the batch
   * @param rowIndex the row index in the batch
   * @param index the cachedKeys index to write to
   */
  private void populateCachedDistributionKeys(
      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
    for (int i = 0; i < numDistributionKeys; i++) {
      int batchColumn = keyEval[i].getOutputColumn();
      ColumnVector vectorColumn = vrg.cols[batchColumn];
      cachedKeys[index][i] = keyWriters[i].writeValue(vectorColumn, rowIndex);
    }
    if (cachedKeys[index].length > numDistributionKeys) {
      cachedKeys[index][numDistributionKeys] = null;
    }
  }

  /**
   * Populate distinct keys part of cachedKeys for a particular row from the batch.
   * @param vrg the batch
   * @param rowIndex the row index in the batch
   * @param index the cachedKeys index to write to
   */
  private void populateCachedDistinctKeys(
      VectorizedRowBatch vrg, int rowIndex, int index) throws HiveException {
    StandardUnion union;
    cachedKeys[index][numDistributionKeys] = union = new StandardUnion(
        (byte)index, new Object[distinctColIndices.get(index).size()]);
    Object[] distinctParameters = (Object[]) union.getObject();
    for (int distinctParamI = 0; distinctParamI < distinctParameters.length; distinctParamI++) {
      int distinctColIndex = distinctColIndices.get(index).get(distinctParamI);
      int batchColumn = keyEval[distinctColIndex].getOutputColumn();
      distinctParameters[distinctParamI] =
          keyWriters[distinctColIndex].writeValue(vrg.cols[batchColumn], rowIndex);
    }
    union.setTag((byte) index);
  }

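  /**
   * Evaluate the value columns for one row and serialize them into a BytesWritable
   * using the configured value serializer.
   */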
  private BytesWritable makeValueWritable(VectorizedRowBatch vrg, int rowIndex)
      throws HiveException, SerDeException {
    for (int i = 0; i < valueEval.length; i++) {
      int batchColumn = valueEval[i].getOutputColumn();
      ColumnVector vectorColumn = vrg.cols[batchColumn];
      cachedValues[i] = valueWriters[i].writeValue(vectorColumn, rowIndex);
    }
    // Serialize the value
    return (BytesWritable)valueSerializer.serialize(cachedValues, valueObjectInspector);
  }

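  /**
   * Compute the partitioning hash code for one row from the partition columns, or from a
   * random number when there are no partition columns (to spread rows evenly across reducers).
   */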
  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex) throws HiveException {
    // Evaluate the HashCode
    int keyHashCode = 0;
    if (partitionEval.length == 0) {
      // If no partition cols, just distribute the data uniformly to provide better
      // load balance. If the requirement is to have a single reducer, we should set
      // the number of reducers to 1.
      // Use a constant seed to make the code deterministic.
      if (random == null) {
        random = new Random(12345);
      }
      keyHashCode = random.nextInt();
    } else {
      for (int p = 0; p < partitionEval.length; p++) {
        ColumnVector columnVector = vrg.cols[partitionEval[p].getOutputColumn()];
        Object partitionValue = partitionWriters[p].writeValue(columnVector, rowIndex);
        keyHashCode = keyHashCode
            * 31
            + ObjectInspectorUtils.hashCode(
                partitionValue,
                partitionWriters[p].getObjectInspector());
      }
    }
    return keyHashCode;
  }

  private int computeHashCode(VectorizedRowBatch vrg, int rowIndex, int buckNum) throws HiveException {
    int keyHashCode = computeHashCode(vrg, rowIndex);
    keyHashCode = keyHashCode * 31 + buckNum;
    return keyHashCode;
  }

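  /**
   * Compute the bucket number for one row by hashing the bucketing columns and taking the
   * result modulo the configured number of buckets.
   */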
  private int computeBucketNumber(VectorizedRowBatch vrg, int rowIndex, int numBuckets) throws HiveException {
    int bucketNum = 0;
    for (int p = 0; p < bucketEval.length; p++) {
      ColumnVector columnVector = vrg.cols[bucketEval[p].getOutputColumn()];
      Object bucketValue = bucketWriters[p].writeValue(columnVector, rowIndex);
      bucketNum = bucketNum
          * 31
          + ObjectInspectorUtils.hashCode(
              bucketValue,
              bucketWriters[p].getObjectInspector());
    }

    if (bucketNum < 0) {
      // Negate to get a non-negative value; guard against Integer.MIN_VALUE, whose negation
      // overflows and stays negative.
      bucketNum = (bucketNum == Integer.MIN_VALUE) ? 0 : -bucketNum;
    }

    return bucketNum % numBuckets;
  }

  public static String getOperatorName() {
    return "RS";
  }

  public VectorExpression[] getPartitionEval() {
    return partitionEval;
  }

  public void setPartitionEval(VectorExpression[] partitionEval) {
    this.partitionEval = partitionEval;
  }

  public VectorExpression[] getValueEval() {
    return valueEval;
  }

  public void setValueEval(VectorExpression[] valueEval) {
    this.valueEval = valueEval;
  }

  public VectorExpression[] getKeyEval() {
    return keyEval;
  }

  public void setKeyEval(VectorExpression[] keyEval) {
    this.keyEval = keyEval;
  }
}