Package org.apache.samza.system.mock

Source Code of org.apache.samza.system.mock.MockSystemConsumer

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.system.mock;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.samza.metrics.MetricsRegistryMap;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;
import org.apache.samza.util.Clock;

/**
* MockSystemConsumer is a class that simulates a multi-threaded consumer that
* uses BlockingEnvelopeMap. The primary use for this class is to do performance
* testing.
*
* This class works by starting up (threadCount) threads. Each thread adds
* (messagesPerBatch) to the BlockingEnvelopeMap, then sleeps for
* (brokerSleepMs). The sleep is important to simulate network latency when
* executing a fetch against a remote streaming system (i.e. Kafka).
*/
public class MockSystemConsumer extends BlockingEnvelopeMap {
  private final int messagesPerBatch;
  private final int threadCount;
  private final int brokerSleepMs;

  /**
   * The SystemStreamPartitions that this consumer is in charge of.
   */
  private final Set<SystemStreamPartition> ssps;

  /**
   * The consumer threads that are putting IncomingMessageEnvelopes into
   * BlockingEnvelopeMap.
   */
  private List<Thread> threads;

  /**
   *
   * @param messagesPerBatch
   *          The number of messages to add to the BlockingEnvelopeMap before
   *          sleeping.
   * @param threadCount
   *          How many threads to run.
   * @param brokerSleepMs
   *          How long each thread should sleep between batch writes.
   */
  public MockSystemConsumer(int messagesPerBatch, int threadCount, int brokerSleepMs) {
    super(new MetricsRegistryMap("test-container-performance"), new Clock() {
      @Override
      public long currentTimeMillis() {
        return System.currentTimeMillis();
      }
    });

    this.messagesPerBatch = messagesPerBatch;
    this.threadCount = threadCount;
    this.brokerSleepMs = brokerSleepMs;
    this.ssps = new HashSet<SystemStreamPartition>();
    this.threads = new ArrayList<Thread>(threadCount);
  }

  /**
   * Assign SystemStreamPartitions to all of the threads, and start them up to
   * begin simulating consuming messages.
   */
  @Override
  public void start() {
    for (int i = 0; i < threadCount; ++i) {
      Set<SystemStreamPartition> threadSsps = new HashSet<SystemStreamPartition>();

      // Assign SystemStreamPartitions for this thread.
      for (SystemStreamPartition ssp : ssps) {
        if (Math.abs(ssp.hashCode()) % threadCount == i) {
          threadSsps.add(ssp);
        }
      }

      // Start thread.
      Thread thread = new Thread(new MockSystemConsumerRunnable(threadSsps), "MockSystemConsumer-" + i);
      thread.setDaemon(true);
      threads.add(thread);
      thread.start();
    }
  }

  /**
   * Kill all the threads, and shutdown.
   */
  @Override
  public void stop() {
    for (Thread thread : threads) {
      thread.interrupt();
    }

    try {
      for (Thread thread : threads) {
        thread.join();
      }
    } catch (InterruptedException e) {
    }
  }

  @Override
  public void register(SystemStreamPartition systemStreamPartition, String offset) {
    super.register(systemStreamPartition, offset);
    ssps.add(systemStreamPartition);
    setIsAtHead(systemStreamPartition, true);
  }

  /**
   * The worker thread for MockSystemConsumer that simulates reading messages
   * from a remote streaming system (i.e. Kafka), and writing them to the
   * BlockingEnvelopeMap.
   */
  public class MockSystemConsumerRunnable implements Runnable {
    private final Set<SystemStreamPartition> ssps;

    public MockSystemConsumerRunnable(Set<SystemStreamPartition> ssps) {
      this.ssps = ssps;
    }

    @Override
    public void run() {
      try {
        while (!Thread.interrupted() && ssps.size() > 0) {
          Set<SystemStreamPartition> sspsToFetch = new HashSet<SystemStreamPartition>();

          // Only fetch messages when there are no outstanding messages left.
          for (SystemStreamPartition ssp : ssps) {
            if (getNumMessagesInQueue(ssp) <= 0) {
              sspsToFetch.add(ssp);
            }
          }

          // Simulate a broker fetch request's network latency.
          Thread.sleep(brokerSleepMs);

          // Add messages to the BlockingEnvelopeMap.
          for (SystemStreamPartition ssp : sspsToFetch) {
            for (int i = 0; i < messagesPerBatch; ++i) {
              put(ssp, new IncomingMessageEnvelope(ssp, "0", "key", "value"));
            }
          }
        }
      } catch (InterruptedException e) {
        System.out.println("Got interrupt. Shutting down.");
      }
    }
  }
}
TOP

Related Classes of org.apache.samza.system.mock.MockSystemConsumer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.