}
public static void run(PTypeFamily typeFamily, TemporaryPath tmpDir) throws Exception {
Pipeline pipeline = new MRPipeline(MapsIT.class, tmpDir.getDefaultConfiguration());
String shakesInputPath = tmpDir.copyResourceFileName("shakes.txt");
PCollection<String> shakespeare = pipeline.readTextFile(shakesInputPath);
Iterable<Pair<String, Map<String, Long>>> output = shakespeare
.parallelDo(new DoFn<String, Pair<String, Map<String, Long>>>() {
@Override
public void process(String input, Emitter<Pair<String, Map<String, Long>>> emitter) {
String last = null;