Package org.apache.pig.impl.logicalLayer.optimizer

Source Code of org.apache.pig.impl.logicalLayer.optimizer.LogicalTransformer

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.impl.logicalLayer.optimizer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.pig.impl.plan.PlanWalker;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.optimizer.Transformer;
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LOJoin;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LOCogroup;
import org.apache.pig.impl.logicalLayer.LOFilter;
import org.apache.pig.impl.logicalLayer.LOForEach;
import org.apache.pig.impl.logicalLayer.LOSort;
import org.apache.pig.impl.logicalLayer.LOSplit;
import org.apache.pig.impl.logicalLayer.LOSplitOutput;
import org.apache.pig.impl.logicalLayer.ProjectFixerUpper;
import org.apache.pig.impl.logicalLayer.ProjectionMapCalculator;
import org.apache.pig.impl.logicalLayer.ProjectionMapRemover;

public abstract class LogicalTransformer extends Transformer<LogicalOperator, LogicalPlan> {

    private final Log log = LogFactory.getLog(getClass());

    protected LogicalTransformer(
            LogicalPlan plan,
            PlanWalker<LogicalOperator, LogicalPlan> walker) {
        super(plan, walker);
    }

    /**
     * Rebuild schemas after a rule has transformed the tree.  This will first
     * null out existing schemas and then call getSchema to rebuild them.
     * @throws VisitorException, FrontendException
     */
    protected void rebuildSchemas() throws VisitorException, FrontendException {
        SchemaRemover sr = new SchemaRemover(mPlan);
        sr.visit();
        SchemaCalculator sc = new SchemaCalculator(mPlan);
        sc.visit();
       
    }
   
    /**
     * Rebuild projection maps after a rule has transformed the tree.  This will first
     * null out existing projection maps and then call getProjectionMap to rebuild them.
     * @throws VisitorException
     */
    protected void rebuildProjectionMaps() throws VisitorException {
        ProjectionMapRemover pMapRemover = new ProjectionMapRemover(mPlan);
        pMapRemover.visit();
        ProjectionMapCalculator pMapCalculator = new ProjectionMapCalculator(mPlan);
        pMapCalculator.visit();
       
    }

    /**
     * Insert a node in between two existing nodes.  This includes inserting
     * the node into the correct place in the plan and finding any projects in
     * successors and reconnecting them to the new node as well as rebuilding
     * all of the schemas.
     * @param after Node to insert the new node after
     * @param newNode New node to insert
     * @param before Node to insert this node before
     * @param projectionMapping A map that defines how projections in after
     * relate to projections in newnode.  Keys are the projection offsets in
     * after, values are the new offsets in newnode.  If this field is null,
     * then it will be assumed that the mapping is 1-1.
     * @throws VisitorException, FrontendException
     */
    protected void insertBetween(
            LogicalOperator after,
            LogicalOperator newNode,
            LogicalOperator before,
            Map<Integer, Integer> projectionMapping)
            throws VisitorException, FrontendException {
        // Insert it into the plan.
        mPlan.add(newNode);
        mPlan.insertBetween(after, newNode, before);
    }
   
    /**
     * Once a node has been inserted, inner plans associated with other nodes
     * may have references to the node that has been replaced or moved.  This
     * function walks those inner plans and patches up references.
     * @param after Node that has had a new node inserted after it.
     * @param newNode node that has been inserted
     * @param before Node that has had a new node inserted before it.
     * @param projectionMapping A map that defines how projections in after
     * relate to projections in newNode.  Keys are the projection offsets in
     * after, values are the new offsets in newNode.  If this field is null,
     * then it will be assumed that the mapping is 1-1.
     * @throws VisitorException, FrontendException
     */
    protected void fixUpContainedPlans(
            LogicalOperator after,
            LogicalOperator newNode,
            LogicalOperator before,
            Map<Integer, Integer> projectionMapping)
    throws VisitorException, FrontendException {
        // Fix up COGroup internal wiring
        if (before instanceof LOCogroup) {
            LOCogroup cg = (LOCogroup) before ;
            cg.switchGroupByPlanOp(after, newNode);
        }
        if (before instanceof LOJoin) {
            LOJoin frj = (LOJoin) before ;
            frj.switchJoinColPlanOp(after, newNode);
        }
        if (before instanceof LOJoin) {
            LOJoin frj = (LOJoin) before ;
            frj.switchJoinColPlanOp(after, newNode);
        }

        // Visit all the inner plans of before and change their projects to
        // connect to newNode instead of after.
        // Find right inner plan(s) to visit
        List<LogicalPlan> plans = new ArrayList<LogicalPlan>();
        if (before instanceof LOCogroup) {
            plans.addAll((((LOCogroup)before).getGroupByPlans()).values());
        } else if (before instanceof LOJoin) {
            plans.addAll((((LOJoin)before).getJoinPlans()).values());
        } else if (before instanceof LOJoin) {
            plans.addAll((((LOJoin)before).getJoinPlans()).values());
        }
        else if (before instanceof LOSort) {
            plans.addAll(((LOSort)before).getSortColPlans());
        } else if (before instanceof LOFilter) {
            plans.add(((LOFilter)before).getComparisonPlan());
        } else if (before instanceof LOSplit) {
            // In this case we have to find each of the Split outputs, and
            // add their plans.
            List<LogicalOperator> splitOutputs = mPlan.getSuccessors(before);
            for (LogicalOperator so : splitOutputs) {
                if (!(so instanceof LOSplitOutput)) {
                    String msg = "Found an LOSplit with an operator other "
                        + "than LOSplitOutput after it!";
                    log.error(msg);
                    throw new VisitorException(msg);
                }
                plans.add(((LOSplitOutput)so).getConditionPlan());
            }
        } else if (before instanceof LOForEach) {
            plans.addAll(((LOForEach)before).getForEachPlans());
        }
       
        for (LogicalPlan lp : plans) {
            ProjectFixerUpper pfu =
                new ProjectFixerUpper(lp, after, newNode, projectionMapping);
            pfu.visit();
        }
    }

    /**
     * Insert a node in after an existing nodes.  This includes inserting
     * the node into the correct place in the plan and finding any projects in
     * successors and reconnecting them to the new node as well as rebuilding
     * all of the schemas.  This function
     * assumes that the node has only one predecessor.
     * @param after Node to insert the new node after
     * @param newNode New node to insert
     * @param projectionMapping A map that defines how projections in after
     * relate to projections in newnode.  Keys are the projection offsets in
     * after, values are the new offsets in newnode.  If this field is null,
     * then it will be assumed that the mapping is 1-1.
     * @throws VisitorException, FrontendException
     */
    protected void insertAfter(
            LogicalOperator after,
            LogicalOperator newNode,
            Map<Integer, Integer> projectionMapping)
            throws VisitorException, FrontendException {
        List<LogicalOperator> successors = mPlan.getSuccessors(after);
        if(successors==null) {
            mPlan.addAsLeaf(newNode);
            return;
        }
        if (successors.size() != 1) {
            throw new RuntimeException("insertAfter only valid to insert " +
                "after a node with single output.");
        }
        insertBetween(after, newNode, successors.get(0), projectionMapping);
    }

    @Override
    public void reset() {
        // do nothing
    }

}
TOP

Related Classes of org.apache.pig.impl.logicalLayer.optimizer.LogicalTransformer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.