Package org.apache.crunch.impl.spark

Examples of org.apache.crunch.impl.spark.SparkPipeline
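SparkPipeline is the Spark-backed implementation of Crunch's Pipeline interface: it takes a Spark master URL (here "local") and an application name, and the rest of the pipeline code is written against the same Pipeline API as with MRPipeline. A minimal end-to-end sketch, assuming hypothetical input and output paths:

import org.apache.crunch.PCollection;
import org.apache.crunch.PTable;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.spark.SparkPipeline;

public class SparkPipelineSketch {
  public static void main(String[] args) {
    // "local" runs Spark in-process; a cluster deployment would pass its master URL.
    Pipeline pipeline = new SparkPipeline("local", "example");
    PCollection<String> lines = pipeline.readTextFile("/tmp/in.txt");  // hypothetical path
    PTable<String, Long> counts = lines.count();  // one (line, occurrences) pair per distinct line
    pipeline.writeTextFile(counts, "/tmp/out");   // hypothetical path
    pipeline.done();  // run any pending jobs and clean up
  }
}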


  @Test
  public void testUnionOfGroupedOutputAndNonGroupedOutput() throws IOException {
    String inputPath = tempDir.copyResourceFileName("set1.txt");
    String inputPath2 = tempDir.copyResourceFileName("set2.txt");

    Pipeline pipeline = new SparkPipeline("local", "unionresults");

    PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
    PCollection<Pair<String, Long>> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
        Writables.pairs(Writables.strings(), Writables.longs()));
    // count() groups and aggregates, so set2Counts is grouped output (a PTable).
    PCollection<Pair<String, Long>> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();

    PCollection<Pair<String, Long>> union = set1Lengths.union(set2Counts);

    Set<Pair<String, Long>> unionValues = Sets.newHashSet(union.materialize());
    assertEquals(7, unionValues.size());

    Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
    expectedPairs.add(Pair.of("b", 10L));
    expectedPairs.add(Pair.of("c", 10L));
    expectedPairs.add(Pair.of("a", 10L));
    expectedPairs.add(Pair.of("e", 10L));
    expectedPairs.add(Pair.of("a", 1L));
    expectedPairs.add(Pair.of("c", 1L));
    expectedPairs.add(Pair.of("d", 1L));

    assertEquals(expectedPairs, unionValues);

    pipeline.done();
  }
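The StringLengthMapFn used above is not part of the snippet. Judging from the expected pairs (each one-character line in set1.txt maps to the value 10), a plausible reconstruction is a MapFn that scales the line length by ten, though the exact body is an assumption:

  // Hypothetical reconstruction of StringLengthMapFn; the 10x multiplier is
  // inferred from the expected pairs above, not confirmed from the source.
  static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
    @Override
    public Pair<String, Long> map(String input) {
      return Pair.of(input, 10L * input.length());
    }
  }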

  @Test
  public void testMultiWrite() throws Exception {
    String inputPath = tempDir.copyResourceFileName("set1.txt");
    String inputPath2 = tempDir.copyResourceFileName("set2.txt");
    String output = tempDir.getFileName("output");

    Pipeline pipeline = new SparkPipeline("local", "multiwrite");

    PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
    PTable<String, Long> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
        Writables.tableOf(Writables.strings(), Writables.longs()));
    PTable<String, Long> set2Counts = pipeline.read(At.textFile(inputPath2, Writables.strings())).count();

    TableSourceTarget<String, Long> inter = At.sequenceFile(output, Writables.strings(), Writables.longs());
    set1Lengths.write(inter);
    // APPEND lets a second write add its output to the same target path.
    set2Counts.write(inter, Target.WriteMode.APPEND);

    pipeline.run();

    PTable<String, Long> in = pipeline.read(inter);
    Set<Pair<String, Long>> values = Sets.newHashSet(in.materialize());
    assertEquals(7, values.size());

    Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
    expectedPairs.add(Pair.of("b", 10L));
    expectedPairs.add(Pair.of("c", 10L));
    expectedPairs.add(Pair.of("a", 10L));
    expectedPairs.add(Pair.of("e", 10L));
    expectedPairs.add(Pair.of("a", 1L));
    expectedPairs.add(Pair.of("c", 1L));
    expectedPairs.add(Pair.of("d", 1L));

    assertEquals(expectedPairs, values);

    pipeline.done();
  }
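Target.WriteMode controls what happens when the target already exists. APPEND, used above, lets the second write add its files next to the first one's; the other modes are DEFAULT (fail if the target exists), OVERWRITE, and CHECKPOINT. For instance, to replace any previous output instead of appending:

    // Delete whatever is at the target and write fresh output.
    set2Counts.write(inter, Target.WriteMode.OVERWRITE);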

  private Pipeline pipeline;
  private PCollection<String> lines1;
  private PCollection<String> lines2;
  private PCollection<String> lines3;
  private PCollection<String> lines4;

  @Before
  public void setUp() throws IOException {
    pipeline = new SparkPipeline("local", "wordcount");
    lines1 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
    lines2 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
    lines3 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src1.txt")));
    lines4 = pipeline.readTextFile(tmpDir.copyResourceFileName(Tests.resource(this, "src2.txt")));
  }

  private PTableType<Text, LongWritable> ptt = Writables.tableOf(
      Writables.writables(Text.class),
      Writables.writables(LongWritable.class));

  @Test
  public void testSkipPTypes() throws Exception {
    String out = tempDir.getFileName("out");
    SparkPipeline pipeline = new SparkPipeline("local", "skipptypes");
    PCollection<String> shakes = pipeline.read(From.textFile(tempDir.copyResourceFileName("shakes.txt")));
    PTable<String, Long> wcnt = shakes.count();
    wcnt.write(new MySeqFileTableSourceTarget(out, ptt));
    pipeline.run();

    // Read back with Writable-based PTypes, skipping conversion to String/Long.
    PTable<Text, LongWritable> wcntIn = pipeline.read(new MySeqFileTableSourceTarget(out, ptt));
    assertEquals(new LongWritable(1L), wcntIn.materialize().iterator().next().second());
    pipeline.done();
  }
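MySeqFileTableSourceTarget is a class local to this test, but the built-in sequence-file source/target shows the same idea: the data is written with Crunch PTypes (String/Long) and read back with Writable-based PTypes, so the stored Text/LongWritable values are handed through without conversion. A sketch of the read side under that assumption:

    // Read the sequence file back as raw Hadoop Writables rather than
    // converting to String/Long; "out" is the path written above.
    PTable<Text, LongWritable> raw = pipeline.read(At.sequenceFile(out,
        Writables.writables(Text.class), Writables.writables(LongWritable.class)));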

  private SparkPipeline pipeline;

  @Before
  public void setUp() throws IOException {
    pipeline = new SparkPipeline("local", "taskattempt");
  }

  public TemporaryPath tmpDir = new TemporaryPath();
  private Pipeline pipeline;

  @Before
  public void setUp() throws Exception {
    pipeline = new SparkPipeline("local", "pagerank");
  }

  @Rule
  public TemporaryPath tmpDir = new TemporaryPath();

  @Test
  public void testMapsideJoin_RightSideIsEmpty() throws IOException {
    Pipeline pipeline = new SparkPipeline("local", "mapside");
    PTable<Integer, String> customerTable = readTable(pipeline, "customers.txt");
    PTable<Integer, String> orderTable = readTable(pipeline, "orders.txt");

    PTable<Integer, String> filteredOrderTable = orderTable
        .parallelDo(FilterFns.<Pair<Integer, String>>REJECT_ALL(), orderTable.getPTableType());

    JoinStrategy<Integer, String, String> mapsideJoin = new MapsideJoinStrategy<Integer, String, String>();
    PTable<Integer, Pair<String, String>> joined = mapsideJoin.join(customerTable, filteredOrderTable, JoinType.INNER_JOIN);

    List<Pair<Integer, Pair<String, String>>> materializedJoin = Lists.newArrayList(joined.materialize());

    assertTrue(materializedJoin.isEmpty());
    pipeline.done();
  }
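The readTable helper called above is not included in the snippet. A hypothetical sketch of what it might look like, assuming comma-delimited "id,value" lines in the resource files:

  // Hypothetical readTable helper: parse each "id,value" line of a resource
  // file into a PTable<Integer, String>. The file layout is an assumption.
  private PTable<Integer, String> readTable(Pipeline pipeline, String fileName) throws IOException {
    return pipeline.readTextFile(tmpDir.copyResourceFileName(fileName)).parallelDo(
        new MapFn<String, Pair<Integer, String>>() {
          @Override
          public Pair<Integer, String> map(String input) {
            String[] fields = input.split(",", 2);  // assumed delimiter
            return Pair.of(Integer.parseInt(fields[0]), fields[1]);
          }
        },
        Writables.tableOf(Writables.ints(), Writables.strings()));
  }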

  @Test
  public void testMapsideJoin() throws IOException {
    runMapsideJoin(new SparkPipeline("local", "mapside"), false);
  }

  @Test
  public void testMapsideJoin_Materialized() throws IOException {
    runMapsideJoin(new SparkPipeline("local", "mapside"), true);
  }

  @Test
  public void testMapsideJoin_LeftOuterJoin() throws IOException {
    runMapsideLeftOuterJoin(new SparkPipeline("local", "mapside"), false);
  }
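The runMapsideJoin helper shared by these tests is not part of the snippet either. A hypothetical sketch, assuming the boolean selects MapsideJoinStrategy's materializing constructor and reusing the readTable sketch above:

  // Hypothetical runMapsideJoin helper; the real test asserts specific joined rows.
  private void runMapsideJoin(Pipeline pipeline, boolean materialize) throws IOException {
    PTable<Integer, String> customers = readTable(pipeline, "customers.txt");
    PTable<Integer, String> orders = readTable(pipeline, "orders.txt");
    JoinStrategy<Integer, String, String> strategy =
        new MapsideJoinStrategy<Integer, String, String>(materialize);
    PTable<Integer, Pair<String, String>> joined =
        strategy.join(customers, orders, JoinType.INNER_JOIN);
    assertFalse(Lists.newArrayList(joined.materialize()).isEmpty());
    pipeline.done();
  }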