Package org.apache.pig.test

Source Code of org.apache.pig.test.TestStore

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;

import static org.junit.Assert.assertEquals;

import java.util.*;

import org.apache.pig.ExecType;

import java.io.File;
import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import junit.framework.Assert;
import org.apache.pig.impl.plan.OperatorKey;

import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DefaultBagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.DataByteArray;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.PigServer;
import org.apache.pig.impl.io.FileLocalizer;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.backend.local.executionengine.LocalPigLauncher;
import org.apache.pig.backend.local.executionengine.LocalPOStoreImpl;
import org.apache.pig.backend.local.executionengine.physicalLayer.counters.POCounter;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
import org.apache.pig.test.utils.GenPhyOp;
import org.apache.pig.test.utils.GenRandomData;
import org.apache.pig.test.utils.TestHelper;
import org.apache.pig.tools.pigstats.PigStats;
import org.apache.pig.impl.logicalLayer.LOStore;
import org.apache.pig.impl.logicalLayer.LogicalOperator;
import org.apache.pig.impl.logicalLayer.LogicalPlan;
import org.apache.pig.impl.logicalLayer.LogicalPlanBuilder;
import org.apache.pig.backend.datastorage.ContainerDescriptor;
import org.apache.pig.backend.datastorage.DataStorage;
import org.apache.pig.backend.datastorage.DataStorageException;
import org.apache.pig.backend.datastorage.ElementDescriptor;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestStore extends junit.framework.TestCase {
    POStore st;
    FileSpec fSpec;
    DataBag inpDB;
    static MiniCluster cluster = MiniCluster.buildCluster();
    PigContext pc;
    POProject proj;
    PigServer pig;
    POCounter pcount;
   
    @Before
    public void setUp() throws Exception {
        st = GenPhyOp.topStoreOp();
        pcount = new POCounter(new OperatorKey("", (new Random()).nextLong()));
        fSpec = new FileSpec("file:/tmp/storeTest.txt",
                      new FuncSpec(PigStorage.class.getName(), new String[]{":"}));
        st.setSFile(fSpec);

        FileLocalizer.deleteTempFiles();
        pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
        pc = pig.getPigContext();

        st.setStoreImpl(new LocalPOStoreImpl(pc));
       
        proj = GenPhyOp.exprProject();
        proj.setColumn(0);
        proj.setResultType(DataType.TUPLE);
        proj.setOverloaded(true);
        List<PhysicalOperator> inps = new ArrayList<PhysicalOperator>();
    }

    @After
    public void tearDown() throws Exception {
    }

    private PigStats store() throws Exception {
        PhysicalPlan pp = new PhysicalPlan();
        pp.add(proj);
        pp.add(st);
        pp.add(pcount);
        //pp.connect(proj, st);
        pp.connect(proj, pcount);
        pp.connect(pcount, st);
        pc.setExecType(ExecType.LOCAL);
        return new LocalPigLauncher().launchPig(pp, "TestStore", pc);
    }

    @Test
    public void testStore() throws Exception {
        inpDB = GenRandomData.genRandSmallTupDataBag(new Random(), 10, 100);
        Tuple t = new DefaultTuple();
        t.append(inpDB);
        proj.attachInput(t);
        assertTrue(store() != null);
       
        int size = 0;
        BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
        for(String line=br.readLine();line!=null;line=br.readLine()){
            String[] flds = line.split(":",-1);
            t = new DefaultTuple();
            t.append(flds[0].compareTo("")!=0 ? flds[0] : null);
            t.append(flds[1].compareTo("")!=0 ? Integer.parseInt(flds[1]) : null);
           
            System.err.println("Simple data: ");
            System.err.println(line);
            System.err.println("t: ");
            System.err.println(t);
            assertEquals(true, TestHelper.bagContains(inpDB, t));
            ++size;
        }
        assertEquals(true, size==inpDB.size());
        FileLocalizer.delete(fSpec.getFileName(), pc);
    }

    @Test
    public void testStoreComplexData() throws Exception {
        inpDB = GenRandomData.genRandFullTupTextDataBag(new Random(), 10, 100);
        Tuple t = new DefaultTuple();
        t.append(inpDB);
        proj.attachInput(t);
        assertTrue(store() != null);
        PigStorage ps = new PigStorage(":");
       
        int size = 0;
        BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
        for(String line=br.readLine();line!=null;line=br.readLine()){
            String[] flds = line.split(":",-1);
            t = new DefaultTuple();
            t.append(flds[0].compareTo("")!=0 ? ps.bytesToBag(flds[0].getBytes()) : null);
            t.append(flds[1].compareTo("")!=0 ? ps.bytesToCharArray(flds[1].getBytes()) : null);
            t.append(flds[2].compareTo("")!=0 ? ps.bytesToCharArray(flds[2].getBytes()) : null);
            t.append(flds[3].compareTo("")!=0 ? ps.bytesToDouble(flds[3].getBytes()) : null);
            t.append(flds[4].compareTo("")!=0 ? ps.bytesToFloat(flds[4].getBytes()) : null);
            t.append(flds[5].compareTo("")!=0 ? ps.bytesToInteger(flds[5].getBytes()) : null);
            t.append(flds[6].compareTo("")!=0 ? ps.bytesToLong(flds[6].getBytes()) : null);
            t.append(flds[7].compareTo("")!=0 ? ps.bytesToMap(flds[7].getBytes()) : null);
            t.append(flds[8].compareTo("")!=0 ? ps.bytesToTuple(flds[8].getBytes()) : null);
           
            assertEquals(true, TestHelper.bagContains(inpDB, t));
            ++size;
        }
        assertEquals(true, size==inpDB.size());
        FileLocalizer.delete(fSpec.getFileName(), pc);
    }

    @Test
    public void testStoreComplexDataWithNull() throws Exception {
        Tuple inputTuple = GenRandomData.genRandSmallBagTextTupleWithNulls(new Random(), 10, 100);
        inpDB = DefaultBagFactory.getInstance().newDefaultBag();
        inpDB.add(inputTuple);
        Tuple t = new DefaultTuple();
        t.append(inpDB);
        proj.attachInput(t);
        assertTrue(store() != null);
        PigStorage ps = new PigStorage(":");
       
        int size = 0;
        BufferedReader br = new BufferedReader(new FileReader("/tmp/storeTest.txt"));
        for(String line=br.readLine();line!=null;line=br.readLine()){
            System.err.println("Complex data: ");
            System.err.println(line);
            String[] flds = line.split(":",-1);
            t = new DefaultTuple();
            t.append(flds[0].compareTo("")!=0 ? ps.bytesToBag(flds[0].getBytes()) : null);
            t.append(flds[1].compareTo("")!=0 ? ps.bytesToCharArray(flds[1].getBytes()) : null);
            t.append(flds[2].compareTo("")!=0 ? ps.bytesToCharArray(flds[2].getBytes()) : null);
            t.append(flds[3].compareTo("")!=0 ? ps.bytesToDouble(flds[3].getBytes()) : null);
            t.append(flds[4].compareTo("")!=0 ? ps.bytesToFloat(flds[4].getBytes()) : null);
            t.append(flds[5].compareTo("")!=0 ? ps.bytesToInteger(flds[5].getBytes()) : null);
            t.append(flds[6].compareTo("")!=0 ? ps.bytesToLong(flds[6].getBytes()) : null);
            t.append(flds[7].compareTo("")!=0 ? ps.bytesToMap(flds[7].getBytes()) : null);
            t.append(flds[8].compareTo("")!=0 ? ps.bytesToTuple(flds[8].getBytes()) : null);
            t.append(flds[9].compareTo("")!=0 ? ps.bytesToCharArray(flds[9].getBytes()) : null);
           
            assertTrue(inputTuple.equals(t));
            ++size;
        }
        FileLocalizer.delete(fSpec.getFileName(), pc);
    }

    @Test
    public void testStoreRemoteRel() throws Exception {
        checkStorePath("test","/tmp/test");
    }

    @Test
    public void testStoreRemoteAbs() throws Exception {
        checkStorePath("/tmp/test","/tmp/test");
    }

    @Test
    public void testStoreRemoteRelScheme() throws Exception {
        checkStorePath("test","/tmp/test");
    }

    @Test
    public void testStoreRemoteAbsScheme() throws Exception {
        checkStorePath("hdfs:/tmp/test","/tmp/test");
    }

    @Test
    public void testStoreRemoteAbsAuth() throws Exception {
        checkStorePath("hdfs://localhost:9000/test","/test");
    }

    @Test
    public void testStoreRemoteNormalize() throws Exception {
        checkStorePath("/tmp/foo/../././","/tmp");
    }

    private void checkStorePath(String orig, String expected) throws Exception {
        checkStorePath(orig, expected, false);
    }

    private void checkStorePath(String orig, String expected, boolean isTmp) throws Exception {
        pc.getProperties().setProperty("opt.multiquery",""+true);

        DataStorage dfs = pc.getDfs();
        dfs.setActiveContainer(dfs.asContainer("/tmp"));
        Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
        Map<OperatorKey, LogicalOperator> logicalOpTable = new HashMap<OperatorKey, LogicalOperator>();
        Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>();
        Map<String, String> fileNameMap = new HashMap<String, String>();
       
        LogicalPlanBuilder builder = new LogicalPlanBuilder(pc);
       
        String query = "a = load 'foo';";
        LogicalPlan lp = builder.parse("Test-Store",
                                       query,
                                       aliases,
                                       logicalOpTable,
                                       aliasOp,
                                       fileNameMap);
        query = "store a into '"+orig+"';";
        lp = builder.parse("Test-Store",
                           query,
                           aliases,
                           logicalOpTable,
                           aliasOp,
                           fileNameMap);

        Assert.assertTrue(lp.size()>1);
        LogicalOperator op = lp.getLeaves().get(0);
       
        Assert.assertTrue(op instanceof LOStore);
        LOStore store = (LOStore)op;

        String p = store.getOutputFile().getFileName();
        p = p.replaceAll("hdfs://[0-9a-zA-Z:\\.]*/","/");
       
        if (isTmp) {
            Assert.assertTrue(p.matches("/tmp.*"));
        } else {
            Assert.assertEquals(p, expected);
        }
    }
}
TOP

Related Classes of org.apache.pig.test.TestStore

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.