public void testMultiWrite() throws Exception {
  // Stage the two input files and choose an output path in the test's
  // temporary directory.
  String inputPath = tempDir.copyResourceFileName("set1.txt");
  String inputPath2 = tempDir.copyResourceFileName("set2.txt");
  String output = tempDir.getFileName("output");
  // Run the pipeline against Spark in local mode.
  Pipeline pipeline = new SparkPipeline("local", "multiwrite");
  // First table: each line of set1.txt keyed to a value derived from its
  // length by StringLengthMapFn (defined elsewhere in this class).
  PCollection<String> set1Lines = pipeline.read(At.textFile(inputPath, Writables.strings()));
  PTable<String, Long> set1Lengths = set1Lines.parallelDo(new StringLengthMapFn(),
      Writables.tableOf(Writables.strings(), Writables.longs()));
  // Second table: occurrence counts for the lines of set2.txt.
  PTable<String, Long> set2Counts =
      pipeline.read(At.textFile(inputPath2, Writables.strings())).count();
  // Write both tables to the same SequenceFile target: the first write uses
  // the default write mode, the second appends instead of overwriting.
  TableSourceTarget<String, Long> inter = At.sequenceFile(output, Writables.strings(), Writables.longs());
  set1Lengths.write(inter);
  set2Counts.write(inter, Target.WriteMode.APPEND);
  pipeline.run();
  // Read the shared target back; it should contain the union of both writes.
  PTable<String, Long> in = pipeline.read(inter);
  Set<Pair<String, Long>> values = Sets.newHashSet(in.materialize());
  assertEquals(7, values.size());
  // Four pairs come from set1Lengths (the 10L values) and three from
  // set2Counts (the 1L counts); "a" and "c" appear once from each source.
  Set<Pair<String, Long>> expectedPairs = Sets.newHashSet();
  expectedPairs.add(Pair.of("b", 10L));
  expectedPairs.add(Pair.of("c", 10L));
  expectedPairs.add(Pair.of("a", 10L));
  expectedPairs.add(Pair.of("e", 10L));
  expectedPairs.add(Pair.of("a", 1L));
  expectedPairs.add(Pair.of("c", 1L));
  expectedPairs.add(Pair.of("d", 1L));
  assertEquals(expectedPairs, values);
  pipeline.done();
}
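
// StringLengthMapFn is referenced above but not shown in this section. A
// minimal sketch consistent with the expected pairs (each one-character
// line of set1.txt mapping to 10L) is given below; this is an illustrative
// assumption, not necessarily the class's actual definition, and it relies
// on org.apache.crunch.MapFn and org.apache.crunch.Pair being imported.
private static class StringLengthMapFn extends MapFn<String, Pair<String, Long>> {
  @Override
  public Pair<String, Long> map(String input) {
    // Key each line to ten times its length: 10L for the single-character
    // lines in set1.txt, which matches the expectations in testMultiWrite.
    return Pair.of(input, 10L * input.length());
  }
}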