// Package org.apache.pig.impl.logicalLayer.optimizer
//
// Source code of org.apache.pig.impl.logicalLayer.optimizer.LogicalTransformer$ProjectFixerUpper

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.impl.logicalLayer.optimizer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.pig.impl.plan.DepthFirstWalker;
import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.Transformer;
import org.apache.pig.impl.plan.optimizer.Transformer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOFRJoin;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOFilter;
import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LOGenerate;
import org.apache.pig.impl.logicalLayer.LOProject;
import org.apache.pig.impl.logicalLayer.LOSort;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
import org.apache.pig.impl.logicalLayer.LOVisitor;

public abstract class LogicalTransformer extends Transformer<LogicalOperator, LogicalPlan> {

    // Per-instance logger.  It is looked up with getClass(), so the logger is
    // obtained for the runtime class of the object rather than for
    // LogicalTransformer itself.
    private final Log log = LogFactory.getLog(getClass());

    protected LogicalTransformer(
            LogicalPlan plan,
            PlanWalker<LogicalOperator, LogicalPlan> walker) {
        super(plan, walker);
    }

    /**
     * Rebuild schemas after a rule has transformed the tree.  This will first
     * null out existing schemas and then call getSchema to rebuild them.
     * @throws VisitorException, FrontendException
     */
    protected void rebuildSchemas() throws VisitorException, FrontendException {
        SchemaRemover sr = new SchemaRemover(mPlan);
        sr.visit();
        SchemaCalculator sc = new SchemaCalculator(mPlan);
        sc.visit();
       
    }

    /**
     * A class to visit all the projects and change them to attach to a new
     * node.  This class overrides all of the relational operators visit
     * methods because it does not want to visit contained plans.
     */
    private class ProjectFixerUpper extends LOVisitor {

        private LogicalOperator mNewNode;
        private LogicalOperator mOldNode;
        private Map<Integer, Integer> mProjectionMapping;

        ProjectFixerUpper(
                LogicalPlan plan,
                LogicalOperator oldNode,
                LogicalOperator newNode, Map<Integer, Integer> projectionMapping) {
            super(plan,
                new DepthFirstWalker<LogicalOperator, LogicalPlan>(plan));
            mNewNode = newNode;
            mOldNode = oldNode;
            mProjectionMapping = projectionMapping;
        }

        protected void visit(LOCogroup cg) throws VisitorException {
        }

        protected void visit(LOSort s) throws VisitorException {
        }

        protected void visit(LOFilter f) throws VisitorException {
        }

        protected void visit(LOSplit s) throws VisitorException {
        }

        protected void visit(LOSplitOutput s) throws VisitorException {
        }

        protected void visit(LOForEach f) throws VisitorException {
        }

        protected void visit(LOProject p) throws VisitorException {
            // Only switch the expression if this is a top level projection,
            // that is, this project is pointing to a relational operator
            // outside the plan).
            List<LogicalOperator> preds = mPlan.getPredecessors(p);
            if (preds == null || preds.size() == 0) {
                if(p.getExpression().equals(mOldNode))
                    // Change the expression
                    p.setExpression(mNewNode);

                // Remap the projection column if necessary
                if (mProjectionMapping != null && !p.isStar()) {
                    List<Integer> oldProjection = p.getProjection();
                    List<Integer> newProjection =
                        new ArrayList<Integer>(oldProjection.size());
                    for (Integer i : oldProjection) {
                        Integer n = mProjectionMapping.get(i);
                        assert(n != null);
                        newProjection.add(n);
                    }
                }
            } else {
                p.getExpression().visit(this);
            }
        }
    }

    /**
     * Insert a node in between two existing nodes.  This includes inserting
     * the node into the correct place in the plan and finding any projects in
     * successors and reconnecting them to the new node as well as rebuilding
     * all of the schemas.
     * @param after Node to insert the new node after
     * @param newNode New node to insert
     * @param before Node to insert this node before
     * @param projectionMapping A map that defines how projections in after
     * relate to projections in newnode.  Keys are the projection offsets in
     * after, values are the new offsets in newnode.  If this field is null,
     * then it will be assumed that the mapping is 1-1.
     * @throws VisitorException, FrontendException
     */
    protected void insertBetween(
            LogicalOperator after,
            LogicalOperator newNode,
            LogicalOperator before,
            Map<Integer, Integer> projectionMapping)
            throws VisitorException, FrontendException {
        // Insert it into the plan.
        mPlan.add(newNode);
        mPlan.insertBetween(after, newNode, before);
        fixUpContainedPlans(after, newNode, before, projectionMapping);
    }
   
    /**
     * Once a node has been inserted, inner plans associated with other nodes
     * may have references to the node that has been replaced or moved.  This
     * function walks those inner plans and patches up references.
     * @param after Node that has had a new node inserted after it.
     * @param newNode node that has been inserted
     * @param before Node that has had a new node inserted before it.
     * @param projectionMapping A map that defines how projections in after
     * relate to projections in newNode.  Keys are the projection offsets in
     * after, values are the new offsets in newNode.  If this field is null,
     * then it will be assumed that the mapping is 1-1.
     * @throws VisitorException, FrontendException
     */
    protected void fixUpContainedPlans(
            LogicalOperator after,
            LogicalOperator newNode,
            LogicalOperator before,
            Map<Integer, Integer> projectionMapping)
    throws VisitorException, FrontendException {
        // Fix up COGroup internal wiring
        if (before instanceof LOCogroup) {
            LOCogroup cg = (LOCogroup) before ;
            cg.switchGroupByPlanOp(after, newNode);
        }
        if (before instanceof LOFRJoin) {
            LOFRJoin frj = (LOFRJoin) before ;
            frj.switchJoinColPlanOp(after, newNode);
        }

        // Visit all the inner plans of before and change their projects to
        // connect to newNode instead of after.
        // Find right inner plan(s) to visit
        List<LogicalPlan> plans = new ArrayList<LogicalPlan>();
        if (before instanceof LOCogroup) {
            plans.addAll((((LOCogroup)before).getGroupByPlans()).values());
        } else if (before instanceof LOFRJoin) {
            plans.addAll((((LOFRJoin)before).getJoinColPlans()).values());
        }
        else if (before instanceof LOSort) {
            plans.addAll(((LOSort)before).getSortColPlans());
        } else if (before instanceof LOFilter) {
            plans.add(((LOFilter)before).getComparisonPlan());
        } else if (before instanceof LOSplit) {
            // In this case we have to find each of the Split outputs, and
            // add their plans.
            List<LogicalOperator> splitOutputs = mPlan.getSuccessors(before);
            for (LogicalOperator so : splitOutputs) {
                if (!(so instanceof LOSplitOutput)) {
                    String msg = "Found an LOSplit with an operator other "
                        + "than LOSplitOutput after it!";
                    log.error(msg);
                    throw new VisitorException(msg);
                }
                plans.add(((LOSplitOutput)so).getConditionPlan());
            }
        } else if (before instanceof LOForEach) {
            plans.addAll(((LOForEach)before).getForEachPlans());
        }
       
        for (LogicalPlan lp : plans) {
            ProjectFixerUpper pfu =
                new ProjectFixerUpper(lp, after, newNode, projectionMapping);
            pfu.visit();
        }
    }

    /**
     * Insert a node in after an existing nodes.  This includes inserting
     * the node into the correct place in the plan and finding any projects in
     * successors and reconnecting them to the new node as well as rebuilding
     * all of the schemas.  This function
     * assumes that the node has only one predecessor.
     * @param after Node to insert the new node after
     * @param newNode New node to insert
     * @param projectionMapping A map that defines how projections in after
     * relate to projections in newnode.  Keys are the projection offsets in
     * after, values are the new offsets in newnode.  If this field is null,
     * then it will be assumed that the mapping is 1-1.
     * @throws VisitorException, FrontendException
     */
    protected void insertAfter(
            LogicalOperator after,
            LogicalOperator newNode,
            Map<Integer, Integer> projectionMapping)
            throws VisitorException, FrontendException {
        List<LogicalOperator> successors = mPlan.getSuccessors(after);
        if(successors==null) {
            mPlan.addAsLeaf(newNode);
            return;
        }
        if (successors.size() != 1) {
            throw new RuntimeException("insertAfter only valid to insert " +
                "after a node with single output.");
        }
        insertBetween(after, newNode, successors.get(0), projectionMapping);
    }

    /**
     * Remove a node in the middle of a linear chain. This includes removing the node
     * from the plan and reconnect the nodes before and after the node as well as rebuilding
     * all of the schemas. This node should have one and only one predecessor and zero or one
     * successor
     * @param nodeToRemove Node to remove
     * @param projectionMapping A map that defines how projections in after
     * relate to projections in nodeToRemove.  Keys are the projection offsets in
     * after, values are the new offsets in nodeToRemove.  If this field is null,
     * then it will be assumed that the mapping is 1-1.
     * @throws VisitorException, FrontendException
     */
    protected void removeFromChain(
            LogicalOperator nodeToRemove,
            Map<Integer, Integer> projectionMapping)
            throws VisitorException, FrontendException {
     
      List<LogicalOperator> afterNodes = mPlan.getPredecessors(nodeToRemove);
      if (afterNodes.size()!=1)
        throw new RuntimeException("removeFromChain only valid to remove " +
              "node has one predecessor.");
      List<LogicalOperator> beforeNodes = mPlan.getSuccessors(nodeToRemove);
      if (beforeNodes!=null && beforeNodes.size()!=1)
        throw new RuntimeException("removeFromChain only valid to remove " +
            "node has one successor.");
     
      // Get after and before node
      LogicalOperator after = mPlan.getPredecessors(nodeToRemove).get(0);
      LogicalOperator before = null;
      if (beforeNodes!=null)
        before = mPlan.getSuccessors(nodeToRemove).get(0);
     
        // Remove nodeToRemove from plan
      mPlan.remove(nodeToRemove);
      if (before!=null)
      {
        // Shortcut nodeToRemove.
          mPlan.connect(after, before);
 
          // Visit all the inner plans of before and change their projects to
          // connect to after instead of nodeToRemove.
          // Find right inner plan(s) to visit
          List<LogicalPlan> plans = new ArrayList<LogicalPlan>();
          if (before instanceof LOCogroup) {
              plans.addAll((((LOCogroup)before).getGroupByPlans()).values());
          } else if (before instanceof LOSort) {
              plans.addAll(((LOSort)before).getSortColPlans());
          } else if (before instanceof LOFilter) {
              plans.add(((LOFilter)before).getComparisonPlan());
          } else if (before instanceof LOSplitOutput) {
              plans.add(((LOSplitOutput)before).getConditionPlan());
          } else if (before instanceof LOForEach) {
              plans.addAll(((LOForEach)before).getForEachPlans());
          }
         
          for (LogicalPlan lp : plans) {
              ProjectFixerUpper pfu =
                  new ProjectFixerUpper(lp, nodeToRemove, after, projectionMapping);
              pfu.visit();
          }
 
      }

      // Now rebuild the schemas
        // rebuildSchemas();
    }
   
}
// TOP
//
// Related Classes of org.apache.pig.impl.logicalLayer.optimizer.LogicalTransformer$ProjectFixerUpper
//
// TOP
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.