Package org.apache.pig.test

Source Code of org.apache.pig.test.TestFilter

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Random;

import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POAnd;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
import org.junit.Before;
import org.junit.Test;

public class TestFilter {
    Random r = new Random(42L);
    POFilter pass;
    POFilter fail;
    Tuple t;
    DataBag inp;
    POFilter projFil;

    boolean[] nullFlags = new boolean[] { false, true };

    @Before
    public void setUp() throws Exception {
        pass = GenPhyOp.topFilterOpWithExPlan(50, 25);
        fail = GenPhyOp.topFilterOpWithExPlan(25, 50);
    }

    private void setUpProjFil(boolean withNulls) throws Exception {
        if (withNulls)
            inp = GenRandomData.genRandSmallTupDataBagWithNulls(r, 10, 100);
        else
            inp = GenRandomData.genRandSmallTupDataBag(r, 10, 100);
        t = GenRandomData.genRandSmallBagTuple(r, 10, 100);
        projFil = GenPhyOp.topFilterOpWithProj(1, 50);
        POProject inpPrj = GenPhyOp.exprProject();
        Tuple tmpTpl = new DefaultTuple();
        tmpTpl.append(inp);
        inpPrj.setColumn(0);
        inpPrj.setResultType(DataType.TUPLE);
        inpPrj.setOverloaded(true);
        inpPrj.attachInput(tmpTpl);
        List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
        inputs.add(inpPrj);
        projFil.setInputs(inputs);
    }

    @Test
    public void testGetNextTuple() throws Exception {
        pass.attachInput(t);
        Result res = pass.getNextTuple();
        assertEquals(t, res.result);
        fail.attachInput(t);
        res = fail.getNextTuple();
        assertEquals(res.returnStatus, POStatus.STATUS_EOP);

        for (int i = 0; i < nullFlags.length; i++) {
            int count = 0;
            setUpProjFil(nullFlags[i]);
            while (true) {
                res = projFil.getNextTuple();
                if (res.returnStatus == POStatus.STATUS_EOP)
                    break;
                count++;
                assertEquals(POStatus.STATUS_OK, res.returnStatus);
                Tuple output = (Tuple)res.result;
                assertEquals(
                        "Running testGetNextTuple with nullFlags set to " + nullFlags[i] + ":",
                        true, TestHelper.bagContains(inp, output));
                assertEquals(
                        "Running testGetNextTuple with nullFlags set to " + nullFlags[i] + ":",
                        true, (Integer)((Tuple)res.result).get(1) > 50);
            }
            assertEquals("Running testGetNextTuple with nullFlags set to " + nullFlags[i] + ":",
                    getExpCount(inp), count);

        }
    }

    /**
     * @param inp2
     * @return
     * @throws ExecException
     */
    private int getExpCount(DataBag inp2) throws ExecException {
        // TODO Auto-generated method stub
        int count = 0;
        for (Iterator<Tuple> it = inp2.iterator(); it.hasNext();) {

            Tuple t = it.next();
            if (t.get(1) != null && (Integer)t.get(1) > 50)
                count++;
        }

        return count;
    }

    @Test
    public void testSimpleFilter() throws Exception {
        for (int i = 0; i < nullFlags.length; i++) {
            // Build the inner expression
            POProject p1 = GenPhyOp.exprProject(0);
            POProject p2 = GenPhyOp.exprProject(1);
            GreaterThanExpr gt = GenPhyOp.compGreaterThanExpr(p1, p2, DataType.INTEGER);

            PhysicalPlan ip = new PhysicalPlan();
            ip.add(p1);
            ip.add(p2);
            ip.add(gt);
            ip.connect(p1, gt);
            ip.connect(p2, gt);

            int[] ints = { 0, 1, 1, 0, 1, 1 };
            TupleFactory tf = TupleFactory.getInstance();
            DataBag inbag = BagFactory.getInstance().newDefaultBag();
            Random r = new Random();
            for (int j = 0; j < ints.length; j += 2) {
                // if we are testing with nulls
                // introduce nulls randomly
                if (nullFlags[i] == true) {
                    int rand = r.nextInt(100);
                    if (rand <= 20) {
                        Tuple t = tf.newTuple(2);
                        t.set(0, new Integer(ints[j]));
                        t.set(1, null);
                        inbag.add(t);
                    } else if (rand > 20 && rand <= 40) {
                        Tuple t = tf.newTuple(2);
                        t.set(0, null);
                        t.set(1, new Integer(ints[j + 1]));
                        inbag.add(t);
                    } else if (rand > 40 && rand <= 60) {
                        Tuple t = tf.newTuple(2);
                        t.set(0, null);
                        t.set(1, null);
                        inbag.add(t);
                    }
                }
                Tuple t = tf.newTuple(2);
                t.set(0, new Integer(ints[j]));
                t.set(1, new Integer(ints[j + 1]));
                inbag.add(t);
            }

            PORead read = GenPhyOp.topReadOp(inbag);
            POFilter filter = GenPhyOp.connectedFilterOp(read);
            filter.setPlan(ip);

            PhysicalPlan op = new PhysicalPlan();
            op.add(filter);
            op.add(read);
            op.connect(read, filter);

            DataBag outbag = BagFactory.getInstance().newDefaultBag();
            Result res;
            Tuple t = tf.newTuple();
            do {
                res = filter.getNextTuple();
                if (res.returnStatus == POStatus.STATUS_OK) {
                    outbag.add((Tuple)res.result);
                }
            } while (res.returnStatus == POStatus.STATUS_OK);
            assertEquals("Running " + this.getClass().getName() + "with nullFlags set to "
                    + nullFlags[i] + ":", POStatus.STATUS_EOP, res.returnStatus);
            assertEquals("Running " + this.getClass().getName() + "with nullFlags set to "
                    + nullFlags[i] + ":", 1, outbag.size());
            Iterator<Tuple> it = outbag.iterator();
            assertTrue("Running " + this.getClass().getName() + "with nullFlags set to "
                    + nullFlags[i] + ":", it.hasNext());
            t = it.next();
            assertEquals("Running " + this.getClass().getName() + "with nullFlags set to "
                    + nullFlags[i] + ":", 2, t.size());
            assertTrue("Running " + this.getClass().getName() + "with nullFlags set to "
                    + nullFlags[i] + ":", t.get(0) instanceof Integer);
            assertTrue("Running " + this.getClass().getName() + "with nullFlags set to "
                    + nullFlags[i] + ":", t.get(1) instanceof Integer);
            Integer i1 = (Integer)t.get(0);
            Integer i2 = (Integer)t.get(1);
            assertEquals("Running " + this.getClass().getName() + "with nullFlags set to "
                    + nullFlags[i] + ":", 1, (int)i1);
            assertEquals("Running " + this.getClass().getName() + "with nullFlags set to "
                    + nullFlags[i] + ":", 0, (int)i2);
        }
    }

    @Test
    public void testAndFilter() throws Exception {
        for (int i = 0; i < nullFlags.length; i++) {
            // Build the inner expression
            POProject p1 = GenPhyOp.exprProject(0);
            ConstantExpression c2 = GenPhyOp.exprConst();
            c2.setValue(new Integer(0));
            GreaterThanExpr gt = GenPhyOp.compGreaterThanExpr(p1, c2, DataType.INTEGER);

            POProject p3 = GenPhyOp.exprProject(1);
            ConstantExpression c = GenPhyOp.exprConst();
            c.setValue(new Integer(1));
            EqualToExpr eq = GenPhyOp.compEqualToExpr(p3, c, DataType.INTEGER);
            POAnd and = GenPhyOp.compAndExpr(gt, eq);

            PhysicalPlan ip = new PhysicalPlan();
            ip.add(p1);
            ip.add(c2);
            ip.add(gt);
            ip.add(p3);
            ip.add(c);
            ip.add(eq);
            ip.add(and);
            ip.connect(p1, gt);
            ip.connect(c2, gt);
            ip.connect(p3, eq);
            ip.connect(c, eq);
            ip.connect(eq, and);
            ip.connect(gt, and);

            int[] ints = { 0, 1, 1, 0, 1, 1 };
            TupleFactory tf = TupleFactory.getInstance();
            DataBag inbag = BagFactory.getInstance().newDefaultBag();
            Random r = new Random();
            for (int j = 0; j < ints.length; j += 2) {
                // if we are testing with nulls
                // introduce nulls randomly
                if (nullFlags[i] == true) {
                    int rand = r.nextInt(100);
                    if (rand <= 20) {
                        Tuple t = tf.newTuple(2);
                        t.set(0, new Integer(ints[j]));
                        t.set(1, null);
                        inbag.add(t);
                    } else if (rand > 20 && rand <= 40) {
                        Tuple t = tf.newTuple(2);
                        t.set(0, null);
                        t.set(1, new Integer(ints[j + 1]));
                        inbag.add(t);
                    } else if (rand > 40 && rand <= 60) {
                        Tuple t = tf.newTuple(2);
                        t.set(0, null);
                        t.set(1, null);
                        inbag.add(t);
                    }
                }
                Tuple t = tf.newTuple(2);
                t.set(0, new Integer(ints[j]));
                t.set(1, new Integer(ints[j + 1]));
                inbag.add(t);
            }

            PORead read = GenPhyOp.topReadOp(inbag);
            POFilter filter = GenPhyOp.connectedFilterOp(read);
            filter.setPlan(ip);

            PhysicalPlan op = new PhysicalPlan();
            op.add(filter);
            op.add(read);
            op.connect(read, filter);

            DataBag outbag = BagFactory.getInstance().newDefaultBag();
            Result res;
            Tuple t = tf.newTuple();
            do {
                res = filter.getNextTuple();
                if (res.returnStatus == POStatus.STATUS_OK) {
                    outbag.add((Tuple)res.result);
                }
            } while (res.returnStatus == POStatus.STATUS_OK);
            assertEquals(POStatus.STATUS_EOP, res.returnStatus);
            assertEquals(1, outbag.size());
            Iterator<Tuple> it = outbag.iterator();
            assertTrue(it.hasNext());
            t = it.next();
            assertEquals(2, t.size());
            assertTrue(t.get(0) instanceof Integer);
            assertTrue(t.get(1) instanceof Integer);
            Integer i1 = (Integer)t.get(0);
            Integer i2 = (Integer)t.get(1);
            assertEquals(1, (int)i1);
            assertEquals(1, (int)i2);
        }
    }
}
TOP

Related Classes of org.apache.pig.test.TestFilter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.