Package org.apache.pig.newplan.logical.rules

Source Code of org.apache.pig.newplan.logical.rules.PushUpFilter$PushUpFilterTransformer

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.pig.newplan.logical.rules;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;

import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.expression.UserFuncExpression;
import org.apache.pig.newplan.logical.relational.LOCogroup;
import org.apache.pig.newplan.logical.relational.LOCross;
import org.apache.pig.newplan.logical.relational.LODistinct;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LOForEach;
import org.apache.pig.newplan.logical.relational.LOJoin;
import org.apache.pig.newplan.logical.relational.LOLimit;
import org.apache.pig.newplan.logical.relational.LOLoad;
import org.apache.pig.newplan.logical.relational.LONative;
import org.apache.pig.newplan.logical.relational.LOSort;
import org.apache.pig.newplan.logical.relational.LOSplit;
import org.apache.pig.newplan.logical.relational.LOSplitOutput;
import org.apache.pig.newplan.logical.relational.LOStore;
import org.apache.pig.newplan.logical.relational.LOStream;
import org.apache.pig.newplan.logical.relational.LOUnion;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.logical.relational.LogicalSchema;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;

public class PushUpFilter extends Rule {

    public PushUpFilter(String n) {
        super(n, false);
    }

    @Override
    public Transformer getNewTransformer() {
        return new PushUpFilterTransformer();
    }

    public class PushUpFilterTransformer extends Transformer {
        private OperatorSubPlan subPlan;

        @Override
        public boolean check(OperatorPlan matched) throws FrontendException {
            // check if it is inner join
            Operator current = matched.getSources().get(0);

            Operator pred = findNonFilterPredecessor( current );
            if( pred == null )
                return false;

            // sort and union are always okay.
            if( pred instanceof LOSort || pred instanceof LOUnion ) {
                return true;
            }

            // if the predecessor is one of LOLoad/LOStore/LOStream/LOLimit/LONative
            // if predecessor is LOForEach, it is optimized by rule FilterAboveForeach
            // return false
            if( pred instanceof LOLoad   || pred instanceof LOStore || pred instanceof LOStream      ||
                pred instanceof LOFilter || pred instanceof LOSplit || pred instanceof LOSplitOutput ||
                pred instanceof LOLimit  || pred instanceof LONative || pred instanceof LOForEach) {
                return false;
            }

            LOFilter filter = (LOFilter)current;
            List<Operator> preds = currentPlan.getPredecessors( pred );
            LogicalExpressionPlan filterPlan = filter.getFilterPlan();

            if (OptimizerUtils.planHasNonDeterministicUdf(filterPlan)) {
                return false;
            }

            //if there is no nondeterministic udf, filter can be pushed above
            // Distinct
            if(pred instanceof LODistinct){
                return true;
            }

            // collect all uids used in the filter plan
            Set<Long> uids = collectUidFromExpPlan(filterPlan);

            if( pred instanceof LOCogroup ) {
                LOCogroup cogrp = (LOCogroup)pred;
                if( preds.size() == 1 ) {
                    if( hasAll( (LogicalRelationalOperator)preds.get( 0 ), uids )    ) {
                        // Order by is ok if all UIDs can be found from previous operator.
                        return true;
                    }
                } else if ( 1 == cogrp.getExpressionPlans().get( 0 ).size() && !containUDF( filterPlan ) ) {
                    // Optimization is possible if there is only a single key.
                    // For regular cogroup, we cannot use UIDs to determine if filter can be pushed up.
                    // But if there is no UDF, it's okay, as only UDF can take bag field as input.
                    return true;
                }
            }

            // if the predecessor is a multi-input operator then detailed
            // checks are required
            if( pred instanceof LOCross || pred instanceof LOJoin ) {
                boolean[] innerFlags = null;
                boolean isFullOuter = true;
                boolean isInner = true;
                if( pred instanceof LOJoin ) {
                    innerFlags = ((LOJoin)pred).getInnerFlags();
                    // If all innerFlag is false, means a full outer join,
                    for (boolean inner : innerFlags) {
                        if (inner) {
                            isFullOuter = false;
                        } else {
                            isInner = false;
                        }
                    }
                    if (isFullOuter)
                        return false;
                }


                for(int j=0; j<preds.size(); j++) {
                    if (hasAll((LogicalRelationalOperator)preds.get(j), uids)) {
                        // For LOJoin, innerFlag==true indicate that branch is the outer join side
                        // which has the exact opposite semantics
                        if (pred instanceof LOCross || pred instanceof LOJoin && (isInner || innerFlags[j]))
                            return true;
                    }
                }
            }

            return false;
        }

        private boolean containUDF(LogicalExpressionPlan filterPlan) {
            Iterator<Operator> it = filterPlan.getOperators();
            while( it.hasNext() ) {
                if( it.next() instanceof UserFuncExpression )
                    return true;
            }
            return false;
        }

        Set<Long> collectUidFromExpPlan(LogicalExpressionPlan filterPlan) throws FrontendException {
            Set<Long> uids = new HashSet<Long>();
            Iterator<Operator> iter = filterPlan.getOperators();
            while(iter.hasNext()) {
                Operator op = iter.next();
                if (op instanceof ProjectExpression) {
                    long uid = ((ProjectExpression)op).getFieldSchema().uid;
                    uids.add(uid);
                }
            }
            return uids;
        }

        /**
         * Starting from current operator (which is a filter), search its successors until
         * locating a non-filter operator. Null is returned if none is found.
         */
        private Operator findNonFilterPredecessor(Operator current) {
            Operator op = current;
            do {
                List<Operator> predecessors = currentPlan.getPredecessors( op );

                // if there are no predecessors return false
                if( predecessors == null || predecessors.size() == 0 ) {
                    return null;
                }

                Operator pred = predecessors.get( 0 );
                if( pred instanceof LOFilter ) {
                    op = pred;
                    continue;
                } else {
                    return pred;
                }
            } while( true );

        }

        @Override
        public void transform(OperatorPlan matched) throws FrontendException {
            subPlan = new OperatorSubPlan(currentPlan);

            LOFilter filter = (LOFilter)matched.getSources().get(0);

            // This is the one that we will insert filter btwn it and it's input.
            Operator predecessor = this.findNonFilterPredecessor( filter );
            subPlan.add( predecessor) ;

            // Disconnect the filter in the plan without removing it from the plan.
            Operator predec = currentPlan.getPredecessors( filter ).get( 0 );
            Operator succed;

            if (currentPlan.getSuccessors(filter)!=null)
                succed = currentPlan.getSuccessors(filter).get(0);
            else
                succed = null;

            Pair<Integer, Integer> p1 = currentPlan.disconnect(predec, filter);
            if (succed!=null) {
                subPlan.add(succed);
                Pair<Integer, Integer> p2 = currentPlan.disconnect(filter, succed);
                currentPlan.connect(predec, p1.first, succed, p2.second);
            }

            if( predecessor instanceof LOSort || predecessor instanceof LODistinct ||
                ( predecessor instanceof LOCogroup && currentPlan.getPredecessors( predecessor ).size() == 1 ) ) {
                // For sort, put the filter in front of it.
                Operator prev = currentPlan.getPredecessors( predecessor ).get( 0 );

                insertFilter( prev, predecessor, filter );
                return;
            }

            // Find the predecessor of join that contains all required uids.
            LogicalExpressionPlan filterPlan = filter.getFilterPlan();
            List<Operator> preds = currentPlan.getPredecessors( predecessor );
            Map<Integer, Operator> inputs = findInputsToAddFilter( filterPlan, predecessor, preds );

            LOFilter newFilter = null;
            for( Entry<Integer, Operator> entry : inputs.entrySet() ) {
                int inputIndex = entry.getKey();
                Operator pred = entry.getValue();

                // Find projection field offset
                int columnOffset = 0;
                if( predecessor instanceof LOJoin || predecessor instanceof LOCross ) {
                    for( int i = 0; i < inputIndex; i++ ) {
                        columnOffset += ( (LogicalRelationalOperator)preds.get( i ) ).getSchema().size();
                    }
                }

                // Reuse the filter for the first match. For others, need to make a copy of the filter
                // and add it between input and predecessor.
                newFilter = newFilter == null ? filter : new LOFilter( (LogicalPlan)currentPlan );

                currentPlan.add( newFilter );
                subPlan.add( newFilter );
                subPlan.add( pred );
                LogicalExpressionPlan fPlan = filterPlan.deepCopy();
                List<Operator> sinks = fPlan.getSinks();
                List<ProjectExpression> projExprs = new ArrayList<ProjectExpression>();
                for( Operator sink : sinks ) {
                    if( sink instanceof ProjectExpression )
                        projExprs.add( (ProjectExpression)sink );
                }

                if( predecessor instanceof LOCogroup ) {
                    for( ProjectExpression projExpr : projExprs ) {
                        // Need to merge filter condition and cogroup by expression;
                        LogicalExpressionPlan plan = ((LOCogroup) predecessor).getExpressionPlans().get( inputIndex ).iterator().next();
                        LogicalExpressionPlan copy = plan.deepCopy();
                        LogicalExpression root = (LogicalExpression)copy.getSinks().get( 0 );
                        List<Operator> predecessors = fPlan.getPredecessors( projExpr );
                        if( predecessors == null || predecessors.size() == 0 ) {
                            fPlan.remove( projExpr );
                            fPlan.add( root );
                        } else {
                            fPlan.add( root );
                            Operator pred1 = predecessors.get( 0 );
                            Pair<Integer, Integer> pair = fPlan.disconnect( pred1, projExpr );
                            fPlan.connect( pred1, pair.first, root, pair.second );
                            fPlan.remove( projExpr );
                        }
                    }
                }

                // Now, reset the projection expressions in the new filter plan.
                sinks = fPlan.getSinks();
                for( Operator sink : sinks ) {
                    if( sink instanceof ProjectExpression ) {
                        ProjectExpression projE = (ProjectExpression)sink;
                         projE.setAttachedRelationalOp( newFilter );
                         projE.setInputNum( 0 );
                         projE.setColNum( projE.getColNum() - columnOffset );
                    }
                 }
                newFilter.setFilterPlan( fPlan );

                insertFilter( pred, predecessor, newFilter );
            }
        }

        // check if a relational operator contains all of the specified uids
        private boolean hasAll(LogicalRelationalOperator op, Set<Long> uids) throws FrontendException {
            LogicalSchema schema = op.getSchema();
            if (schema==null)
                return false;
            for(long uid: uids) {
                if (schema.findField(uid) == -1) {
                    return false;
                }
            }

            return true;
        }

        @Override
        public OperatorPlan reportChanges() {
            return currentPlan;
        }

        // Insert the filter in between the given two operators.
        private void insertFilter(Operator prev, Operator predecessor, LOFilter filter)
        throws FrontendException {
            Pair<Integer, Integer> p3 = currentPlan.disconnect( prev, predecessor );
            currentPlan.connect( prev, p3.first, filter, 0 );
            currentPlan.connect( filter, 0, predecessor, p3.second );
        }

        // Identify those among preds that will need to have a filter between it and the predecessor.
        private Map<Integer, Operator> findInputsToAddFilter(LogicalExpressionPlan filterPlan, Operator predecessor,
                List<Operator> preds) throws FrontendException {
            Map<Integer, Operator> inputs = new HashMap<Integer, Operator>();

            if( predecessor instanceof LOUnion || predecessor instanceof LOCogroup ) {
                for( int i = 0; i < preds.size(); i++ ) {
                    inputs.put( i, preds.get( i ) );
                }
                return inputs;
            }

            // collect all uids used in the filter plan
            Set<Long> uids = collectUidFromExpPlan(filterPlan);
            boolean[] innerFlags = null;
            boolean isInner = true;
            if (predecessor instanceof LOJoin) {
                innerFlags = ((LOJoin)predecessor).getInnerFlags();
                for (boolean inner : innerFlags) {
                    if (!inner) {
                        isInner = false;
                        break;
                    }
                }
            }

            // Find the predecessor of join that contains all required uids.
            for(int j=0; j<preds.size(); j++) {
                // Filter can push to LOJoin outer branch, but no inner branch
                if( hasAll((LogicalRelationalOperator)preds.get(j), uids) &&
                        (predecessor instanceof LOCross || predecessor instanceof LOJoin && (isInner || innerFlags[j]))) {
                    Operator input = preds.get(j);
                    subPlan.add(input);
                    inputs.put( j, input );
                }
            }
            return  inputs;
        }
    }

    @Override
    protected OperatorPlan buildPattern() {
        LogicalPlan plan = new LogicalPlan();
        LogicalRelationalOperator op1 = new LOFilter(plan);
        plan.add( op1 );

        return plan;
    }

}
TOP

Related Classes of org.apache.pig.newplan.logical.rules.PushUpFilter$PushUpFilterTransformer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.