Package org.apache.pig.newplan.logical.rules

Source Code of org.apache.pig.newplan.logical.rules.MergeFilter$MergeFilterTransformer

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.newplan.logical.rules;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
import org.apache.pig.newplan.logical.expression.AndExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOFilter;
import org.apache.pig.newplan.logical.relational.LogicalPlan;
import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
import org.apache.pig.newplan.optimizer.Rule;
import org.apache.pig.newplan.optimizer.Transformer;

public class MergeFilter extends Rule {

    public MergeFilter(String n) {
        super(n, false);      
    }

    @Override
    public Transformer getNewTransformer() {       
        return new MergeFilterTransformer();
    }

    public class MergeFilterTransformer extends Transformer {

        private OperatorSubPlan subPlan;

        @Override
        public boolean check(OperatorPlan matched) throws FrontendException {          
            LOFilter filter = (LOFilter)matched.getSources().get(0);
            List<Operator> succeds = currentPlan.getSuccessors(filter);
            // if this filter is followed by another filter, we should combine them
            if (succeds != null && succeds.size() == 1) {
                if (succeds.get(0) instanceof LOFilter) {
                    return true;
                }
            }
            return false;
        }

        @Override
        public void transform(OperatorPlan matched) throws FrontendException {    
            subPlan = new OperatorSubPlan(currentPlan);
           
            LOFilter filter = (LOFilter)matched.getSources().get(0);

            subPlan.add(filter);
           
            List<Operator> succeds = currentPlan.getSuccessors(filter);
            if (succeds != null && succeds.size()== 1 && (succeds.get(0) instanceof LOFilter)) {
                LOFilter next = (LOFilter)succeds.get(0);
                combineFilterCond(filter, next);
               
                List<Operator> succs = currentPlan.getSuccessors(next);
                if (succs!=null && succs.size()>0) {
                    subPlan.add(succs.get(0));
                }

                // Since we remove next, we need to merge soft link into filter
                List<Operator> nextSoftPreds = null;
                if (currentPlan.getSoftLinkPredecessors(next)!=null) {
                    nextSoftPreds = new ArrayList<Operator>();
                    nextSoftPreds.addAll(currentPlan.getSoftLinkPredecessors(next));
                }
               
                if (nextSoftPreds!=null) {
                    for (Operator softPred : nextSoftPreds) {
                        currentPlan.removeSoftLink(softPred, next);
                        currentPlan.createSoftLink(softPred, filter);
                    }
                }
                currentPlan.removeAndReconnect(next);
            }
           
            Iterator<Operator> iter = filter.getFilterPlan().getOperators();
            while (iter.hasNext()) {
                Operator oper = iter.next();
                if (oper instanceof ProjectExpression) {
                    ((ProjectExpression)oper).setAttachedRelationalOp(filter);
                }
            }
        }       
       
        @Override
        public OperatorPlan reportChanges() {         
            return subPlan;
        }
       
        // combine the condition of two filters. The condition of second filter
        // is added into the condition of first filter with an AND operator.
        private void combineFilterCond(LOFilter f1, LOFilter f2) throws FrontendException {
            LogicalExpressionPlan p1 = f1.getFilterPlan();
            LogicalExpressionPlan p2 = f2.getFilterPlan();
            LogicalExpressionPlan andPlan = new LogicalExpressionPlan();
           
            // add existing operators         
            Iterator<Operator> iter = p1.getOperators();
            while(iter.hasNext()) {
                andPlan.add(iter.next());
            }
           
            iter = p2.getOperators();
            while(iter.hasNext()) {
                andPlan.add(iter.next());
            }
           
            // add all connections
            iter = p1.getOperators();
            while(iter.hasNext()) {
                Operator n = iter.next();
                List<Operator> l = p1.getPredecessors(n);
                if (l != null) {
                    for(Operator op: l) {
                        andPlan.connect(op, n);
                    }
                }
            }
           
            iter = p2.getOperators();
            while(iter.hasNext()) {
                Operator n = iter.next();
                List<Operator> l = p2.getPredecessors(n);
                if (l != null) {
                    for(Operator op: l) {
                        andPlan.connect(op, n);
                    }
                }
            }         
           
            // create an AND
            new AndExpression(andPlan, (LogicalExpression)p1.getSources().get(0), (LogicalExpression)p2.getSources().get(0));         
           
            f1.setFilterPlan(andPlan);
        }

    }

    @Override
    protected OperatorPlan buildPattern() {       
        // the pattern that this rule looks for
        // is filter operator
        LogicalPlan plan = new LogicalPlan();
        LogicalRelationalOperator op = new LOFilter(plan);
        plan.add(op);       
       
        return plan;
    }
}
TOP

Related Classes of org.apache.pig.newplan.logical.rules.MergeFilter$MergeFilterTransformer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.