Package org.apache.spark

Source Code of org.apache.spark.Java8APISuite

/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark;

import java.io.File;
import java.io.Serializable;
import java.util.*;

import scala.Tuple2;

import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;

/**
* Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
* lambda syntax.
*/
public class Java8APISuite implements Serializable {
  static int foreachCalls = 0;
  private transient JavaSparkContext sc;

  @Before
  public void setUp() {
    sc = new JavaSparkContext("local", "JavaAPISuite");
  }

  @After
  public void tearDown() {
    sc.stop();
    sc = null;
    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
    System.clearProperty("spark.driver.port");
  }

  @Test
  public void foreachWithAnonymousClass() {
    foreachCalls = 0;
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
    rdd.foreach(new VoidFunction<String>() {
      @Override
      public void call(String s) {
        foreachCalls++;
      }
    });
    Assert.assertEquals(2, foreachCalls);
  }

  @Test
  public void foreach() {
    foreachCalls = 0;
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
    rdd.foreach((x) -> foreachCalls++);
    Assert.assertEquals(2, foreachCalls);
  }

  @Test
  public void groupBy() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function<Integer, Boolean> isOdd = x -> x % 2 == 0;
    JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isOdd);
    Assert.assertEquals(2, oddsAndEvens.count());
    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)))// Evens
    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds

    oddsAndEvens = rdd.groupBy(isOdd, 1);
    Assert.assertEquals(2, oddsAndEvens.count());
    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)))// Evens
    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
  }

  @Test
  public void leftOuterJoin() {
    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
      new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(1, 2),
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(3, 1)
    ));
    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
      new Tuple2<Integer, Character>(1, 'x'),
      new Tuple2<Integer, Character>(2, 'y'),
      new Tuple2<Integer, Character>(2, 'z'),
      new Tuple2<Integer, Character>(4, 'w')
    ));
    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
      rdd1.leftOuterJoin(rdd2).collect();
    Assert.assertEquals(5, joined.size());
    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
    Assert.assertEquals(3, firstUnmatched._1().intValue());
  }

  @Test
  public void foldReduce() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;

    int sum = rdd.fold(0, add);
    Assert.assertEquals(33, sum);

    sum = rdd.reduce(add);
    Assert.assertEquals(33, sum);
  }

  @Test
  public void foldByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(3, 2),
      new Tuple2<Integer, Integer>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
  }

  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(3, 2),
      new Tuple2<Integer, Integer>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());

    Map<Integer, Integer> localCounts = counts.collectAsMap();
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());

    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
  }

  @Test
  public void map() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
    doubles.collect();
    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
      .cache();
    pairs.collect();
    JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
    strings.collect();
  }

  @Test
  public void flatMap() {
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
      "The quick brown fox jumps over the lazy dog."));
    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));

    Assert.assertEquals("Hello", words.first());
    Assert.assertEquals(11, words.count());

    JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
      List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
      for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
      return pairs2;
    });

    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
    Assert.assertEquals(11, pairs.count());

    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
      List<Double> lengths = new LinkedList<Double>();
      for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
      return lengths;
    });

    Double x = doubles.first();
    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
    Assert.assertEquals(11, pairs.count());
  }

  @Test
  public void mapsFromPairsToPairs() {
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<Integer, String>(1, "a"),
      new Tuple2<Integer, String>(2, "aa"),
      new Tuple2<Integer, String>(3, "aaa")
    );
    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);

    // Regression test for SPARK-668:
    JavaPairRDD<String, Integer> swapped =
      pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
    swapped.collect();

    // There was never a bug here, but it's worth testing:
    pairRDD.map(item -> item.swap()).collect();
  }

  @Test
  public void mapPartitions() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
      int sum = 0;
      while (iter.hasNext()) {
        sum += iter.next();
      }
      return Collections.singletonList(sum);
    });

    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
  }

  @Test
  public void sequenceFile() {
    File tempDir = Files.createTempDir();
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<Integer, String>(1, "a"),
      new Tuple2<Integer, String>(2, "aa"),
      new Tuple2<Integer, String>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

    rdd.mapToPair(pair ->
      new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

    // Try reading the output back as an object file
    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
      .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
    Assert.assertEquals(pairs, readRDD.collect());
  }

  @Test
  public void zip() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
    zipped.count();
  }

  @Test
  public void zipPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
      (Iterator<Integer> i, Iterator<String> s) -> {
        int sizeI = 0;
        int sizeS = 0;
        while (i.hasNext()) {
          sizeI += 1;
          i.next();
        }
        while (s.hasNext()) {
          sizeS += 1;
          s.next();
        }
        return Arrays.asList(sizeI, sizeS);
      };
    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
  }

  @Test
  public void accumulators() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
    rdd.foreach(x -> intAccum.add(x));
    Assert.assertEquals((Integer) 25, intAccum.value());

    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
    rdd.foreach(x -> doubleAccum.add((double) x));
    Assert.assertEquals((Double) 25.0, doubleAccum.value());

    // Try a custom accumulator type
    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
      public Float addInPlace(Float r, Float t) {
        return r + t;
      }

      public Float addAccumulator(Float r, Float t) {
        return r + t;
      }

      public Float zero(Float initialValue) {
        return 0.0f;
      }
    };

    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
    rdd.foreach(x -> floatAccum.add((float) x));
    Assert.assertEquals((Float) 25.0f, floatAccum.value());

    // Test the setValue method
    floatAccum.setValue(5.0f);
    Assert.assertEquals((Float) 5.0f, floatAccum.value());
  }

  @Test
  public void keyBy() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
    List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
  }

  @Test
  public void mapOnPairRDD() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    JavaPairRDD<Integer, Integer> rdd2 =
      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
    JavaPairRDD<Integer, Integer> rdd3 =
      rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
    Assert.assertEquals(Arrays.asList(
      new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(0, 2),
      new Tuple2<Integer, Integer>(1, 3),
      new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
  }

  @Test
  public void collectPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);

    JavaPairRDD<Integer, Integer> rdd2 =
      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
    List[] parts = rdd1.collectPartitions(new int[]{0});
    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);

    parts = rdd1.collectPartitions(new int[]{1, 2});
    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);

    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(2, 0)),
      rdd2.collectPartitions(new int[]{0})[0]);

    parts = rdd2.collectPartitions(new int[]{1, 2});
    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
      new Tuple2<Integer, Integer>(4, 0)), parts[0]);
    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
      new Tuple2<Integer, Integer>(6, 0),
      new Tuple2<Integer, Integer>(7, 1)), parts[1]);
  }

  @Test
  public void collectAsMapWithIntArrayValues() {
    // Regression test for SPARK-1040
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
    JavaPairRDD<Integer, int[]> pairRDD =
      rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
    pairRDD.collect()// Works fine
    Map<Integer, int[]> map = pairRDD.collectAsMap()// Used to crash with ClassCastException
  }
}
TOP

Related Classes of org.apache.spark.Java8APISuite

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.