// Package: org.apache.pig.backend.hadoop.executionengine.mapReduceLayer
//
// Source code of org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.backend.hadoop.executionengine.mapReduceLayer;

import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;

import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.plans.MROpPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POUnion;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.Operator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.MultiMap;

/**
* An operator model for a Map Reduce job.
* Acts as a host to the plans that will
* execute in map, reduce and optionally combine
* phases. These will be embedded in the MROperPlan
* in order to capture the dependencies amongst jobs.
*/
public class MapReduceOper extends Operator<MROpPlanVisitor> {
    private static final long serialVersionUID = 1L;

    //The physical plan that should be executed
    //in the map phase
    public PhysicalPlan mapPlan;
   
    //The physical plan that should be executed
    //in the reduce phase
    public PhysicalPlan reducePlan;
   
    //The physical plan that should be executed
    //in the combine phase if one exists. Will be used
    //by the optimizer.
    public PhysicalPlan combinePlan;
   
    // key for the map plan
    // this is needed when the key is null to create
    // an appropriate NullableXXXWritable object
    public byte mapKeyType;
   
    //Indicates that the map plan creation
    //is complete
    boolean mapDone = false;
   
    //Indicates that the reduce plan creation
    //is complete
    boolean reduceDone = false;
   
    // Indicates that there is an operator which uses endOfAllInput flag in the
    // map plan
    boolean endOfAllInputInMap = false;
   
    // Indicates that there is an operator which uses endOfAllInput flag in the
    // reduce plan
    boolean endOfAllInputInReduce = false;;
   
    //Indicates if this job is an order by job
    boolean globalSort = false;

    // Indicates if this is a limit after a sort
    boolean limitAfterSort = false;
   
    // Indicate if the entire purpose for this map reduce job is doing limit, does not change
    // anything else. This is to help POPackageAnnotator to find the right POPackage to annotate
    boolean limitOnly = false;
   
    OPER_FEATURE feature = OPER_FEATURE.NONE;

    // If true, putting an identity combine in this
    // mapreduce job will speed things up.
    boolean needsDistinctCombiner = false;
   
    // If true, we will use secondary key in the map-reduce job
    boolean useSecondaryKey = false;
   
    //The quantiles file name if globalSort is true
    String quantFile;
   
    //The sort order of the columns;
    //asc is true and desc is false
    boolean[] sortOrder;
   
    // Sort order for secondary keys;
    boolean[] secondarySortOrder;

    public Set<String> UDFs;
   
    public Set<PhysicalOperator> scalars;
   
    // Indicates if a UDF comparator is used
    boolean isUDFComparatorUsed = false;
   
    transient NodeIdGenerator nig;

    private String scope;
   
    int requestedParallelism = -1;
   
    // estimated at runtime
    int estimatedParallelism = -1;
   
    // calculated at runtime
    int runtimeParallelism = -1;
   
    /* Name of the Custom Partitioner used */
    String customPartitioner = null;
   
    // Last POLimit value in this map reduce operator, needed by LimitAdjuster
    // to add additional map reduce operator with 1 reducer after this
    long limit = -1;

    // POLimit can also have an expression. See PIG-1926
    PhysicalPlan limitPlan = null;

    // Indicates that this MROper is a splitter MROper.
    // That is, this MROper ends due to a POSPlit operator.
    private boolean splitter = false;

  // Set to true if it is skewed join
  private boolean skewedJoin = false;

    // Name of the partition file generated by sampling process,
    // Used by Skewed Join
  private String skewedJoinPartitionFile;
 
  // Flag to communicate from MRCompiler to JobControlCompiler what kind of
  // comparator is used by Hadoop for sorting for this MROper.
  // By default, set to false which will make Pig provide raw comparators.
  // Set to true in indexing job generated in map-side cogroup, merge join.
  private boolean usingTypedComparator = false;
 
  // Flag to indicate if the small input splits need to be combined to form a larger
  // one in order to reduce the number of mappers. For merge join, both tables
  // are NOT combinable for correctness.
  private boolean combineSmallSplits = true;
 
  // Map of the physical operator in physical plan to the one in MR plan: only needed
  // if the physical operator is changed/replaced in MR compilation due to, e.g., optimization
  public MultiMap<PhysicalOperator, PhysicalOperator> phyToMRMap;
 
  private static enum OPER_FEATURE {
      NONE,
      // Indicate if this job is a sampling job
      SAMPLER,
      // Indicate if this job is a merge indexer
      INDEXER,
      // Indicate if this job is a group by job
      GROUPBY,     
      // Indicate if this job is a cogroup job
      COGROUP,     
      // Indicate if this job is a regular join job
      HASHJOIN;
  };
 
    public MapReduceOper(OperatorKey k) {
        super(k);
        mapPlan = new PhysicalPlan();
        combinePlan = new PhysicalPlan();
        reducePlan = new PhysicalPlan();
        UDFs = new HashSet<String>();
        scalars = new HashSet<PhysicalOperator>();
        nig = NodeIdGenerator.getGenerator();
        scope = k.getScope();
        phyToMRMap = new MultiMap<PhysicalOperator, PhysicalOperator>();
    }

    /*@Override
    public String name() {
        return "MapReduce - " + mKey.toString();
    }*/
   
    private String shiftStringByTabs(String DFStr, String tab) {
        StringBuilder sb = new StringBuilder();
        String[] spl = DFStr.split("\n");
        for (int i = 0; i < spl.length; i++) {
            sb.append(tab);
            sb.append(spl[i]);
            sb.append("\n");
        }
        sb.delete(sb.length() - "\n".length(), sb.length());
        return sb.toString();
    }
   
    /**
     * Uses the string representation of the
     * component plans to identify itself.
     */
    @Override
    public String name() {
        String udfStr = getUDFsAsStr();
       
        StringBuilder sb = new StringBuilder("MapReduce" + "(" + requestedParallelism +
                (udfStr.equals("")? "" : ",") + udfStr + ")" + " - " + mKey.toString()
                + ":\n");
        int index = sb.length();
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        if(!mapPlan.isEmpty()){
            mapPlan.explain(baos);
            String mp = new String(baos.toByteArray());
            sb.append(shiftStringByTabs(mp, "|   "));
        }
        else
            sb.append("Map Plan Empty");
        if (!reducePlan.isEmpty()){
            baos.reset();
            reducePlan.explain(baos);
            String rp = new String(baos.toByteArray());
            sb.insert(index, shiftStringByTabs(rp, "|   ") + "\n");
        }
        else
            sb.insert(index, "Reduce Plan Empty" + "\n");
        return sb.toString();
    }

    private String getUDFsAsStr() {
        StringBuilder sb = new StringBuilder();
        if(UDFs!=null && UDFs.size()>0){
            for (String str : UDFs) {
                sb.append(str.substring(str.lastIndexOf('.')+1));
                sb.append(',');
            }
            sb.deleteCharAt(sb.length()-1);
        }
        return sb.toString();
    }

    @Override
    public boolean supportsMultipleInputs() {
        return true;
    }

    @Override
    public boolean supportsMultipleOutputs() {
        return true;
    }

    @Override
    public void visit(MROpPlanVisitor v) throws VisitorException {
        v.visitMROp(this);
    }
   
    public boolean isMapDone() {
        return mapDone;
    }
   
    public void setMapDone(boolean mapDone){
        this.mapDone = mapDone;
    }
   
    public void setMapDoneSingle(boolean mapDone) throws PlanException{
        this.mapDone = mapDone;
        if (mapDone && mapPlan.getLeaves().size()>1) {
            mapPlan.addAsLeaf(getUnion());
        }
    }
   
    public void setMapDoneMultiple(boolean mapDone) throws PlanException{
        this.mapDone = mapDone;
        if (mapDone && mapPlan.getLeaves().size()>0) {
            mapPlan.addAsLeaf(getUnion());
        }
    }
   
    private POUnion getUnion(){
        return new POUnion(new OperatorKey(scope,nig.getNextNodeId(scope)));
    }
   
    public boolean isReduceDone() {
        return reduceDone;
    }

    public void setReduceDone(boolean reduceDone){
        this.reduceDone = reduceDone;
    }
   
    public boolean isGlobalSort() {
        return globalSort;
    }
   
    public boolean isSkewedJoin() {
      return (skewedJoinPartitionFile != null);
    }
   
    public void setSkewedJoinPartitionFile(String file) {     
      skewedJoinPartitionFile = file;
    }
   
    public String getSkewedJoinPartitionFile() {
      return skewedJoinPartitionFile;
    }

  public void setSkewedJoin(boolean skJoin) {
    this.skewedJoin = skJoin;
  }

  public boolean getSkewedJoin() {
    return skewedJoin;
  }

    public void setGlobalSort(boolean globalSort) {
        this.globalSort = globalSort;
    }

    public boolean isLimitAfterSort() {
        return limitAfterSort;
    }

    public void setLimitAfterSort(boolean las) {
        limitAfterSort = las;
    }
   
    public boolean isLimitOnly() {
        return limitOnly;
    }
   
    public void setLimitOnly(boolean limitOnly) {
        this.limitOnly = limitOnly;
    }

    public boolean isIndexer() {
        return (feature == OPER_FEATURE.INDEXER);
    }
   
    public void markIndexer() {
        feature = OPER_FEATURE.INDEXER;
    }
   
    public boolean isSampler() {
        return (feature == OPER_FEATURE.SAMPLER);
    }
   
    public void markSampler() {
        feature = OPER_FEATURE.SAMPLER;
    }
   
    public boolean isGroupBy() {
        return (feature == OPER_FEATURE.GROUPBY);
    }
   
    public void markGroupBy() {
        feature = OPER_FEATURE.GROUPBY;
    }
   
    public boolean isCogroup() {
        return (feature == OPER_FEATURE.COGROUP);
    }
   
    public void markCogroup() {
        feature = OPER_FEATURE.COGROUP;
    }
   
    public boolean isRegularJoin() {
        return (feature == OPER_FEATURE.HASHJOIN);
    }
   
    public void markRegularJoin() {
        feature = OPER_FEATURE.HASHJOIN;
    }
   
    public boolean needsDistinctCombiner() {
        return needsDistinctCombiner;
    }

    public void setNeedsDistinctCombiner(boolean nic) {
        needsDistinctCombiner = nic;
    }

    public String getQuantFile() {
        return quantFile;
    }

    public void setQuantFile(String quantFile) {
        this.quantFile = quantFile;
    }

    public void setSortOrder(boolean[] sortOrder) {
        if(null == sortOrder) return;
        this.sortOrder = new boolean[sortOrder.length];
        for(int i = 0; i < sortOrder.length; ++i) {
            this.sortOrder[i] = sortOrder[i];
        }
    }
   
    public void setSecondarySortOrder(boolean[] secondarySortOrder) {
        if(null == secondarySortOrder) return;
        this.secondarySortOrder = new boolean[secondarySortOrder.length];
        for(int i = 0; i < secondarySortOrder.length; ++i) {
            this.secondarySortOrder[i] = secondarySortOrder[i];
        }
    }
            
    public boolean[] getSortOrder() {
        return sortOrder;
    }

    public boolean[] getSecondarySortOrder() {
        return secondarySortOrder;
    }

    /**
     * @return whether end of all input is set in the map plan
     */
    public boolean isEndOfAllInputSetInMap() {
        return endOfAllInputInMap;
    }

    /**
     * @param endOfAllInputInMap the streamInMap to set
     */
    public void setEndOfAllInputInMap(boolean endOfAllInputInMap) {
        this.endOfAllInputInMap = endOfAllInputInMap;
    }

    /**
     * @return whether end of all input is set in the reduce plan
     */
    public boolean isEndOfAllInputSetInReduce() {
        return endOfAllInputInReduce;
    }

    /**
     * @param endOfAllInputInReduce the streamInReduce to set
     */
    public void setEndOfAllInputInReduce(boolean endOfAllInputInReduce) {
        this.endOfAllInputInReduce = endOfAllInputInReduce;
    }   

    public int getRequestedParallelism() {
        return requestedParallelism;
    }
   
    public String getCustomPartitioner() {
      return customPartitioner;
    }

    public void setSplitter(boolean spl) {
        splitter = spl;
    }

    public boolean isSplitter() {
        return splitter;
    }
   
    public boolean getUseSecondaryKey() {
        return useSecondaryKey;
    }
   
    public void setUseSecondaryKey(boolean useSecondaryKey) {
        this.useSecondaryKey = useSecondaryKey;
    }

    protected boolean usingTypedComparator() {
        return usingTypedComparator;
    }

    protected void useTypedComparator(boolean useTypedComparator) {
        this.usingTypedComparator = useTypedComparator;
    }

    protected void noCombineSmallSplits() {
        combineSmallSplits = false;
    }

    public boolean combineSmallSplits() {
        return combineSmallSplits;
    }

    public boolean isRankOperation() {
        return getRankOperationId().size() != 0;
    }
   
    public ArrayList<String> getRankOperationId() {
        ArrayList<String> operationIDs = new ArrayList<String>();
        Iterator<PhysicalOperator> mapRoots = this.mapPlan.getRoots().iterator();

        while(mapRoots.hasNext()) {
            PhysicalOperator operation = mapRoots.next();
            if(operation instanceof PORank)
                operationIDs.add(((PORank) operation).getOperationID());
        }

        return operationIDs;
    }

    public boolean isCounterOperation() {
        return (getCounterOperation() != null);
    }

    public boolean isRowNumber() {
        POCounter counter = getCounterOperation();
        return (counter != null)?counter.isRowNumber():false;
    }

    public String getOperationID() {
        POCounter counter = getCounterOperation();
        return (counter != null)?counter.getOperationID():null;
    }

    private POCounter getCounterOperation() {
        POCounter counter = getCounterOperation(this.mapPlan);
        if (counter == null) {
            counter = getCounterOperation(this.reducePlan);
        }
        return counter;
    }

    private POCounter getCounterOperation(PhysicalPlan plan) {
        PhysicalOperator operator;
        Iterator<PhysicalOperator> it = plan.getLeaves().iterator();

        while (it.hasNext()) {
            operator = it.next();
            if (operator instanceof POCounter) {
                return (POCounter) operator;
            } else if (operator instanceof POStore) {
                List<PhysicalOperator> preds = plan.getPredecessors(operator);
                if (preds != null) {
                    for (PhysicalOperator pred : preds) {
                        if (pred instanceof POCounter) {
                            return (POCounter) pred;
                        }
                    }
                }
            }
        }
        return null;
    }
}
// (End of source.)
//
// Related classes of org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MapReduceOper
//
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code are property of their respective owners. Java is a trademark of
// Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.