434445464748495051
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("word-count", conf, builder.createTopology()); Thread.sleep(20000); cluster.shutdown(); } } }
122123124125126127128129130131
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("storm-jms-example", conf, builder.createTopology()); Utils.sleep(60000); cluster.killTopology("storm-jms-example"); cluster.shutdown(); } } }
188189190191192193194195196197
String[] urlsToTry = new String[] { "foo.com/blog/1", "engineering.twitter.com/blog/5", "notaurl.com"}; for(String url: urlsToTry) { System.out.println("Reach of " + url + ": " + drpc.execute(TOPOLOGY_NAME, url)); } cluster.shutdown(); drpc.shutdown(); } } }
7273747576777879808182
LocalCluster cluster = new LocalCluster(); cluster.submitTopology(topologyName, conf, builder.createTopology()); Thread.sleep(600000); cluster.shutdown(); } public static void SetRemoteTopology() throws AlreadyAliveException, InvalidTopologyException, TopologyAssignException {
123124125126127128129130131132133
LocalCluster cluster = new LocalCluster(); cluster.submitTopology("SplitMerge", conf, builder.createTopology()); Thread.sleep(60000); cluster.shutdown(); } public static void SetRemoteTopology() throws AlreadyAliveException, InvalidTopologyException, TopologyAssignException {
3233343536373839404142
StormTopology topology = buildTopology(); LocalCluster cluster = new LocalCluster(); cluster.submitTopology("SplitMerge", conf, topology); Thread.sleep(60000); cluster.shutdown(); } @Override public int run(String[] args) throws Exception { Config conf = getConf();
7172737475767778798081
String.valueOf(conf.get("topology.name")), conf, builder.createTopology()); Thread.sleep(200000); cluster.shutdown(); } else { StormSubmitter.submitTopology( String.valueOf(conf.get("topology.name")), conf, builder.createTopology()); }