    protected FlowConnector makeFlowConnector() {
        Map<Object, Object> props = new HashMap<Object, Object>();
        TupleSerializationProps.addSerialization(props, BytesSerialization.class.getName());
        return new HadoopFlowConnector(props);
    String tokenPath = args[ 2 ];
    String similarityPath = args[ 3 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    FlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create SOURCE taps, and read from local file system if inputs are not URLs
    Tap tweetTap = makeTap( tweetPath, new TextDelimited( true, "\t" ) );

    Tap stopTap = makeTap( stopWords, new TextDelimited( new Fields( "stop" ), true, "\t" ) );

    // create SINK taps, replacing previous output if needed
    Tap tokenTap = new Hfs( new TextDelimited( true, "\t" ), tokenPath, SinkMode.REPLACE );
    Tap similarityTap = new Hfs( new TextDelimited( true, "\t" ), similarityPath, SinkMode.REPLACE );

    flow part #1
    generate a bipartite map of (uid, token), while filtering out stop-words

    // create a STREAM ASSERTION to validate the input data
    Pipe tweetPipe = new Pipe( "tweet" ); // name branch
    AssertMatches assertMatches = new AssertMatches( ".{6,150}" );
    tweetPipe = new Each( tweetPipe, AssertionLevel.STRICT, assertMatches );

    // create an OPERATION split the text into a token stream
    RegexSplitGenerator splitter = new RegexSplitGenerator( new Fields( "token" ), " " );
    Fields outputSelector = new Fields( "uid", "token" );
    tweetPipe = new Each( tweetPipe, new Fields( "text" ), splitter, outputSelector );

    tweetPipe = new Unique( tweetPipe, Fields.ALL );

    RegexFilter filter = new RegexFilter( "^\\S\\S+$" );
    tweetPipe = new Each( tweetPipe, new Fields( "token" ), filter );

    // create PIPEs for left join on the stop words
    Pipe stopPipe = new Pipe( "stop" ); // name branch
    Pipe joinPipe = new HashJoin( tweetPipe, new Fields( "token" ), stopPipe, new Fields( "stop" ), new LeftJoin() );
    joinPipe = new Each( joinPipe, new Fields( "stop" ), new RegexFilter( "^$" ) );

    joinPipe = new Retain( joinPipe, new Fields( "uid", "token" ) );

    flow part #2
    create SINK tap to measure token frequency, which will need to be used to adjust
    stop words -- based on an R script

    Pipe tokenPipe = new Pipe( "token", joinPipe ); // name branch
    tokenPipe = new GroupBy( tokenPipe, new Fields( "token" ) );
    tokenPipe = new Every( tokenPipe, Fields.ALL, new Count(), Fields.ALL );

    flow part #3
    generate an inverted index for ((uid1,uid2), token) to avoid having to perform
    a cross-product, which would impose a bottleneck in the parallelism

    Pipe invertPipe = new Pipe( "inverted index", joinPipe );
    invertPipe = new CoGroup( invertPipe, new Fields( "token" ), 1, new Fields( "uid1", "ignore", "uid2", "token" ) );

    Fields filterArguments = new Fields( "uid1", "uid2" );
    String uidFilter = "uid1.compareToIgnoreCase( uid2 ) >= 0";
    invertPipe = new Each( invertPipe, filterArguments, new ExpressionFilter( uidFilter, String.class ) );
    Fields ignore = new Fields( "ignore" );
    invertPipe = new Discard( invertPipe, ignore );

    flow part #4
    count the number of tokens in common for each uid pair and apply a threshold

    Pipe commonPipe = new GroupBy( new Pipe( "uid common", invertPipe ), new Fields( "uid1", "uid2" ) );
    commonPipe = new Every( commonPipe, Fields.ALL, new Count( new Fields( "common" ) ), Fields.ALL );

    String commonFilter = String.format( "common < %d", MIN_COMMON_TOKENS );
    commonPipe = new Each( commonPipe, new Fields( "common" ), new ExpressionFilter( commonFilter, Integer.TYPE ) );

    flow part #5
    count the number of tokens overall for each uid, then join to calculate
    the vector length for uid1

    Fields tokenCount = new Fields( "token_count" );
    Pipe countPipe = new GroupBy( "count", joinPipe, new Fields( "uid" ) );
    countPipe = new Every( countPipe, Fields.ALL, new Count( tokenCount ), Fields.ALL );

    joinPipe = new CoGroup( countPipe, new Fields( "uid" ), commonPipe, new Fields( "uid1" ) );
    joinPipe = new Pipe( "common", joinPipe );
    joinPipe = new Discard( joinPipe, new Fields( "uid" ) );

    joinPipe = new Rename( joinPipe, tokenCount, new Fields( "token_count1" ) );

    flow part #6 join to be able to calculate the vector length for
    uid2, remove instances where one uid merely retweets another,
    then calculate an Ochiai similarity metric to find the nearest
    "neighbors" for each uid -- as recommended users to "follow"

    joinPipe = new CoGroup( "similarity", countPipe, new Fields( "uid" ), joinPipe, new Fields( "uid2" ) );

    joinPipe = new Rename( joinPipe, tokenCount, new Fields( "token_count2" ) );

    // use a DEBUG to check the values in the tuple stream; turn off in the FLOWDEF below
    joinPipe = new Each( joinPipe, DebugLevel.VERBOSE, new Debug( true ) );

    Fields expressionArguments = new Fields( "token_count1", "token_count2", "common" );
    commonFilter = "( token_count1 == common ) || ( token_count2 == common )";
    joinPipe = new Each( joinPipe, expressionArguments, new ExpressionFilter( commonFilter, Integer.TYPE ) );

    Fields ochiaiArguments = new Fields( "uid1", "token_count1", "uid2", "token_count2", "common" );
    Fields resultFields = new Fields( "uid", "recommend_uid", "similarity" );
    joinPipe = new Each( joinPipe, ochiaiArguments, new OchiaiFunction( resultFields ), Fields.RESULTS );

    flow part #7
    apply thresholds to filter out poor recommendations

    Fields similarityArguments = new Fields( "similarity" );
    commonFilter = String.format(Locale.US, "similarity < %f || similarity > %f", MIN_SIMILARITY, MAX_SIMILARITY );
    joinPipe = new Each( joinPipe, similarityArguments, new ExpressionFilter( commonFilter, Double.TYPE ) );

    connect up all the flow, generate a flow diagram, then run the flow.
    results for recommended users get stored in the "similarityPath" sink tap.

    FlowDef flowDef = FlowDef.flowDef().setName( "similarity" );
    flowDef.addSource( tweetPipe, tweetTap );
    flowDef.addSource( stopPipe, stopTap );
    flowDef.addTailSink( tokenPipe, tokenTap );
    flowDef.addTailSink( joinPipe, similarityTap );

    // set to DebugLevel.VERBOSE for trace, or DebugLevel.NONE in production
    flowDef.setDebugLevel( DebugLevel.VERBOSE );

    // set to AssertionLevel.STRICT for all assertions, or AssertionLevel.NONE in production
    flowDef.setAssertionLevel( AssertionLevel.STRICT );

    Flow similarityFlow = flowConnector.connect( flowDef );
    similarityFlow.writeDOT( "dot/similarity.dot" );
    String stopPath = "src/test/data/en.stop";
    String tfidfPath = "output/out2";

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps
    Tap docTap = new Hfs( new TextDelimited( true, "\t" ), docPath );
    Tap wcTap = new Hfs( new TextDelimited( true, "\t" ), wcPath );

    Fields stop = new Fields( "stop" );
    Tap stopTap = new Hfs( new TextDelimited( stop, true, "\t" ), stopPath );
    Tap tfidfTap = new Hfs( new TextDelimited( true, "\t" ), tfidfPath );

    // specify a regex operation to split the "document" text lines into a token stream
    Fields token = new Fields( "token" );
    Fields text = new Fields( "text" );
    RegexSplitGenerator splitter = new RegexSplitGenerator( token, "[ \\[\\]\\(\\),.]" );
    Fields fieldSelector = new Fields( "doc_id", "token" );
    Pipe docPipe = new Each( "token", text, splitter, fieldSelector );

    // define "ScrubFunction" to clean up the token stream
    Fields scrubArguments = new Fields( "doc_id", "token" );
    docPipe = new Each( docPipe, scrubArguments, new ScrubFunction( scrubArguments ), Fields.RESULTS );

    // perform a left join to remove stop words, discarding the rows
    // which joined with stop words, i.e., were non-null after left join
    Pipe stopPipe = new Pipe( "stop" );
    Pipe tokenPipe = new HashJoin( docPipe, token, stopPipe, stop, new LeftJoin() );
    tokenPipe = new Each( tokenPipe, stop, new RegexFilter( "^$" ) );
    tokenPipe = new Retain( tokenPipe, fieldSelector );

    // one branch of the flow tallies the token counts for term frequency (TF)
    Pipe tfPipe = new Pipe( "TF", tokenPipe );
    Fields tf_count = new Fields( "tf_count" );
    tfPipe = new CountBy( tfPipe, new Fields( "doc_id", "token" ), tf_count );

    Fields tf_token = new Fields( "tf_token" );
    tfPipe = new Rename( tfPipe, token, tf_token );

    // one branch counts the number of documents (D)
    Fields doc_id = new Fields( "doc_id" );
    Fields tally = new Fields( "tally" );
    Fields rhs_join = new Fields( "rhs_join" );
    Fields n_docs = new Fields( "n_docs" );
    Pipe dPipe = new Unique( "D", tokenPipe, doc_id );
    dPipe = new Each( dPipe, new Insert( tally, 1 ), Fields.ALL );
    dPipe = new Each( dPipe, new Insert( rhs_join, 1 ), Fields.ALL );
    dPipe = new SumBy( dPipe, rhs_join, tally, n_docs, long.class );

    // one branch tallies the token counts for document frequency (DF)
    Pipe dfPipe = new Unique( "DF", tokenPipe, Fields.ALL );
    Fields df_count = new Fields( "df_count" );
    dfPipe = new CountBy( dfPipe, token, df_count );

    Fields df_token = new Fields( "df_token" );
    Fields lhs_join = new Fields( "lhs_join" );
    dfPipe = new Rename( dfPipe, token, df_token );
    dfPipe = new Each( dfPipe, new Insert( lhs_join, 1 ), Fields.ALL );

    // join to bring together all the components for calculating TF-IDF
    // the D side of the join is smaller, so it goes on the RHS
    Pipe idfPipe = new HashJoin( dfPipe, lhs_join, dPipe, rhs_join );

    // the IDF side of the join is smaller, so it goes on the RHS
    Pipe tfidfPipe = new CoGroup( tfPipe, tf_token, idfPipe, df_token );

    // calculate the TF-IDF weights, per token, per document
    Fields tfidf = new Fields( "tfidf" );
    String expression = "(double) tf_count * Math.log( (double) n_docs / ( 1.0 + df_count ) )";
    ExpressionFunction tfidfExpression = new ExpressionFunction( tfidf, expression, Double.class );
    Fields tfidfArguments = new Fields( "tf_count", "df_count", "n_docs" );
    tfidfPipe = new Each( tfidfPipe, tfidfArguments, tfidfExpression, Fields.ALL );

    fieldSelector = new Fields( "tf_token", "doc_id", "tfidf" );
    tfidfPipe = new Retain( tfidfPipe, fieldSelector );
    tfidfPipe = new Rename( tfidfPipe, tf_token, token );

    // keep track of the word counts, which are useful for QA
    Pipe wcPipe = new Pipe( "wc", tfPipe );

    Fields count = new Fields( "count" );
    wcPipe = new SumBy( wcPipe, tf_token, tf_count, count, long.class );
    wcPipe = new Rename( wcPipe, tf_token, token );

    // additionally, sort by count
    wcPipe = new GroupBy( wcPipe, count, count );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
     .setName( "tfidf" )
     .addSource( docPipe, docTap )
     .addSource( stopPipe, stopTap )
     .addTailSink( tfidfPipe, tfidfTap )
     .addTailSink( wcPipe, wcTap );

    //run ambrose and cascading
    Flow tfidfFlow = flowConnector.connect( flowDef );
    EmbeddedAmbroseCascadingNotifier server = new EmbeddedAmbroseCascadingNotifier();
public class CascadingUtils {
    public static void identityFlow(Tap source, Tap sink, Fields selectFields) {
        Pipe pipe = new Pipe("pipe");
        pipe = new Each(pipe, selectFields, new Identity());
        Flow flow = new HadoopFlowConnector().connect(source, sink, pipe);
    String classifyPath = args[ 1 ];

    // set up the config properties
    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create source and sink taps
    Tap inputTap = new Hfs( new TextDelimited( true, "\t" ), inputPath );
    Tap classifyTap = new Hfs( new TextDelimited( true, "\t" ), classifyPath );

    // handle command line options
    OptionParser optParser = new OptionParser();
    optParser.accepts( "pmml" ).withRequiredArg();

    OptionSet options = optParser.parse( args );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
      .setName( "classify" )
      .addSource( "input", inputTap )
      .addSink( "classify", classifyTap );

    // build a Cascading assembly from the PMML description
    if( options.hasArgument( "pmml" ) )
      String pmmlPath = (String) options.valuesOf( "pmml" ).get( 0 );

      PMMLPlanner pmmlPlanner = new PMMLPlanner()
        .setPMMLInput( new File( pmmlPath ) )
        .setDefaultPredictedField( new Fields( "predict", Double.class ) ); // default value if missing from the model

      flowDef.addAssemblyPlanner( pmmlPlanner );

    // write a DOT file and run the flow
    Flow classifyFlow = flowConnector.connect( flowDef );
    classifyFlow.writeDOT( "dot/classify.dot" );
    String shadePath = args[ 9 ];
    String recoPath = args[ 10 ];

    Properties properties = new Properties();
    AppProps.setApplicationJarClass( properties, Main.class );
    HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

    // create taps for sources, sinks, traps
    Tap gisTap = new Hfs( new TextLine( new Fields( "line" ) ), gisPath );
    Tap metaTreeTap = new Hfs( new TextDelimited( true, "\t" ), metaTreePath );
    Tap metaRoadTap = new Hfs( new TextDelimited( true, "\t" ), metaRoadPath );
    Tap logsTap = new Hfs( new TextDelimited( true, "," ), logsPath );
    Tap trapTap = new Hfs( new TextDelimited( true, "\t" ), trapPath );
    Tap tsvTap = new Hfs( new TextDelimited( true, "\t" ), tsvPath );
    Tap treeTap = new Hfs( new TextDelimited( true, "\t" ), treePath );
    Tap roadTap = new Hfs( new TextDelimited( true, "\t" ), roadPath );
    Tap parkTap = new Hfs( new TextDelimited( true, "\t" ), parkPath );
    Tap shadeTap = new Hfs( new TextDelimited( true, "\t" ), shadePath );
    Tap recoTap = new Hfs( new TextDelimited( true, "\t" ), recoPath );

    // specify a regex to split the GIS dump into known fields
    Fields fieldDeclaration = new Fields( "blurb", "misc", "geo", "kind" );
    String regex =  "^\"(.*)\",\"(.*)\",\"(.*)\",\"(.*)\"$";
    int[] gisGroups = { 1, 2, 3, 4 };
    RegexParser parser = new RegexParser( fieldDeclaration, regex, gisGroups );
    Pipe gisPipe = new Each( new Pipe( "gis" ), new Fields( "line" ), parser );

    // checkpoint the cleaned-up GIS data
    Checkpoint tsvCheck = new Checkpoint( "tsv", gisPipe );

    // parse the "park" output
    Pipe parkPipe = new Pipe( "park", tsvCheck );
    regex = "^\\s+Community Type\\:\\s+Park.*$";
    parkPipe = new Each( parkPipe, new Fields( "misc" ), new RegexFilter( regex ) );

    // parse the "tree" output
    Pipe treePipe = new Pipe( "tree", tsvCheck );
    regex = "^\\s+Private\\:\\s+(\\S+)\\s+Tree ID\\:\\s+(\\d+)\\s+.*Situs Number\\:\\s+(\\d+)\\s+Tree Site\\:\\s+(\\d+)\\s+Species\\:\\s+(\\S.*\\S)\\s+Source.*$";
    treePipe = new Each( treePipe, new Fields( "misc" ), new RegexFilter( regex ) );

    Fields treeFields = new Fields( "priv", "tree_id", "situs", "tree_site", "raw_species" );
    int[] treeGroups = { 1, 2, 3, 4, 5 };
    parser = new RegexParser( treeFields, regex, treeGroups );
    treePipe = new Each( treePipe, new Fields( "misc" ), parser, Fields.ALL );

    // scrub "species" as a primary key
    regex = "^([\\w\\s]+).*$";
    int[] speciesGroups = { 1 };
    parser = new RegexParser( new Fields( "scrub_species" ), regex, speciesGroups );
    treePipe = new Each( treePipe, new Fields( "raw_species" ), parser, Fields.ALL );
    String expression = "scrub_species.trim().toLowerCase()";
    ExpressionFunction exprFunc = new ExpressionFunction( new Fields( "tree_species" ), expression, String.class );
    treePipe = new Each( treePipe, new Fields( "scrub_species" ), exprFunc, Fields.ALL );

    // join with tree metadata
    Pipe metaTreePipe = new Pipe( "meta_tree" );
    treePipe = new HashJoin( treePipe, new Fields( "tree_species" ), metaTreePipe, new Fields( "species" ), new InnerJoin() );
    treePipe = new Rename( treePipe, new Fields( "blurb" ), new Fields( "tree_name" ) );

    regex = "^(\\S+),(\\S+),(\\S+)\\s*$";
    int[] gpsGroups = { 1, 2, 3 };
    parser = new RegexParser( new Fields( "tree_lat", "tree_lng", "tree_alt" ), regex, gpsGroups );
    treePipe = new Each( treePipe, new Fields( "geo" ), parser, Fields.ALL );

    // determine a tree geohash
    Fields geohashArguments = new Fields( "tree_lat", "tree_lng" );
    treePipe = new Each( treePipe, geohashArguments, new GeoHashFunction( new Fields( "tree_geohash" ), 6 ), Fields.ALL );

    Fields fieldSelector = new Fields( "tree_name", "priv", "tree_id", "situs", "tree_site", "species", "wikipedia", "calflora", "min_height", "max_height", "tree_lat", "tree_lng", "tree_alt", "tree_geohash" );
    treePipe = new Retain( treePipe, fieldSelector );

    // parse the "road" output
    Pipe roadPipe = new Pipe( "road", tsvCheck );
    regex = "^\\s+Sequence\\:.*\\s+Year Constructed\\:\\s+(\\d+)\\s+Traffic Count\\:\\s+(\\d+)\\s+Traffic Index\\:\\s+(\\w.*\\w)\\s+Traffic Class\\:\\s+(\\w.*\\w)\\s+Traffic Date.*\\s+Paving Length\\:\\s+(\\d+)\\s+Paving Width\\:\\s+(\\d+)\\s+Paving Area\\:\\s+(\\d+)\\s+Surface Type\\:\\s+(\\w.*\\w)\\s+Surface Thickness.*\\s+Bike Lane\\:\\s+(\\w+)\\s+Bus Route\\:\\s+(\\w+)\\s+Truck Route\\:\\s+(\\w+)\\s+Remediation.*$";
    roadPipe = new Each( roadPipe, new Fields( "misc" ), new RegexFilter( regex ) );
    Fields roadFields = new Fields( "year_construct", "traffic_count", "traffic_index", "traffic_class", "paving_length", "paving_width", "paving_area", "surface_type", "bike_lane", "bus_route", "truck_route" );
    int[] roadGroups = { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11 };
    parser = new RegexParser( roadFields, regex, roadGroups );
    roadPipe = new Each( roadPipe, new Fields( "misc" ), parser, Fields.ALL );

    // join with road metadata
    Pipe metaRoadPipe = new Pipe( "meta_road" );
    roadPipe = new HashJoin( roadPipe, new Fields( "surface_type" ), metaRoadPipe, new Fields( "pavement_type" ), new InnerJoin() );
    roadPipe = new Rename( roadPipe, new Fields( "blurb" ), new Fields( "road_name" ) );

    // estimate albedo based on the road surface age and pavement type
    Fields albedoArguments = new Fields( "year_construct", "albedo_new", "albedo_worn" );
    roadPipe = new Each( roadPipe, albedoArguments, new AlbedoFunction( new Fields( "albedo" ), 2002 ), Fields.ALL );

    // generate road segments, with midpoint, y=mx+b, and road_geohash for each
    Fields segmentArguments = new Fields( "geo" );
    Fields segmentResults = new Fields( "lat0", "lng0", "alt0", "lat1", "lng1", "alt1", "lat_mid", "lng_mid" );
    roadPipe = new Each( roadPipe, segmentArguments, new RoadSegmentFunction( segmentResults ), Fields.ALL );

    geohashArguments = new Fields( "lat_mid", "lng_mid" );
    roadPipe = new Each( roadPipe, geohashArguments, new GeoHashFunction( new Fields( "road_geohash" ), 6 ), Fields.ALL );

    fieldSelector = new Fields( "road_name", "year_construct", "traffic_count", "traffic_index", "traffic_class", "paving_length", "paving_width", "paving_area", "surface_type", "bike_lane", "bus_route", "truck_route", "albedo", "lat0", "lng0", "alt0", "lat1", "lng1", "alt1", "road_geohash" );
    roadPipe = new Retain( roadPipe, fieldSelector );

    // join the tree and road pipes to estimate shade
    Pipe shadePipe = new Pipe( "shade", roadPipe );
    shadePipe = new CoGroup( shadePipe, new Fields( "road_geohash" ), treePipe, new Fields( "tree_geohash" ), new InnerJoin() );

    // calculate a rough estimate for distance from tree to road, then filter for "< ~1 block"
    Fields treeDistArguments = new Fields( "tree_lat", "tree_lng", "lat0", "lng0", "lat1", "lng1" );
    Fields tree_dist = new Fields( "tree_dist" );
    shadePipe = new Each( shadePipe, treeDistArguments, new TreeDistanceFunction( tree_dist ), Fields.ALL );

    ExpressionFilter distFilter = new ExpressionFilter( "tree_dist > 25.0", Double.class );
    shadePipe = new Each( shadePipe, tree_dist, distFilter );

    // checkpoint this (big) calculation too
    fieldSelector = new Fields( "road_name", "year_construct", "traffic_count", "traffic_index", "traffic_class", "paving_length", "paving_width", "paving_area", "surface_type", "bike_lane", "bus_route", "truck_route", "albedo", "lat0", "lng0", "lat1", "lng1", "tree_name", "priv", "tree_id", "situs", "tree_site", "species", "wikipedia", "calflora", "min_height", "max_height", "tree_lat", "tree_lng", "tree_alt", "tree_dist", "tree_geohash" );
    shadePipe = new Retain( shadePipe, fieldSelector );
    shadePipe = new GroupBy( shadePipe, new Fields( "tree_name" ), new Fields( "tree_dist" ) );

    Checkpoint shadeCheck = new Checkpoint( "shade", shadePipe );

    // determine the geohash for GPS tracks log events
    Pipe logsPipe = new Pipe( "logs" );
    geohashArguments = new Fields( "lat", "lng" );
    logsPipe = new Each( logsPipe, geohashArguments, new GeoHashFunction( new Fields( "gps_geohash" ), 6 ), Fields.ALL );

    // prepare data for recommendations
    // NB: RHS is large given the sample data, but in practice the logs on the LHS could be much larger
    Pipe recoPipe = new Pipe( "reco", logsPipe );
    recoPipe = new CoGroup( recoPipe, new Fields( "gps_geohash" ), shadeCheck, new Fields( "tree_geohash" ), new InnerJoin() );

    // connect the taps, pipes, etc., into a flow
    FlowDef flowDef = FlowDef.flowDef()
     .setName( "copa" )
     .addSource( gisPipe, gisTap )
     .addTrap( gisPipe, trapTap )
     .addCheckpoint( tsvCheck, tsvTap )
     .addTailSink( parkPipe, parkTap )
     .addSource( metaTreePipe, metaTreeTap )
     .addSource( metaRoadPipe, metaRoadTap )
     .addSink( treePipe, treeTap )
     .addSink( roadPipe, roadTap )
     .addCheckpoint( shadeCheck, shadeTap )
     .addSource( logsPipe, logsTap )
     .addTailSink( recoPipe, recoTap )

    // write a DOT file and run the flow
    Flow copaFlow = flowConnector.connect( flowDef );
    copaFlow.writeDOT( "dot/copa.dot" );
     String docPath = args[ 0 ];
   String wcPath = args[ 1 ];

   Properties properties = new Properties();
   AppProps.setApplicationJarClass( properties, Main.class );
   HadoopFlowConnector flowConnector = new HadoopFlowConnector( properties );

   FlowDef flowDef = createFlowDef(docPath, wcPath);

   // write a DOT file and run the flow
   Flow wcFlow = flowConnector.connect( flowDef );
   wcFlow.writeDOT( "dot/wc.dot" );
  public FlowConnector getFlowConnector( Map<Object, Object> properties )
    return new HadoopFlowConnector( properties );
    Hfs sink = new Hfs(new ProtobufScheme("value", Example.Person.class), "/tmp/output");
    Map<Object, Object> properties = new HashMap<Object, Object>(){{
      put("io.serializations", new JobConf().get("io.serializations") + "," + ProtobufSerialization.class.getName());
    new HadoopFlowConnector(properties).connect(t, sink, groupByPipe).complete();

    TupleEntryIterator tei = sink.openForRead(new HadoopFlowProcess(new JobConf()));
    Set<Tuple> tuples = new HashSet<Tuple>();
    while (tei.hasNext()) {
    Hfs sink = new Hfs(new ProtobufScheme("value", Example.Person.class), "/tmp/output");
    Map<Object, Object> properties = new HashMap<Object, Object>(){{
          new JobConf().get("io.serializations") + "," + ProtobufSerialization.class.getName());
    new HadoopFlowConnector(properties).connect(t, sink, groupByPipe).complete();

    TupleEntryIterator tei = sink.openForRead(new HadoopFlowProcess(new JobConf()));
    Set<Tuple> tuples = new HashSet<Tuple>();
    while (tei.hasNext()) {
