Package kafka.perf.consumer

Source Code of kafka.perf.consumer.SimplePerfConsumer

package kafka.perf.consumer;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

import scala.collection.Iterator;

import kafka.api.FetchRequest;
import kafka.api.MultiFetchResponse;
import kafka.consumer.SimpleConsumer;
import kafka.message.ByteBufferMessageSet;
import kafka.message.Message;

public class SimplePerfConsumer extends Thread
{
  private SimpleConsumer simpleConsumer;
  private String topic;
  private String consumerName;
  private int fetchSize;
  private AtomicLong bytesRec;
  private AtomicLong messagesRec;
  private AtomicLong lastReportMessageRec;
  private AtomicLong lastReportBytesRec;
  private long offset = 0;
  private final int numParts;

  public SimplePerfConsumer(String topic, String kafkaServerURL, int kafkaServerPort,
                            int kafkaProducerBufferSize, int connectionTimeOut, int reconnectInterval,
                            int fetchSize, String name, int numParts)
  {
    simpleConsumer = new SimpleConsumer(kafkaServerURL,
                                        kafkaServerPort,
                                        connectionTimeOut,
                                        kafkaProducerBufferSize);
    this.topic = topic;
    this.fetchSize = fetchSize;
    consumerName = name;
    bytesRec =  new AtomicLong(0L);
    messagesRec =  new AtomicLong(0L);
    lastReportMessageRec = new AtomicLong(System.currentTimeMillis());
    lastReportBytesRec = new AtomicLong(System.currentTimeMillis());
    this.numParts = numParts;
  }

  public void run() {
    while(true)
    {
      List<FetchRequest> list = new ArrayList<FetchRequest>();
      for(int i=0 ; i < numParts; i++)
      {
        FetchRequest req = new FetchRequest(topic, i, offset, fetchSize);
        list.add(req);
      }
      MultiFetchResponse response = simpleConsumer.multifetch(list);
      if(response.hasNext())
      {
        ByteBufferMessageSet messages = response.next();
        offset+= messages.validBytes();
        bytesRec.getAndAdd(messages.sizeInBytes());
       
        Iterator<Message> it =  messages.iterator();
        while(it.hasNext())
        {
          it.next();
          messagesRec.getAndIncrement();
        }
      }
    }
  }

  public double getMessagesRecPs()
  {
    double val = (double)messagesRec.get() / (System.currentTimeMillis() - lastReportMessageRec.get());
    return val * 1000;
  }

  public String getConsumerName()
  {
    return consumerName;
  }

  public double getMBytesRecPs()
  {
    double val = ((double)bytesRec.get() / (System.currentTimeMillis() - lastReportBytesRec.get())) / (1024*1024);
    return val * 1000;
  }

}
TOP

Related Classes of kafka.perf.consumer.SimplePerfConsumer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.