/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark;
import java.io.File;
import java.io.Serializable;
import java.util.*;

import scala.Tuple2;

// NOTE: Guava's Iterables lives in com.google.common.collect (singular), not
// com.google.common.collections — the original import did not compile.
import com.google.common.base.Optional;
import com.google.common.collect.Iterables;
import com.google.common.io.Files;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import org.apache.spark.api.java.JavaDoubleRDD;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
/**
* Most of these tests replicate org.apache.spark.JavaAPISuite using java 8
* lambda syntax.
*/
public class Java8APISuite implements Serializable {

  // Mutated from inside foreach() closures. This is only safe because the
  // suite runs in "local" mode, where tasks execute in the driver JVM and
  // therefore see (and update) this very field rather than a serialized copy.
  static int foreachCalls = 0;

  // transient: the suite itself is Serializable (so lambdas capturing `this`
  // can be shipped to tasks), but the context must never be serialized.
  private transient JavaSparkContext sc;

  @Before
  public void setUp() {
    sc = new JavaSparkContext("local", "JavaAPISuite");
  }

  @After
  public void tearDown() {
    sc.stop();
    sc = null;
    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
    System.clearProperty("spark.driver.port");
  }

  /** Baseline: foreach with a pre-Java-8 anonymous class. */
  @Test
  public void foreachWithAnonymousClass() {
    foreachCalls = 0;
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
    rdd.foreach(new VoidFunction<String>() {
      @Override
      public void call(String s) {
        foreachCalls++;
      }
    });
    Assert.assertEquals(2, foreachCalls);
  }

  /** Same as above, expressed as a lambda. */
  @Test
  public void foreach() {
    foreachCalls = 0;
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
    rdd.foreach((x) -> foreachCalls++);
    Assert.assertEquals(2, foreachCalls);
  }

  @Test
  public void groupBy() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    // x % 2 == 0 tests for evenness, so the key `true` maps to the even values.
    // (The original called this `isOdd`, which contradicted the predicate.)
    Function<Integer, Boolean> isEven = x -> x % 2 == 0;
    JavaPairRDD<Boolean, Iterable<Integer>> oddsAndEvens = rdd.groupBy(isEven);
    Assert.assertEquals(2, oddsAndEvens.count());
    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds

    // Same grouping, explicitly requesting a single partition.
    oddsAndEvens = rdd.groupBy(isEven, 1);
    Assert.assertEquals(2, oddsAndEvens.count());
    Assert.assertEquals(2, Iterables.size(oddsAndEvens.lookup(true).get(0)));  // Evens
    Assert.assertEquals(5, Iterables.size(oddsAndEvens.lookup(false).get(0))); // Odds
  }

  @Test
  public void leftOuterJoin() {
    JavaPairRDD<Integer, Integer> rdd1 = sc.parallelizePairs(Arrays.asList(
      new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(1, 2),
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(3, 1)
    ));
    JavaPairRDD<Integer, Character> rdd2 = sc.parallelizePairs(Arrays.asList(
      new Tuple2<Integer, Character>(1, 'x'),
      new Tuple2<Integer, Character>(2, 'y'),
      new Tuple2<Integer, Character>(2, 'z'),
      new Tuple2<Integer, Character>(4, 'w')
    ));
    // 2 matches for key 1, 2 for key 2, plus key 3 padded with Optional.absent().
    List<Tuple2<Integer, Tuple2<Integer, Optional<Character>>>> joined =
      rdd1.leftOuterJoin(rdd2).collect();
    Assert.assertEquals(5, joined.size());
    // Key 3 is the only left key with no right-side match.
    Tuple2<Integer, Tuple2<Integer, Optional<Character>>> firstUnmatched =
      rdd1.leftOuterJoin(rdd2).filter(tup -> !tup._2()._2().isPresent()).first();
    Assert.assertEquals(3, firstUnmatched._1().intValue());
  }

  @Test
  public void foldReduce() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 1, 2, 3, 5, 8, 13));
    Function2<Integer, Integer, Integer> add = (a, b) -> a + b;

    // fold with the additive identity and reduce must agree: 1+1+2+3+5+8+13 = 33.
    int sum = rdd.fold(0, add);
    Assert.assertEquals(33, sum);

    sum = rdd.reduce(add);
    Assert.assertEquals(33, sum);
  }

  @Test
  public void foldByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(3, 2),
      new Tuple2<Integer, Integer>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> sums = rdd.foldByKey(0, (a, b) -> a + b);
    Assert.assertEquals(1, sums.lookup(1).get(0).intValue());
    Assert.assertEquals(2, sums.lookup(2).get(0).intValue());
    Assert.assertEquals(3, sums.lookup(3).get(0).intValue());
  }

  @Test
  public void reduceByKey() {
    List<Tuple2<Integer, Integer>> pairs = Arrays.asList(
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(2, 1),
      new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(3, 2),
      new Tuple2<Integer, Integer>(3, 1)
    );
    JavaPairRDD<Integer, Integer> rdd = sc.parallelizePairs(pairs);
    JavaPairRDD<Integer, Integer> counts = rdd.reduceByKey((a, b) -> a + b);
    Assert.assertEquals(1, counts.lookup(1).get(0).intValue());
    Assert.assertEquals(2, counts.lookup(2).get(0).intValue());
    Assert.assertEquals(3, counts.lookup(3).get(0).intValue());

    Map<Integer, Integer> localCounts = counts.collectAsMap();
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());

    // reduceByKeyLocally performs the reduction on the driver and must agree.
    localCounts = rdd.reduceByKeyLocally((a, b) -> a + b);
    Assert.assertEquals(1, localCounts.get(1).intValue());
    Assert.assertEquals(2, localCounts.get(2).intValue());
    Assert.assertEquals(3, localCounts.get(3).intValue());
  }

  /** Exercises the three map flavors: mapToDouble, mapToPair, and map. */
  @Test
  public void map() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x).cache();
    doubles.collect();
    JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<Integer, Integer>(x, x))
      .cache();
    pairs.collect();
    JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
    strings.collect();
  }

  /** Exercises flatMap, flatMapToPair, and flatMapToDouble over word splits. */
  @Test
  public void flatMap() {
    JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello World!",
      "The quick brown fox jumps over the lazy dog."));
    JavaRDD<String> words = rdd.flatMap(x -> Arrays.asList(x.split(" ")));
    Assert.assertEquals("Hello", words.first());
    Assert.assertEquals(11, words.count());

    JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
      List<Tuple2<String, String>> pairs2 = new LinkedList<Tuple2<String, String>>();
      for (String word : s.split(" ")) pairs2.add(new Tuple2<String, String>(word, word));
      return pairs2;
    });
    Assert.assertEquals(new Tuple2<String, String>("Hello", "Hello"), pairs.first());
    Assert.assertEquals(11, pairs.count());

    JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
      List<Double> lengths = new LinkedList<Double>();
      for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
      return lengths;
    });
    // First word is "Hello" -> length 5.
    Assert.assertEquals(5.0, doubles.first().doubleValue(), 0.01);
    Assert.assertEquals(11, pairs.count());
  }

  @Test
  public void mapsFromPairsToPairs() {
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<Integer, String>(1, "a"),
      new Tuple2<Integer, String>(2, "aa"),
      new Tuple2<Integer, String>(3, "aaa")
    );
    JavaPairRDD<Integer, String> pairRDD = sc.parallelizePairs(pairs);

    // Regression test for SPARK-668:
    JavaPairRDD<String, Integer> swapped =
      pairRDD.flatMapToPair(x -> Collections.singletonList(x.swap()));
    swapped.collect();

    // There was never a bug here, but it's worth testing:
    pairRDD.map(item -> item.swap()).collect();
  }

  @Test
  public void mapPartitions() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4), 2);
    JavaRDD<Integer> partitionSums = rdd.mapPartitions(iter -> {
      int sum = 0;
      while (iter.hasNext()) {
        sum += iter.next();
      }
      return Collections.singletonList(sum);
    });
    // Two partitions: [1, 2] -> 3 and [3, 4] -> 7.
    Assert.assertEquals("[3, 7]", partitionSums.collect().toString());
  }

  /** Round-trips pairs through a Hadoop SequenceFile on local disk. */
  @Test
  public void sequenceFile() {
    File tempDir = Files.createTempDir();
    String outputDir = new File(tempDir, "output").getAbsolutePath();
    List<Tuple2<Integer, String>> pairs = Arrays.asList(
      new Tuple2<Integer, String>(1, "a"),
      new Tuple2<Integer, String>(2, "aa"),
      new Tuple2<Integer, String>(3, "aaa")
    );
    JavaPairRDD<Integer, String> rdd = sc.parallelizePairs(pairs);

    rdd.mapToPair(pair ->
      new Tuple2<IntWritable, Text>(new IntWritable(pair._1()), new Text(pair._2())))
      .saveAsHadoopFile(outputDir, IntWritable.class, Text.class, SequenceFileOutputFormat.class);

    // Try reading the output back as an object file
    JavaPairRDD<Integer, String> readRDD = sc.sequenceFile(outputDir, IntWritable.class, Text.class)
      .mapToPair(pair -> new Tuple2<Integer, String>(pair._1().get(), pair._2().toString()));
    Assert.assertEquals(pairs, readRDD.collect());
  }

  @Test
  public void zip() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));
    JavaDoubleRDD doubles = rdd.mapToDouble(x -> 1.0 * x);
    JavaPairRDD<Integer, Double> zipped = rdd.zip(doubles);
    zipped.count();
  }

  @Test
  public void zipPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 2);
    JavaRDD<String> rdd2 = sc.parallelize(Arrays.asList("1", "2", "3", "4"), 2);
    // Emits (size of rdd1 partition, size of rdd2 partition) for each partition pair.
    FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
      (Iterator<Integer> i, Iterator<String> s) -> {
        int sizeI = 0;
        int sizeS = 0;
        while (i.hasNext()) {
          sizeI += 1;
          i.next();
        }
        while (s.hasNext()) {
          sizeS += 1;
          s.next();
        }
        return Arrays.asList(sizeI, sizeS);
      };
    JavaRDD<Integer> sizes = rdd1.zipPartitions(rdd2, sizesFn);
    Assert.assertEquals("[3, 2, 3, 2]", sizes.collect().toString());
  }

  @Test
  public void accumulators() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
    rdd.foreach(x -> intAccum.add(x));
    Assert.assertEquals((Integer) 25, intAccum.value()); // 10 + (1+2+3+4+5)

    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
    rdd.foreach(x -> doubleAccum.add((double) x));
    Assert.assertEquals((Double) 25.0, doubleAccum.value());

    // Try a custom accumulator type
    AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
      @Override
      public Float addInPlace(Float r, Float t) {
        return r + t;
      }
      @Override
      public Float addAccumulator(Float r, Float t) {
        return r + t;
      }
      @Override
      public Float zero(Float initialValue) {
        return 0.0f;
      }
    };

    final Accumulator<Float> floatAccum = sc.accumulator((Float) 10.0f, floatAccumulatorParam);
    rdd.foreach(x -> floatAccum.add((float) x));
    Assert.assertEquals((Float) 25.0f, floatAccum.value());

    // Test the setValue method
    floatAccum.setValue(5.0f);
    Assert.assertEquals((Float) 5.0f, floatAccum.value());
  }

  @Test
  public void keyBy() {
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
    List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
    Assert.assertEquals(new Tuple2<String, Integer>("1", 1), s.get(0));
    Assert.assertEquals(new Tuple2<String, Integer>("2", 2), s.get(1));
  }

  @Test
  public void mapOnPairRDD() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4));
    JavaPairRDD<Integer, Integer> rdd2 =
      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));
    // Swap key and value: (i, i % 2) -> (i % 2, i).
    JavaPairRDD<Integer, Integer> rdd3 =
      rdd2.mapToPair(in -> new Tuple2<Integer, Integer>(in._2(), in._1()));
    Assert.assertEquals(Arrays.asList(
      new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(0, 2),
      new Tuple2<Integer, Integer>(1, 3),
      new Tuple2<Integer, Integer>(0, 4)), rdd3.collect());
  }

  @Test
  public void collectPartitions() {
    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7), 3);
    JavaPairRDD<Integer, Integer> rdd2 =
      rdd1.mapToPair(i -> new Tuple2<Integer, Integer>(i, i % 2));

    // collectPartitions returns a raw List[] (Java cannot create generic arrays).
    List[] parts = rdd1.collectPartitions(new int[]{0});
    Assert.assertEquals(Arrays.asList(1, 2), parts[0]);

    parts = rdd1.collectPartitions(new int[]{1, 2});
    Assert.assertEquals(Arrays.asList(3, 4), parts[0]);
    Assert.assertEquals(Arrays.asList(5, 6, 7), parts[1]);

    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(1, 1),
      new Tuple2<Integer, Integer>(2, 0)),
      rdd2.collectPartitions(new int[]{0})[0]);

    parts = rdd2.collectPartitions(new int[]{1, 2});
    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(3, 1),
      new Tuple2<Integer, Integer>(4, 0)), parts[0]);
    Assert.assertEquals(Arrays.asList(new Tuple2<Integer, Integer>(5, 1),
      new Tuple2<Integer, Integer>(6, 0),
      new Tuple2<Integer, Integer>(7, 1)), parts[1]);
  }

  @Test
  public void collectAsMapWithIntArrayValues() {
    // Regression test for SPARK-1040
    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
    JavaPairRDD<Integer, int[]> pairRDD =
      rdd.mapToPair(x -> new Tuple2<Integer, int[]>(x, new int[]{x}));
    pairRDD.collect(); // Works fine
    Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
  }
}