Package com.cloudera.oryx.kafka.util

Source Code of com.cloudera.oryx.kafka.util.ProduceConsumeIT

/*
* Copyright (c) 2014, Cloudera, Inc. All Rights Reserved.
*
* Cloudera, Inc. licenses this file to you under the Apache License,
* Version 2.0 (the "License"). You may not use this file except in
* compliance with the License. You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
* CONDITIONS OF ANY KIND, either express or implied. See the License for
* the specific language governing permissions and limitations under the
* License.
*/

package com.cloudera.oryx.kafka.util;

import java.util.Collection;
import java.util.HashSet;

import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.cloudera.oryx.common.OryxTest;
import com.cloudera.oryx.common.collection.CloseableIterator;
import com.cloudera.oryx.common.collection.Pair;
import com.cloudera.oryx.common.io.IOUtils;
import com.cloudera.oryx.common.lang.LoggingRunnable;
import com.cloudera.oryx.zk.LocalZKServer;

/**
* Tests {@link ProduceData} and {@link ConsumeData} together.
*/
public final class ProduceConsumeIT extends OryxTest {

  private static final Logger log = LoggerFactory.getLogger(ProduceConsumeIT.class);

  private static final String TOPIC = "OryxTest";
  private static final int NUM_DATA = 100;

  @Test
  public void testProduceConsume() throws Exception {
    int zkPort = IOUtils.chooseFreePort();
    int kafkaBrokerPort = IOUtils.chooseFreePort();
    try (LocalZKServer localZKServer = new LocalZKServer(zkPort);
         LocalKafkaBroker localKafkaBroker = new LocalKafkaBroker(kafkaBrokerPort, zkPort)) {

      localZKServer.start();
      localKafkaBroker.start();

      KafkaUtils.deleteTopic("localhost", zkPort, TOPIC);
      KafkaUtils.maybeCreateTopic("localhost", zkPort, TOPIC);

      ProduceData produce = new ProduceData(new DefaultCSVDatumGenerator(),
                                            zkPort,
                                            localKafkaBroker.getPort(),
                                            TOPIC,
                                            NUM_DATA,
                                            50);

      final Collection<Integer> keys = new HashSet<>();

      try (CloseableIterator<Pair<String,String>> data = new ConsumeData(TOPIC, zkPort).iterator()) {

        new Thread(new LoggingRunnable() {
          @Override
          public void doRun() {
            while (data.hasNext()) {
              keys.add(Integer.valueOf(data.next().getFirst()));
            }
          }
        }).start();

        // Sleep for a while after starting consumer to let it init
        Thread.sleep(1000L);

        log.info("Producing data");
        produce.start();

        // Sleep for a while before shutting down producer to let both finish
        Thread.sleep(1000L);

      } finally {
        KafkaUtils.deleteTopic("localhost", zkPort, TOPIC);
      }

      assertEquals(NUM_DATA, keys.size());
      for (int i = 0; i < NUM_DATA; i++) {
        assertTrue(keys.contains(i));
      }
    }
  }

}
TOP

Related Classes of com.cloudera.oryx.kafka.util.ProduceConsumeIT

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.