Package io.druid.examples.rabbitmq

Source Code of io.druid.examples.rabbitmq.RabbitMQProducerMain

package io.druid.examples.rabbitmq;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.apache.commons.cli.BasicParser;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;

/**
*
*/
public class RabbitMQProducerMain
{
  public static void main(String[] args)
      throws Exception
  {
    // We use a List to keep track of option insertion order. See below.
    final List<Option> optionList = new ArrayList<Option>();

    optionList.add(OptionBuilder.withLongOpt("help")
        .withDescription("display this help message")
        .create("h"));
    optionList.add(OptionBuilder.withLongOpt("hostname")
        .hasArg()
        .withDescription("the hostname of the AMQP broker [defaults to AMQP library default]")
        .create("b"));
    optionList.add(OptionBuilder.withLongOpt("port")
        .hasArg()
        .withDescription("the port of the AMQP broker [defaults to AMQP library default]")
        .create("n"));
    optionList.add(OptionBuilder.withLongOpt("username")
        .hasArg()
        .withDescription("username to connect to the AMQP broker [defaults to AMQP library default]")
        .create("u"));
    optionList.add(OptionBuilder.withLongOpt("password")
        .hasArg()
        .withDescription("password to connect to the AMQP broker [defaults to AMQP library default]")
        .create("p"));
    optionList.add(OptionBuilder.withLongOpt("vhost")
        .hasArg()
        .withDescription("name of virtual host on the AMQP broker [defaults to AMQP library default]")
        .create("v"));
    optionList.add(OptionBuilder.withLongOpt("exchange")
        .isRequired()
        .hasArg()
        .withDescription("name of the AMQP exchange [required - no default]")
        .create("e"));
    optionList.add(OptionBuilder.withLongOpt("key")
        .hasArg()
        .withDescription("the routing key to use when sending messages [default: 'default.routing.key']")
        .create("k"));
    optionList.add(OptionBuilder.withLongOpt("type")
        .hasArg()
        .withDescription("the type of exchange to create [default: 'topic']")
        .create("t"));
    optionList.add(OptionBuilder.withLongOpt("durable")
        .withDescription("if set, a durable exchange will be declared [default: not set]")
        .create("d"));
    optionList.add(OptionBuilder.withLongOpt("autodelete")
        .withDescription("if set, an auto-delete exchange will be declared [default: not set]")
        .create("a"));
    optionList.add(OptionBuilder.withLongOpt("single")
        .withDescription("if set, only a single message will be sent [default: not set]")
        .create("s"));
    optionList.add(OptionBuilder.withLongOpt("start")
        .hasArg()
        .withDescription("time to use to start sending messages from [default: 2010-01-01T00:00:00]")
        .create());
    optionList.add(OptionBuilder.withLongOpt("stop")
        .hasArg()
        .withDescription("time to use to send messages until (format: '2013-07-18T23:45:59') [default: current time]")
        .create());
    optionList.add(OptionBuilder.withLongOpt("interval")
        .hasArg()
        .withDescription("the interval to add to the timestamp between messages in seconds [default: 10]")
        .create());
    optionList.add(OptionBuilder.withLongOpt("delay")
        .hasArg()
        .withDescription("the delay between sending messages in milliseconds [default: 100]")
        .create());

    // An extremely silly hack to maintain the above order in the help formatting.
    HelpFormatter formatter = new HelpFormatter();
    // Add a comparator to the HelpFormatter using the ArrayList above to sort by insertion order.
    formatter.setOptionComparator(new Comparator(){
      @Override
      public int compare(Object o1, Object o2)
      {
        // I know this isn't fast, but who cares! The list is short.
        return optionList.indexOf(o1) - optionList.indexOf(o2);
      }
    });

    // Now we can add all the options to an Options instance. This is dumb!
    Options options = new Options();
    for (Option option : optionList) {
      options.addOption(option);
    }

    CommandLine cmd = null;

    try{
      cmd = new BasicParser().parse(options, args);

    }
    catch(ParseException e){
      formatter.printHelp("RabbitMQProducerMain", e.getMessage(), options, null);
      System.exit(1);
    }

    if(cmd.hasOption("h")) {
      formatter.printHelp("RabbitMQProducerMain", options);
      System.exit(2);
    }

    ConnectionFactory factory = new ConnectionFactory();

    if(cmd.hasOption("b")){
      factory.setHost(cmd.getOptionValue("b"));
    }
    if(cmd.hasOption("u")){
      factory.setUsername(cmd.getOptionValue("u"));
    }
    if(cmd.hasOption("p")){
      factory.setPassword(cmd.getOptionValue("p"));
    }
    if(cmd.hasOption("v")){
      factory.setVirtualHost(cmd.getOptionValue("v"));
    }
    if(cmd.hasOption("n")){
      factory.setPort(Integer.parseInt(cmd.getOptionValue("n")));
    }

    String exchange = cmd.getOptionValue("e");
    String routingKey = "default.routing.key";
    if(cmd.hasOption("k")){
      routingKey = cmd.getOptionValue("k");
    }

    boolean durable = cmd.hasOption("d");
    boolean autoDelete = cmd.hasOption("a");
    String type = cmd.getOptionValue("t", "topic");
    boolean single = cmd.hasOption("single");
    int interval = Integer.parseInt(cmd.getOptionValue("interval", "10"));
    int delay = Integer.parseInt(cmd.getOptionValue("delay", "100"));

    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss");
    Date stop = sdf.parse(cmd.getOptionValue("stop", sdf.format(new Date())));

    Random r = new Random();
    Calendar timer = Calendar.getInstance();
    timer.setTime(sdf.parse(cmd.getOptionValue("start", "2010-01-01T00:00:00")));

    String msg_template = "{\"utcdt\": \"%s\", \"wp\": %d, \"gender\": \"%s\", \"age\": %d}";

    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.exchangeDeclare(exchange, type, durable, autoDelete, null);

    do{
      int wp = (10 + r.nextInt(90)) * 100;
      String gender = r.nextBoolean() ? "male" : "female";
      int age = 20 + r.nextInt(70);

      String line = String.format(msg_template, sdf.format(timer.getTime()), wp, gender, age);

      channel.basicPublish(exchange, routingKey, null, line.getBytes());

      System.out.println("Sent message: " + line);

      timer.add(Calendar.SECOND, interval);

      Thread.sleep(delay);
    }while((!single && stop.after(timer.getTime())));

    connection.close();
  }

}
TOP

Related Classes of io.druid.examples.rabbitmq.RabbitMQProducerMain

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.