Source Code of org.apache.drill.exec.physical.impl.join.HashJoinBatch

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.impl.join;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.logical.data.JoinCondition;
import org.apache.drill.common.logical.data.NamedExpression;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.Types;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.compile.sig.GeneratorMapping;
import org.apache.drill.exec.compile.sig.MappingSet;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.SchemaChangeException;
import org.apache.drill.exec.expr.ClassGenerator;
import org.apache.drill.exec.expr.CodeGenerator;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.expr.holders.IntHolder;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.ops.MetricDef;
import org.apache.drill.exec.physical.config.HashJoinPOP;
import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
import org.apache.drill.exec.physical.impl.common.HashTable;
import org.apache.drill.exec.physical.impl.common.HashTableConfig;
import org.apache.drill.exec.physical.impl.common.HashTableStats;
import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
import org.apache.drill.exec.record.AbstractRecordBatch;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.ExpandableHyperContainer;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.allocator.VectorAllocator;
import org.eigenbase.rel.JoinRelType;

import com.sun.codemodel.JExpr;
import com.sun.codemodel.JExpression;
import com.sun.codemodel.JVar;

public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {

    public static final long ALLOCATOR_INITIAL_RESERVATION = 1 * 1024 * 1024;
    public static final long ALLOCATOR_MAX_RESERVATION = 20L * 1000 * 1000 * 1000;

    // Probe side record batch
    private final RecordBatch left;

    // Build side record batch
    private final RecordBatch right;

    // Join type: INNER, LEFT, RIGHT or FULL
    private final JoinRelType joinType;

    // Join conditions
    private final List<JoinCondition> conditions;

    // Runtime generated class implementing HashJoinProbe interface
    private HashJoinProbe hashJoinProbe = null;

    /* Helper class
     * Maintains a linked list of build side records with the same key.
     * Keeps track of which build records have a matching key on the
     * probe side (needed for right and full outer joins).
     */
    private HashJoinHelper hjHelper = null;

    // Underlying hashtable used by the hash join
    private HashTable hashTable = null;

    /* Hyper container to store all build side record batches.
     * Records are retrieved from this container when there is a matching record
     * on the probe side
     */
    private ExpandableHyperContainer hyperContainer;

    // Number of records in the output container
    private int outputRecords;

    // Current batch index on the build side
    private int buildBatchIndex = 0;

    // Schema of the build side
    private BatchSchema rightSchema = null;

    private boolean first = true;

    private boolean done = false;

    // Generator mapping for the build side : scalar
    private static final GeneratorMapping PROJECT_BUILD = GeneratorMapping.create("doSetup"/* setup method */,
                                                                                  "projectBuildRecord" /* eval method */,
                                                                                  null /* reset */, null /* cleanup */);
    // Generator mapping for the build side : constant
    private static final GeneratorMapping PROJECT_BUILD_CONSTANT = GeneratorMapping.create("doSetup"/* setup method */,
                                                                                  "doSetup" /* eval method */,
                                                                                   null /* reset */, null /* cleanup */);

    // Generator mapping for the probe side : scalar
    private static final GeneratorMapping PROJECT_PROBE = GeneratorMapping.create("doSetup" /* setup method */,
                                                                                  "projectProbeRecord" /* eval method */,
                                                                                  null /* reset */, null /* cleanup */);
    // Generator mapping for the probe side : constant
    private static final GeneratorMapping PROJECT_PROBE_CONSTANT = GeneratorMapping.create("doSetup" /* setup method */,
                                                                                  "doSetup" /* eval method */,
                                                                                  null /* reset */, null /* cleanup */);


    // Mapping set for the build side
    private final MappingSet projectBuildMapping = new MappingSet("buildIndex" /* read index */, "outIndex" /* write index */,
                                                                  "buildBatch" /* read container */,
                                                                  "outgoing" /* write container */,
                                                                  PROJECT_BUILD_CONSTANT, PROJECT_BUILD);

    // Mapping set for the probe side
    private final MappingSet projectProbeMapping = new MappingSet("probeIndex" /* read index */, "outIndex" /* write index */,
                                                                  "probeBatch" /* read container */,
                                                                  "outgoing" /* write container */,
                                                                  PROJECT_PROBE_CONSTANT, PROJECT_PROBE);
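
    /* Note on the mappings above: scalar expressions are generated into the named
     * eval methods (projectBuildRecord / projectProbeRecord) and run once per
     * record, while constant sub-expressions are routed to doSetup() and run once
     * per batch. The index and container names ("buildIndex", "outgoing", ...)
     * become the variable names referenced by the generated code assembled in
     * setupHashJoinProbe() below.
     */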

    // true until we have returned our first output batch
    boolean firstOutputBatch = true;

    IterOutcome leftUpstream = IterOutcome.NONE;

    private final HashTableStats htStats = new HashTableStats();

    public enum Metric implements MetricDef {

      NUM_BUCKETS,
      NUM_ENTRIES,
      NUM_RESIZING,
      RESIZING_TIME;
     
      // this enum is duplicated in the hash aggregate operator

      @Override
      public int metricId() {
        return ordinal();
      }
    }
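
    // Note: metricId() is the enum ordinal, so new metrics must be appended at the
    // end of the enum to keep previously recorded metric ids stable.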
   
    @Override
    public int getRecordCount() {
        return outputRecords;
    }


    @Override
    public IterOutcome innerNext() {
        if (done) {
          return IterOutcome.NONE;
        }
        try {
            /* If we are here for the first time, execute the build phase of the
             * hash join and set up the runtime-generated class for the probe side
             */
            if (hashJoinProbe == null) {

                // Initialize the hash join helper context
                hjHelper = new HashJoinHelper(context, oContext.getAllocator());

                /* The build phase requires setting up the hash table, which
                 * materializes both the build and probe side expressions while it
                 * is being created. So we must invoke next() on the probe batch as
                 * well for materialization to succeed; the batch is not otherwise
                 * used until the build phase completes.
                 */
                leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);

                // Build the hash table, using the build side record batches.
                executeBuildPhase();

                // Update the hash table related stats for the operator
                updateStats(this.hashTable);

                // Create the run time generated code needed to probe and project
                hashJoinProbe = setupHashJoinProbe();
            }

            // Probe and project if we built a hash table, or if this is an outer
            // join (outer joins may still emit rows when the build side is empty)
            if (hashTable != null
                || joinType != JoinRelType.INNER) {

                // Allocate the memory for the vectors in the output container
                allocateVectors();

                outputRecords = hashJoinProbe.probeAndProject();

                /* We are here because of one of the following:
                 * 1. We completed processing all the records and we are done
                 * 2. We filled the outgoing batch to its maximum and need to return upstream
                 * In either case, build the output container's schema and return
                 */
                if (outputRecords > 0 || first) {
                  first = false;

                  // Build the container schema and set the counts
                  container.buildSchema(BatchSchema.SelectionVectorMode.NONE);
                  container.setRecordCount(outputRecords);

                  for (VectorWrapper<?> v : container) {
                    v.getValueVector().getMutator().setValueCount(outputRecords);
                  }

                  // First output batch, return OK_NEW_SCHEMA
                  if (firstOutputBatch) {
                    firstOutputBatch = false;
                    return IterOutcome.OK_NEW_SCHEMA;
                  }

                  // Not the first output batch
                  return IterOutcome.OK;
                }
            } else {
                // The build side is empty, so there can be no matches; drain and discard the probe side
                if (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
                    for (VectorWrapper<?> wrapper : left) {
                      wrapper.getValueVector().clear();
                    }
                    left.kill(true);
                    leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
                    while (leftUpstream == IterOutcome.OK_NEW_SCHEMA || leftUpstream == IterOutcome.OK) {
                      for (VectorWrapper<?> wrapper : left) {
                        wrapper.getValueVector().clear();
                      }
                      leftUpstream = next(HashJoinHelper.LEFT_INPUT, left);
                    }
                }
            }

            // No more output records, clean up and return
            done = true;
            if (first) {
              return IterOutcome.OK_NEW_SCHEMA;
            }
            return IterOutcome.NONE;
        } catch (ClassTransformationException | SchemaChangeException | IOException e) {
            context.fail(e);
            killIncoming(false);
            return IterOutcome.STOP;
        }
    }
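
    /* Outcome protocol, for reference: a successful run yields OK_NEW_SCHEMA once
     * (possibly with zero records), then OK zero or more times, then NONE. An
     * illustrative consumer loop (hypothetical, not part of this class):
     *
     *   IterOutcome out = join.next();                 // join is this batch
     *   while (out == IterOutcome.OK_NEW_SCHEMA || out == IterOutcome.OK) {
     *     int n = join.getRecordCount();               // rows in this batch
     *     // ... read the output container's vectors ...
     *     out = join.next();
     *   }
     */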

    public void setupHashTable() throws IOException, SchemaChangeException, ClassTransformationException {

        // Set up the hash table configuration object
        int conditionsSize = conditions.size();

        NamedExpression[] rightExpr = new NamedExpression[conditionsSize];
        NamedExpression[] leftExpr = new NamedExpression[conditionsSize];

        // Create named expressions from the conditions
        for (int i = 0; i < conditionsSize; i++) {
            rightExpr[i] = new NamedExpression(conditions.get(i).getRight(), new FieldReference("build_side_" + i ));
            leftExpr[i] = new NamedExpression(conditions.get(i).getLeft(), new FieldReference("probe_side_" + i));

            // Hash join only supports equality currently.
            assert conditions.get(i).getRelationship().equals("==");
        }

        // Null out the left (probe) named expressions if the probe side is empty
        if (leftUpstream != IterOutcome.OK_NEW_SCHEMA && leftUpstream != IterOutcome.OK) {
            leftExpr = null;
        } else {
          if (left.getSchema().getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
            throw new SchemaChangeException("Hash join does not support probe batch with selection vectors");
          }
        }

        HashTableConfig htConfig =
            new HashTableConfig(context.getOptions().getOption(ExecConstants.MIN_HASH_TABLE_SIZE_KEY).num_val.intValue(),
            HashTable.DEFAULT_LOAD_FACTOR, rightExpr, leftExpr);

        // Create the chained hash table
        ChainedHashTable ht = new ChainedHashTable(htConfig, context, oContext.getAllocator(), this.right, this.left, null);
        hashTable = ht.createAndSetupHashTable(null);
    }
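
    /* Worked example: for the single join condition t1.a == t2.b, the loop above
     * produces
     *   rightExpr[0] = NamedExpression(t2.b, FieldReference("build_side_0"))
     *   leftExpr[0]  = NamedExpression(t1.a, FieldReference("probe_side_0"))
     * so the hash table hashes t2.b when inserting build records and t1.a when
     * probing, matching the keys by ordinal position.
     */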

    public void executeBuildPhase() throws SchemaChangeException, ClassTransformationException, IOException {

        // Fetch the first build-side batch; the hash table is set up on the first OK_NEW_SCHEMA below
        IterOutcome rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);

        boolean moreData = true;

        while (moreData) {

            switch (rightUpstream) {

                case NONE:
                case NOT_YET:
                case STOP:
                    moreData = false;
                    continue;

                case OK_NEW_SCHEMA:
                    if (rightSchema == null) {
                        rightSchema = right.getSchema();

                        if (rightSchema.getSelectionVectorMode() != BatchSchema.SelectionVectorMode.NONE) {
                          throw new SchemaChangeException("Hash join does not support build batch with selection vectors");
                        }
                        setupHashTable();
                    } else {
                        throw new SchemaChangeException("Hash join does not support schema changes");
                    }
                // Fall through
                case OK:
                    int currentRecordCount = right.getRecordCount();

                    /* For every new build batch, we store some state in the helper context
                     * Add new state to the helper context
                     */
                    hjHelper.addNewBatch(currentRecordCount);

                    // Holder that receives the hash table's global index for each inserted key
                    IntHolder htIndex = new IntHolder();

                    // For every record in the build batch, hash the key columns
                    for (int i = 0; i < currentRecordCount; i++) {

                        HashTable.PutStatus status = hashTable.put(i, htIndex, 1 /* retry count */);

                        if (status != HashTable.PutStatus.PUT_FAILED) {
                            /* Use the global index returned by the hash table, to store
                             * the current record index and batch index. This will be used
                             * later when we probe and find a match.
                             */
                            hjHelper.setCurrentIndex(htIndex.value, buildBatchIndex, i);
                        }
                    }

                    /* Completed hashing all records in this batch. Transfer the batch
                     * to the hyper vector container. Will be used when we want to retrieve
                     * records that have matching keys on the probe side.
                     */
                    RecordBatchData nextBatch = new RecordBatchData(right);
                    if (hyperContainer == null) {
                        hyperContainer = new ExpandableHyperContainer(nextBatch.getContainer());
                    } else {
                        hyperContainer.addBatch(nextBatch.getContainer());
                    }

                    // completed processing a batch, increment batch index
                    buildBatchIndex++;
                    break;
            }
            // Get the next record batch
            rightUpstream = next(HashJoinHelper.RIGHT_INPUT, right);
        }
    }
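
    /* How build records are located again at probe time: for every inserted key
     * the hash table returns a global index (htIndex.value), under which
     * hjHelper.setCurrentIndex() records the pair (buildBatchIndex, recordIndex).
     * On a probe match the helper hands back a composite build index which,
     * judging by the decode in setupHashJoinProbe(), packs the pair as:
     *
     *   int composite = (batchIndex << 16) | (recordIndex & 0xFFFF);
     *   int batch  = composite >>> 16;     // member of the hyper vector
     *   int record = composite & 0xFFFF;   // row within that build batch
     *
     * (a sketch inferred from this file; the encoding itself lives in HashJoinHelper)
     */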

    public HashJoinProbe setupHashJoinProbe() throws ClassTransformationException, IOException {

        final CodeGenerator<HashJoinProbe> cg = CodeGenerator.get(HashJoinProbe.TEMPLATE_DEFINITION, context.getFunctionRegistry());
        ClassGenerator<HashJoinProbe> g = cg.getRoot();

        // Generate the code to project build side records
        g.setMappingSet(projectBuildMapping);

        int fieldId = 0;
        JExpression buildIndex = JExpr.direct("buildIndex");
        JExpression outIndex = JExpr.direct("outIndex");
        g.rotateBlock();

        if (hyperContainer != null) {
            for (VectorWrapper<?> vv : hyperContainer) {

                MajorType inputType = vv.getField().getType();
                MajorType outputType;
                if (joinType == JoinRelType.LEFT && inputType.getMode() == DataMode.REQUIRED) {
                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
                } else {
                  outputType = inputType;
                }

                // Add the vector to our output container
                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), context.getAllocator());
                container.add(v);

                JVar inVV = g.declareVectorValueSetupAndMember("buildBatch", new TypedFieldId(vv.getField().getType(), true, fieldId));
                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, fieldId));
                // Decode of the composite build index: the low 16 bits give the
                // record offset within a build batch, the upper bits pick the batch
                // inside the hyper vector (Character.MAX_VALUE is the 0xFFFF mask)
                g.getEvalBlock()._if(outVV.invoke("copyFromSafe")
                  .arg(buildIndex.band(JExpr.lit((int) Character.MAX_VALUE)))
                  .arg(outIndex)
                  .arg(inVV.component(buildIndex.shrz(JExpr.lit(16)))).not())._then()._return(JExpr.FALSE);

                fieldId++;
            }
        }
        g.rotateBlock();
        g.getEvalBlock()._return(JExpr.TRUE);

        // Generate the code to project probe side records
        g.setMappingSet(projectProbeMapping);

        int outputFieldId = fieldId;
        fieldId = 0;
        JExpression probeIndex = JExpr.direct("probeIndex");
        int recordCount = 0;

        if (leftUpstream == IterOutcome.OK || leftUpstream == IterOutcome.OK_NEW_SCHEMA) {
            for (VectorWrapper<?> vv : left) {

                MajorType inputType = vv.getField().getType();
                MajorType outputType;
                if (joinType == JoinRelType.RIGHT && inputType.getMode() == DataMode.REQUIRED) {
                  outputType = Types.overrideMode(inputType, DataMode.OPTIONAL);
                } else {
                  outputType = inputType;
                }

                ValueVector v = TypeHelper.getNewVector(MaterializedField.create(vv.getField().getPath(), outputType), oContext.getAllocator());
                container.add(v);

                JVar inVV = g.declareVectorValueSetupAndMember("probeBatch", new TypedFieldId(inputType, false, fieldId));
                JVar outVV = g.declareVectorValueSetupAndMember("outgoing", new TypedFieldId(outputType, false, outputFieldId));

                g.getEvalBlock()._if(outVV.invoke("copyFromSafe").arg(probeIndex).arg(outIndex).arg(inVV).not())._then()._return(JExpr.FALSE);

                fieldId++;
                outputFieldId++;
            }
            recordCount = left.getRecordCount();
        }
        g.rotateBlock();
        g.getEvalBlock()._return(JExpr.TRUE);

        HashJoinProbe hj = context.getImplementationClass(cg);

        hj.setupHashJoinProbe(context, hyperContainer, left, recordCount, this, hashTable, hjHelper, joinType);
        return hj;
    }
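
    /* An illustrative (assumed) shape of the source generated above, for one
     * build-side and one probe-side column; the real text is emitted through
     * JCodeModel and its variable names will differ:
     *
     *   public boolean projectBuildRecord(int buildIndex, int outIndex) {
     *     // copy from the right hyper-vector member, decoding the composite index
     *     return vv0out.copyFromSafe(buildIndex & 0xFFFF, outIndex,
     *                                vv0build[buildIndex >>> 16]);
     *   }
     *
     *   public boolean projectProbeRecord(int probeIndex, int outIndex) {
     *     // a false return signals the caller that the outgoing batch is full
     *     return vv1out.copyFromSafe(probeIndex, outIndex, vv1probe);
     *   }
     */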

    private void allocateVectors() {
      for (VectorWrapper<?> v : container) {
        v.getValueVector().allocateNew();
      }
    }

    public HashJoinBatch(HashJoinPOP popConfig, FragmentContext context, RecordBatch left, RecordBatch right) throws OutOfMemoryException {
        super(popConfig, context);
        this.left = left;
        this.right = right;
        this.joinType = popConfig.getJoinType();
        this.conditions = popConfig.getConditions();
    }
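
    /* Construction sketch (hypothetical wiring; in practice the operator is
     * instantiated by Drill's ImplCreator while materializing the physical plan):
     *
     *   HashJoinPOP pop = ...;            // physical operator config with join type and conditions
     *   RecordBatch probe = ...;          // left child operator
     *   RecordBatch build = ...;          // right child operator
     *   HashJoinBatch join = new HashJoinBatch(pop, context, probe, build);
     */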

    private void updateStats(HashTable htable) {
      if (htable == null) {
        return;
      }
      htable.getStats(htStats);
      this.stats.setLongStat(Metric.NUM_BUCKETS, htStats.numBuckets);
      this.stats.setLongStat(Metric.NUM_ENTRIES, htStats.numEntries);
      this.stats.setLongStat(Metric.NUM_RESIZING, htStats.numResizing);
      this.stats.setLongStat(Metric.RESIZING_TIME, htStats.resizingTime);
    }

    @Override
    public void killIncoming(boolean sendUpstream) {
        this.left.kill(sendUpstream);
        this.right.kill(sendUpstream);
    }

    @Override
    public void cleanup() {
        hjHelper.clear();

        // hyperContainer may be null if we never received any build-side data; check before clearing
        if (hyperContainer != null) {
            hyperContainer.clear();
        }

        if (hashTable != null) {
            hashTable.clear();
        }
        super.cleanup();
        left.cleanup();
        right.cleanup();
    }
}