Package org.apache.pig.test

Source Code of org.apache.pig.test.TestAccumulator

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test;

import java.io.*;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

import junit.framework.TestCase;

import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.parser.ParseException;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@RunWith(JUnit4.class)
public class TestAccumulator extends TestCase{
    // Names of the fixture files written locally and copied to the cluster by createFiles().
    // Rows of "id<TAB>fruit".
    private static final String INPUT_FILE = "AccumulatorInput.txt";
    // Rows of "id<TAB>fruit" where the fruit column is empty for ids 100 and 200.
    private static final String INPUT_FILE2 = "AccumulatorInput2.txt";
    // Rows of "id<TAB>decimal value"; the value column is empty for id 400.
    private static final String INPUT_FILE3 = "AccumulatorInput3.txt";
    // Rows of "id<TAB>url,name".
    private static final String INPUT_FILE4 = "AccumulatorInput4.txt";
    // Pig entry point used by every test; created and configured in the constructor.
    private PigServer pigServer;
    // Mini cluster shared by all tests in this class; shut down in oneTimeTearDown().
    private static MiniCluster cluster = MiniCluster.buildCluster();

    public TestAccumulator() throws ExecException, IOException{
        pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
        // pigServer = new PigServer(ExecType.LOCAL);
        pigServer.getPigContext().getProperties().setProperty("pig.accumulative.batchsize", "2");    
        pigServer.getPigContext().getProperties().setProperty("pig.exec.batchsize", "2");
        pigServer.getPigContext().getProperties().setProperty("pig.exec.nocombiner", "true");
        // reducing the number of retry attempts to speed up test completion
        pigServer.getPigContext().getProperties().setProperty("mapred.map.max.attempts","1");
        pigServer.getPigContext().getProperties().setProperty("mapred.reduce.max.attempts","1");
    }
   
    @Before
    public void setUp() throws Exception {
        pigServer.getPigContext().getProperties().remove("opt.accumulator");
        createFiles();
    }
   
    /**
     * Runs once after all tests in this class and shuts down the shared mini cluster.
     */
    @AfterClass
    public static void oneTimeTearDown() throws Exception {
        cluster.shutDown();
    }

    private void createFiles() throws IOException {
        PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
               
        w.println("100\tapple");           
        w.println("200\torange");     
        w.println("300\tstrawberry");     
        w.println("300\tpear");
        w.println("100\tapple");
        w.println("300\tpear");
        w.println("400\tapple");   
        w.close();  
       
        Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
       
        w = new PrintWriter(new FileWriter(INPUT_FILE2));
       
        w.println("100\t");     
        w.println("100\t");
        w.println("200\t");     
        w.println("200\t");     
        w.println("300\tstrawberry");
        w.close();  
       
        Util.copyFromLocalToCluster(cluster, INPUT_FILE2, INPUT_FILE2);
       
        w = new PrintWriter(new FileWriter(INPUT_FILE3));
       
        w.println("100\t1.0");     
        w.println("100\t2.0");
        w.println("200\t1.1");     
        w.println("200\t2.1");
        w.println("100\t3.0");     
        w.println("100\t4.0");
        w.println("200\t3.1");
        w.println("100\t5.0");
        w.println("300\t3.3");
        w.println("400\t");
        w.println("400\t");
        w.close();  
       
        Util.copyFromLocalToCluster(cluster, INPUT_FILE3, INPUT_FILE3);
       
        w = new PrintWriter(new FileWriter(INPUT_FILE4));
       
        w.println("100\thttp://ibm.com,ibm");     
        w.println("100\thttp://ibm.com,ibm");
        w.println("200\thttp://yahoo.com,yahoo");     
        w.println("300\thttp://sun.com,sun");
        w.close();  
       
        Util.copyFromLocalToCluster(cluster, INPUT_FILE4, INPUT_FILE4);
    }
   
    @After
    public void tearDown() throws Exception {
        new File(INPUT_FILE).delete();
        Util.deleteFile(cluster, INPUT_FILE);
        new File(INPUT_FILE2).delete();
        Util.deleteFile(cluster, INPUT_FILE2);
        new File(INPUT_FILE3).delete();
        Util.deleteFile(cluster, INPUT_FILE3);       
        new File(INPUT_FILE4).delete();
        Util.deleteFile(cluster, INPUT_FILE4);
    }
   
    @Test
    public void testAccumBasic() throws IOException{
        // test group by
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  org.apache.pig.test.utils.AccumulatorBagCount(A);");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 2);
        expected.put(200, 1);
        expected.put(300, 3);
        expected.put(400, 1);
       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }           
       
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.BagCount(A);");                    
       
        try{
            iter = pigServer.openIterator("C");
       
            while(iter.hasNext()) {
                Tuple t = iter.next();
                assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
            }     
            fail("accumulator should not be called.");
        }catch(IOException e) {
            // should throw exception from AccumulatorBagCount.
        }
       
        // test cogroup
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("C = cogroup A by id, B by id;");
        pigServer.registerQuery("D = foreach C generate group,  " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A), org.apache.pig.test.utils.AccumulatorBagCount(B);");                    

        HashMap<Integer, String> expected2 = new HashMap<Integer, String>();
        expected2.put(100, "2,2");
        expected2.put(200, "1,1");
        expected2.put(300, "3,3");
        expected2.put(400, "1,1");
       
                 
        iter = pigServer.openIterator("D");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected2.get((Integer)t.get(0)), t.get(1).toString()+","+t.get(2).toString());               
        }           
    }     
   
    @Test
    public void testAccumWithNegative() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  -org.apache.pig.test.utils.AccumulatorBagCount(A);");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, -2);
        expected.put(200, -1);
        expected.put(300, -3);
        expected.put(400, -1);
       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }           
    }
   
    @Test
    public void testAccumWithAdd() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  org.apache.pig.test.utils.AccumulatorBagCount(A)+1.0;");                    
       
        {
            HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
            expected.put(100, 3.0);
            expected.put(200, 2.0);
            expected.put(300, 4.0);
            expected.put(400, 2.0);
           
                     
            Iterator<Tuple> iter = pigServer.openIterator("C");
           
            while(iter.hasNext()) {
                Tuple t = iter.next();
                assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));               
            }                           
        }
       
        {
            pigServer.registerQuery("C = foreach B generate group,  " +
            "org.apache.pig.test.utils.AccumulatorBagCount(A)+org.apache.pig.test.utils.AccumulatorBagCount(A);");                    

            HashMap<Integer, Integer>expected = new HashMap<Integer, Integer>();
            expected.put(100, 4);
            expected.put(200, 2);
            expected.put(300, 6);
            expected.put(400, 2);
   
             
            Iterator<Tuple> iter = pigServer.openIterator("C");
   
            while(iter.hasNext()) {
                Tuple t = iter.next();
                assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
            }
        }
    }     
   
    @Test
    public void testAccumWithMinus() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group, " +
                " org.apache.pig.test.utils.AccumulatorBagCount(A)*3.0-org.apache.pig.test.utils.AccumulatorBagCount(A);");                    

        HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
        expected.put(100, 4.0);
        expected.put(200, 2.0);
        expected.put(300, 6.0);
        expected.put(400, 2.0);
       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Double)t.get(1));               
        }                                  
    }             
   
    @Test
    public void testAccumWithMod() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A) % 2;");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 0);
        expected.put(200, 1);
        expected.put(300, 1);
        expected.put(400, 1);
       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }                                  
    }            
   
    @Test
    public void testAccumWithDivide() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A)/2;");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 1);
        expected.put(200, 0);
        expected.put(300, 1);
        expected.put(400, 0);
       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }                                  
    }       
   
    @Test
    public void testAccumWithAnd() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "((org.apache.pig.test.utils.AccumulatorBagCount(A)>1 and " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A)<3)?0:1);");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 0);
        expected.put(200, 1);
        expected.put(300, 1);
        expected.put(400, 1);
       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }                                  
    }         
   
    @Test
    public void testAccumWithOr() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "((org.apache.pig.test.utils.AccumulatorBagCount(A)>3 or " +
                "org.apache.pig.test.utils.AccumulatorBagCount(A)<2)?0:1);");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 1);
        expected.put(200, 0);
        expected.put(300, 1);
        expected.put(400, 0);
       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }                                  
    } 
   
    @Test
    public void testAccumWithRegexp() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "(((chararray)org.apache.pig.test.utils.AccumulatorBagCount(A)) matches '1*' ?0:1);");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 1);
        expected.put(200, 0);
        expected.put(300, 1);
        expected.put(400, 0);
       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }                                  
    }             
   
    @Test
    public void testAccumWithIsNull() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group,  " +
                "((chararray)org.apache.pig.test.utils.AccumulativeSumBag(A) is null?0:1);");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 0);
        expected.put(200, 0);
        expected.put(300, 1);               
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }                                  
    }             
   
    @Test
    public void testAccumWithDistinct() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B { D = distinct A; generate group, org.apache.pig.test.utils.AccumulatorBagCount(D)+1;};");                    

        HashMap<Integer, Integer> expected = new HashMap<Integer, Integer>();
        expected.put(100, 2);
        expected.put(200, 2);
        expected.put(300, 3);
        expected.put(400, 2);
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (Integer)t.get(1));               
        }                                  
    }            
   
    @Test
    public void testAccumWithSort() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, f);");
        pigServer.registerQuery("B = foreach A generate id, f, id as t;");
        pigServer.registerQuery("C = group B by id;");
        pigServer.registerQuery("D = foreach C { E = order B by f; F = E.f; generate group, org.apache.pig.test.utils.AccumulativeSumBag(F);};");                    

        HashMap<Integer, String> expected = new HashMap<Integer, String>();
        expected.put(100, "(apple)(apple)");
        expected.put(200, "(orange)");
        expected.put(300, "(pear)(pear)(strawberry)");
        expected.put(400, "(apple)");
                 
        Iterator<Tuple> iter = pigServer.openIterator("D");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            assertEquals(expected.get((Integer)t.get(0)), (String)t.get(1));               
        }                                  
    }
   
    public void testAccumWithBuildinAvg() throws IOException {
      HashMap<Integer, Double> expected = new HashMap<Integer, Double>();
      expected.put(100, 3.0);
      expected.put(200, 2.1);
      expected.put(300, 3.3);
      expected.put(400, null);
      // Test all the averages for correct behaviour with null values
      String[] types = { "double", "float", "int", "long" };
      for (int i = 0; i < types.length; i++) {
        if (i > 1) { // adjust decimal error for non real types
          expected.put(200, 2.0);
          expected.put(300, 3.0);
        }
        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:"
            + types[i] + ");");
        pigServer.registerQuery("C = group A by id;");
        pigServer.registerQuery("D = foreach C generate group, AVG(A.v);");
        Iterator<Tuple> iter = pigServer.openIterator("D");

        while (iter.hasNext()) {
          Tuple t = iter.next();
          Double v = expected.get((Integer) t.get(0));
          if (v != null) {
            assertEquals(v.doubleValue(), ((Number) t.get(1)).doubleValue(),
                0.0001);
          } else {
            assertEquals(null, t.get(1));
          }
        }
      }
    }
   
    @Test
    public void testAccumWithBuildin() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
        pigServer.registerQuery("C = group A by id;");
        // moving AVG accumulator test to separate test case
        pigServer.registerQuery("D = foreach C generate group, SUM(A.v), COUNT(A.v), MIN(A.v), MAX(A.v);");      

        HashMap<Integer, Double[]> expected = new HashMap<Integer, Double[]>();
        expected.put(100, new Double[]{15.0, 5.0, 1.0, 5.0});
        expected.put(200, new Double[]{6.3, 3.0, 1.1, 3.1});
        expected.put(300, new Double[]{3.3, 1.0, 3.3, 3.3});
        expected.put(400, new Double[] { null, 0.0, null, null });
                 
        Iterator<Tuple> iter = pigServer.openIterator("D");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            Double[] v = expected.get((Integer)t.get(0));
            for(int i=0; i<v.length; i++) {
              if (v[i] != null) {
                assertEquals(v[i].doubleValue(), ((Number) t.get(i + 1))
                    .doubleValue(), 0.0001);
              } else {
                assertEquals(null, t.get(i + 1));
              }
            }           
        }   
    }
   
    @Test
    public void testAccumWithMultiBuildin() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, c:chararray);");
        pigServer.registerQuery("C = group A by 1;");
        pigServer.registerQuery("D = foreach C generate SUM(A.id), 1+SUM(A.id)+SUM(A.id);");                    

        Iterator<Tuple> iter = pigServer.openIterator("D");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();   
            t.get(0).toString().equals("1700");
            t.get(1).toString().equals("3401");  
        }   
    }

    // Pig 1105
    @Test
    public void testAccumCountStar() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE3 + "' as (id:int, v:double);");
        pigServer.registerQuery("C = group A by id;");
        pigServer.registerQuery("D = foreach C generate group, COUNT_STAR(A.id);");

        try {
            Iterator<Tuple> iter = pigServer.openIterator("D");
        } catch (Exception e) {
            fail("COUNT_STAR should be supported by accumulator interface");
        }     
    }
   
    /**
     * see PIG-1963.
     * If there is a POSort or PODistinct still remaining in the plan
     * (after secondary sort optimization), accumulative mode can't
     * be used as they are blocking operators
     * @throws IOException
     */
    @Test
    public void testAccumulatorOffOnSort() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        //one POSort will remain because secondary sort can be used only for one of them
        pigServer.registerQuery("C = foreach B " +
                        "{ " +
                        "  o1 = order A by fruit;" +
                        "  o2 = order A by fruit desc;" +
                        "  generate  org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
                        "                  org.apache.pig.test.utils.AccumulativeSumBag(o2.fruit); " +
                        "};");
       
        checkAccumulatorOff("C");       
    }
   
    /**
     * see PIG-1963.
     * If there is a POSort or PODistinct still remaining in the plan
     * (after secondary sort optimization), accumulative mode can't
     * be used as they are blocking operators
     * @throws IOException
     */
    @Test
    public void testAccumulatorOffOnDistinct() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit, category);");
        pigServer.registerQuery("B = group A by id;");

        pigServer.registerQuery("C = foreach B " +
                        "{ " +
                        "  o1 = order A by fruit;" +
                        "  d2 = distinct A.category;" +
                        "  generate  org.apache.pig.test.utils.AccumulativeSumBag(o1.fruit), " +
                        "                  org.apache.pig.test.utils.AccumulativeSumBag(d2); " +
                        "};");
       
        checkAccumulatorOff("C");       
    }
   
    @Test   
    public void testAccumulatorOff() throws IOException{
        pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "false");
       
        pigServer.registerQuery("A = load '" + INPUT_FILE2 + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B generate group, org.apache.pig.test.utils.AccumulativeSumBag(A);");
       
        checkAccumulatorOff("C");
        pigServer.getPigContext().getProperties().setProperty("opt.accumulator", "true");
       
    }   
   
    private void checkAccumulatorOff(String alias) {
        try {
            Iterator<Tuple> iter = pigServer.openIterator(alias);
            int c = 0;
            while(iter.hasNext()) {
                iter.next();
                c++;
            }
            fail("Accumulator should be off.");
        }catch(Exception e) {
            // we should get exception
        }
    }
   
    /**
     * Groups the fourth fixture by (id, url) and projects COUNT(A) together
     * with a map lookup ({@code #'url'}) on the result of the URLPARSE UDF.
     * <p>
     * NOTE(review): the assertion below does not check what the expected map
     * suggests; see the inline notes. The value URLPARSE actually yields is not
     * visible in this file, so confirm it before tightening the assertion.
     */
    @Test   
    public void testAccumWithMap() throws IOException{
        pigServer.registerQuery("A = load '" + INPUT_FILE4 + "' as (id, url);");
        pigServer.registerQuery("B = group A by (id, url);");
        pigServer.registerQuery("C = foreach B generate COUNT(A), org.apache.pig.test.utils.URLPARSE(group.url)#'url';");                    

        // NOTE(review): key 1 is put twice, so the "http://yahoo.com" entry is
        // overwritten by "http://sun.com". A count is not a unique key for these rows.
        HashMap<Integer, String> expected = new HashMap<Integer, String>();
        expected.put(2, "http://ibm.com");
        expected.put(1, "http://yahoo.com");
        expected.put(1, "http://sun.com");       
                 
        Iterator<Tuple> iter = pigServer.openIterator("C");
       
        while(iter.hasNext()) {
            Tuple t = iter.next();
            // NOTE(review): the map is keyed by Integer but looked up with a Long;
            // a Long never equals an Integer key, so expected.get(...) is always null
            // and this only passes when t.get(1) is null as well.
            assertEquals(expected.get((Long)t.get(0)), (String)t.get(1));               
        }                                  
    }       
   
    /**
     * see PIG-1911 .
     * accumulator udf reading from a nested relational op. generate projects
     * only the accumulator udf.
     * @throws IOException
     * @throws ParseException
     */
    @Test
    public void testAccumAfterNestedOp() throws IOException, ParseException{
        // test group by
        pigServer.registerQuery("A = load '" + INPUT_FILE + "' as (id:int, fruit);");
        pigServer.registerQuery("B = group A by id;");
        pigServer.registerQuery("C = foreach B " +
                        "{ o = order A by id; " +
                        "  generate org.apache.pig.test.utils.AccumulatorBagCount(o);}; ");                    
                
        Iterator<Tuple> iter = pigServer.openIterator("C");
        List<Tuple> expectedRes =
            Util.getTuplesFromConstantTupleStrings(
                    new String[] {
                            "(2)",
                            "(1)",
                            "(3)",
                            "(1)"
                    });
        Util.checkQueryOutputsAfterSort(iter, expectedRes);
    }


}
TOP

Related Classes of org.apache.pig.test.TestAccumulator

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.