Package org.apache.flink.test.javaApiOperators.util.CollectionDataSets

Examples of org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType


                Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
                Collector<CustomType> out) throws Exception {
              for(POJO p : first) {
                for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
                  Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
                  out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
                }
              }
            }
        });
        coGroupDs.writeAsText(resultPath);
        env.execute();
       
        // return expected result
        return   "-1,20000,Flink\n" +
            "-1,10000,Flink\n" +
            "-1,30000,Flink\n";
      }
      case 12: {
        /*
         * CoGroup field-selector (expression keys) + key selector function
         * The key selector is unnecessary complicated (Tuple1) ;)
         */
       
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
       
        DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
            .where(new KeySelector<POJO, Tuple1<Long>>() {
              private static final long serialVersionUID = 1L;

              @Override
              public Tuple1<Long> getKey(POJO value)
                  throws Exception {
                return new Tuple1<Long>(value.nestedPojo.longNumber);
              }
            }).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
              private static final long serialVersionUID = 1L;

            @Override
            public void coGroup(
                Iterable<POJO> first,
                Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
                Collector<CustomType> out) throws Exception {
              for(POJO p : first) {
                for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
                  Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
                  out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
                }
              }
            }
        });
        coGroupDs.writeAsText(resultPath);
        env.execute();
       
        // return expected result
        return   "-1,20000,Flink\n" +
            "-1,10000,Flink\n" +
            "-1,30000,Flink\n";
      }
      case 13: {
        /*
         * CoGroup field-selector (expression keys) + key selector function
         * The key selector is simple here
         */
       
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
       
        DataSet<POJO> ds = CollectionDataSets.getSmallPojoDataSet(env);
        DataSet<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> ds2 = CollectionDataSets.getSmallTuplebasedDataSet(env);
        DataSet<CustomType> coGroupDs = ds.coGroup(ds2)
            .where(new KeySelector<POJO, Long>() {
              private static final long serialVersionUID = 1L;

              @Override
              public Long getKey(POJO value)
                  throws Exception {
                return value.nestedPojo.longNumber;
              }
            }).equalTo(6).with(new CoGroupFunction<POJO, Tuple7<Integer, String, Integer, Integer, Long, String, Long>, CustomType>() {
              private static final long serialVersionUID = 1L;

            @Override
            public void coGroup(
                Iterable<POJO> first,
                Iterable<Tuple7<Integer, String, Integer, Integer, Long, String, Long>> second,
                Collector<CustomType> out) throws Exception {
              for(POJO p : first) {
                for(Tuple7<Integer, String, Integer, Integer, Long, String, Long> t: second) {
                  Assert.assertTrue(p.nestedPojo.longNumber == t.f6);
                  out.collect(new CustomType(-1, p.nestedPojo.longNumber, "Flink"));
                }
              }
            }
        });
        coGroupDs.writeAsText(resultPath);
View Full Code Here


    private static final long serialVersionUID = 1L;

    @Override
    public void coGroup(Iterable<CustomType> first, Iterable<CustomType> second, Collector<CustomType> out) {
     
      CustomType o = new CustomType(0,0,"test");
     
      for ( CustomType element : first ) {
        o.myInt = element.myInt;
        o.myLong += element.myLong;
      }
View Full Code Here

    @Override
    public void coGroup(Iterable<CustomType> first,
        Iterable<Tuple5<Integer, Long, Integer, String, Long>> second,
        Collector<CustomType> out)
    {
      CustomType o = new CustomType(0,0,"test");
     
      for (CustomType element : first) {
        o.myInt = element.myInt;
        o.myLong += element.myLong;
      }
View Full Code Here

    @Override
    public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {
      final Iterator<CustomType> iter = values.iterator();
     
      CustomType o = new CustomType();
      CustomType c = iter.next();
     
      o.myString = "Hello!";
      o.myInt = c.myInt;
      o.myLong = c.myLong;
     
      while (iter.hasNext()) {
        CustomType next = iter.next();
        o.myLong += next.myLong;
      }
     
      out.collect(o);
     
View Full Code Here

    private static final long serialVersionUID = 1L;
   
    @Override
    public void reduce(Iterable<CustomType> values, Collector<CustomType> out) {

      CustomType o = new CustomType(0, 0, "Hello!");
     
      for (CustomType next : values) {
        o.myInt += next.myInt;
        o.myLong += next.myLong;
      }
View Full Code Here

    private static final long serialVersionUID = 1L;
   
    @Override
    public void combine(Iterable<CustomType> values, Collector<CustomType> out) throws Exception {
     
      CustomType o = new CustomType();
     
      for ( CustomType c : values ) {
        o.myInt = c.myInt;
        o.myLong += c.myLong;
        o.myString = "test"+c.myInt;
View Full Code Here

    }

    @Override
    public void reduce(Iterable<CustomType> values, Collector<CustomType> out)  {
     
      CustomType o = new CustomType(0, 0, "");
     
      for ( CustomType c : values) {
        o.myInt = c.myInt;
        o.myLong += c.myLong;
        o.myString = c.myString;
View Full Code Here

       
        DataSet<CustomType> ds = CollectionDataSets.getCustomTypeDataSet(env);
        DataSet<CustomType> customMapDs = ds.
            map(new MapFunction<CustomType, CustomType>() {
              private static final long serialVersionUID = 1L;
              private final CustomType out = new CustomType();
             
              @Override
              public CustomType map(CustomType value) throws Exception {
                out.myInt = value.myInt;
                out.myLong = value.myLong;
                out.myString = value.myString.toLowerCase();
                return out;
              }
            });
       
        customMapDs.writeAsText(resultPath);
        env.execute();
       
        // return expected result
        return   "1,0,hi\n" +
            "2,1,hello\n" +
            "2,2,hello world\n" +
            "3,3,hello world, how are you?\n" +
            "3,4,i am fine.\n" +
            "3,5,luke skywalker\n" +
            "4,6,comment#1\n" +
            "4,7,comment#2\n" +
            "4,8,comment#3\n" +
            "4,9,comment#4\n" +
            "5,10,comment#5\n" +
            "5,11,comment#6\n" +
            "5,12,comment#7\n" +
            "5,13,comment#8\n" +
            "5,14,comment#9\n" +
            "6,15,comment#10\n" +
            "6,16,comment#11\n" +
            "6,17,comment#12\n" +
            "6,18,comment#13\n" +
            "6,19,comment#14\n" +
            "6,20,comment#15\n";
      }
      case 7: {
        /*
         * Test mapper if UDF returns input object - increment first field of a tuple
         */
   
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
       
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> inputObjMapDs = ds.
            map(new MapFunction<Tuple3<Integer, Long, String>, Tuple3<Integer, Long, String>>() {
              private static final long serialVersionUID = 1L;
             
              @Override
              public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
                  throws Exception {
                Integer incr = new Integer(value.f0.intValue() + 1);
                value.setField(incr, 0);
                return value;
              }
            });
       
        inputObjMapDs.writeAsCsv(resultPath);
        env.execute();
       
        // return expected result
        return   "2,1,Hi\n" +
            "3,2,Hello\n" +
            "4,2,Hello world\n" +
            "5,3,Hello world, how are you?\n" +
            "6,3,I am fine.\n" +
            "7,3,Luke Skywalker\n" +
            "8,4,Comment#1\n" +
            "9,4,Comment#2\n" +
            "10,4,Comment#3\n" +
            "11,4,Comment#4\n" +
            "12,5,Comment#5\n" +
            "13,5,Comment#6\n" +
            "14,5,Comment#7\n" +
            "15,5,Comment#8\n" +
            "16,5,Comment#9\n" +
            "17,6,Comment#10\n" +
            "18,6,Comment#11\n" +
            "19,6,Comment#12\n" +
            "20,6,Comment#13\n" +
            "21,6,Comment#14\n" +
            "22,6,Comment#15\n";
      }
      case 8: {
        /*
         * Test map with broadcast set
         */
         
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
       
        DataSet<Integer> ints = CollectionDataSets.getIntegerDataSet(env);
       
        DataSet<Tuple3<Integer, Long, String>> ds = CollectionDataSets.get3TupleDataSet(env);
        DataSet<Tuple3<Integer, Long, String>> bcMapDs = ds.
            map(new RichMapFunction<Tuple3<Integer,Long,String>, Tuple3<Integer,Long,String>>() {
              private static final long serialVersionUID = 1L;
              private final Tuple3<Integer, Long, String> out = new Tuple3<Integer, Long, String>();
              private Integer f2Replace = 0;
             
              @Override
              public void open(Configuration config) {
                Collection<Integer> ints = this.getRuntimeContext().getBroadcastVariable("ints");
                int sum = 0;
                for(Integer i : ints) {
                  sum += i;
                }
                f2Replace = sum;
              }
             
              @Override
              public Tuple3<Integer, Long, String> map(Tuple3<Integer, Long, String> value)
                  throws Exception {
                out.setFields(f2Replace, value.f1, value.f2);
                return out;
              }
            }).withBroadcastSet(ints, "ints");
        bcMapDs.writeAsCsv(resultPath);
        env.execute();
View Full Code Here

    @Override
    public CustomType cross(CustomType first, CustomType second)
        throws Exception {
     
      return new CustomType(first.myInt * second.myInt, first.myLong + second.myLong, first.myString + second.myString);
    }
View Full Code Here

TOP

Related Classes of org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType

Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.