Package kafka.etl.impl

Source Code of kafka.etl.impl.DataGenerator

/*
* Copyright 2010 LinkedIn
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.etl.impl;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import kafka.etl.KafkaETLCommons;
import kafka.etl.KafkaETLUtils;
import kafka.etl.Props;
import kafka.message.ByteBufferMessageSet;
import kafka.message.Message;
import kafka.producer.SimpleProducer;

/**
* Use this class to produce test events to Kafka server. Each event contains a
* random timestamp in text format.
*/
public class DataGenerator {

  protected final static Random RANDOM = new Random(
      System.currentTimeMillis());

  protected Props _props;
  protected List<SimpleProducer> _producers = null;
  protected String _topic;
  protected int _count;
  protected final int TCP_BUFFER_SIZE = 300 * 1000;
  protected final int CONNECT_TIMEOUT = 20000; // ms
  protected final int RECONNECT_INTERVAL = Integer.MAX_VALUE; // ms

  public DataGenerator(String id, Props props) throws Exception {
    _props = props;
    _topic = props.getProperty("kafka.etl.topic");
    System.out.println("topics=" + _topic);
    _count = props.getInt("event.count");

    // initialize kafka producer to generate count events
    String nodePath = KafkaETLCommons.getNodesPath(_props);
    System.out.println("node path=" + nodePath);
    Props nodesProps = KafkaETLUtils.readProps(nodePath);
    _producers = new ArrayList<SimpleProducer>();
    for (String key : nodesProps.stringPropertyNames()) {
      URI uri = nodesProps.getUri(key);
      System.out.println("server uri:" + uri.toString());
      _producers.add(new SimpleProducer(uri.getHost(), uri.getPort(),
          TCP_BUFFER_SIZE, CONNECT_TIMEOUT, RECONNECT_INTERVAL));
    }
  }

  public void run() throws IOException, URISyntaxException {

    int producerId = RANDOM.nextInt() % _producers.size();
    SimpleProducer producer = _producers.get(producerId);

    List<Message> list = new ArrayList<Message>();
    for (int i = 0; i < _count; i++) {
      Long timestamp = RANDOM.nextLong();
      if (timestamp < 0) timestamp = -timestamp;
      byte[] bytes = timestamp.toString().getBytes("UTF8");
      Message message = new Message(bytes);
      list.add(message);
    }
    // send events
    System.out.println(" send " + list.size() + " " + _topic
        + " count events to " + producerId);
    producer.send(_topic, new ByteBufferMessageSet(list));

    // close all producers
    for (SimpleProducer p : _producers) {
      p.close();
    }
  }

  public static void main(String[] args) throws Exception {

    if (args.length < 1)
      throw new Exception("Usage: - config_file");

    Props props = new Props(args[0]);
    DataGenerator job = new DataGenerator("DataGenerator", props);
    job.run();
  }

}
TOP

Related Classes of kafka.etl.impl.DataGenerator

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.