Package com.elastisys.scale.commons.logreplayer

Source Code of com.elastisys.scale.commons.logreplayer.BurstingApacheLogReplayer$ResponseCallback

package com.elastisys.scale.commons.logreplayer;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.util.concurrent.TimeUnit;

import org.apache.http.HttpResponse;
import org.apache.http.HttpStatus;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.concurrent.FutureCallback;
import org.apache.http.impl.NoConnectionReuseStrategy;
import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
import org.apache.http.impl.nio.client.HttpAsyncClients;
import org.apache.http.nio.client.HttpAsyncClient;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.base.Charsets;
import com.google.common.base.Stopwatch;
import com.google.common.io.Files;
import com.google.common.util.concurrent.RateLimiter;

/**
* An Apache log file replayer that replays an aoache log file (in in either <a
* href="http://httpd.apache.org/docs/1.3/logs.html#combined">combined</a> or <a
* href="http://httpd.apache.org/docs/1.3/logs.html#common">common</a> log
* format) against a target URL in bursts of a given duration.
* <p/>
* The log replayer accepts a burst duration (in seconds) and will, for every
* burst frame time window, apply the average request rate found in the log file
* over that time frame.
*
*
*
*/
public class BurstingApacheLogReplayer implements Runnable {

  private static final int CONNECTION_REQUEST_TIMEOUT = 3000;

  private static final int CONNECT_TIMEOUT = 3000;

  private static final int SOCKET_TIMEOUT = 3000;

  static Logger LOG = LoggerFactory
      .getLogger(BurstingApacheLogReplayer.class);

  /**
   * Path to an apache log file in either <a
   * href="http://httpd.apache.org/docs/1.3/logs.html#combined">combined</a>
   * or <a href="http://httpd.apache.org/docs/1.3/logs.html#common">common</a>
   * log format
   */
  private final String logFile;

  /** The application URL to subject to load. */
  private final String targetUrl;

  /**
   * The length (in seconds) of a request burst. The request log will be split
   * into burst frames of this duration, during which the average request rate
   * of the frame is applied to the target URL.
   */
  private final int burstDuration;

  /**
   * Creates a new {@link BurstingApacheLogReplayer}.
   *
   * @param logFile
   *            Path to an apache log file in either <a
   *            href="http://httpd.apache.org/docs/1.3/logs.html#combined"
   *            >combined</a> or <a
   *            href="http://httpd.apache.org/docs/1.3/logs.html#common"
   *            >common</a> log format
   * @param targetUrl
   *            The application URL to subject to load.
   * @param burstDuration
   *            The length (in seconds) of a request burst. The request log
   *            will be split into burst frames of this duration, during which
   *            the average request rate of the frame is applied to the target
   *            URL.
   */
  public BurstingApacheLogReplayer(String logFile, String targetUrl,
      int burstDuration) {
    checkNotNull(logFile, "missing logFile");
    checkArgument(new File(logFile).isFile(), "%s is not a valid file",
        logFile);
    checkNotNull(targetUrl, "missing targetUrl");
    checkArgument(burstDuration > 0, "burstDuration must be positive");

    this.logFile = logFile;
    this.targetUrl = targetUrl;
    this.burstDuration = burstDuration;
  }

  @Override
  public void run() {
    try (CloseableHttpAsyncClient httpClient = createHttpClient()) {
      replayLog(httpClient);
    } catch (Exception e) {
      LOG.error(String.format("log replayer failed: %s", e.getMessage()),
          e);
    }
  }

  private void replayLog(HttpAsyncClient httpClient) throws IOException {
    File apacheLog = new File(this.logFile);

    DateTime burstStart = getLogStart(apacheLog);
    LOG.info("Log starts at {}", burstStart);
    try (BufferedReader reader = new BufferedReader(new FileReader(
        apacheLog))) {
      while (reader.ready()) {
        DateTime burstEnd = burstStart.plusSeconds(this.burstDuration);
        double burstRate = getNextBurstRate(reader, burstStart,
            burstEnd);
        LOG.info("applying a burst with request rate {} "
            + "for {} second(s)", burstRate, this.burstDuration);
        applyBurst(httpClient, burstRate, this.burstDuration);
        burstStart = burstEnd;
      }
    }
  }

  private void applyBurst(HttpAsyncClient httpClient, double burstRate,
      int burstDuration) {
    Stopwatch stopwatch = Stopwatch.createStarted();
    RateLimiter rateLimiter = RateLimiter.create(burstRate);
    while (stopwatch.elapsed(TimeUnit.SECONDS) < burstDuration) {
      rateLimiter.acquire();
      ResponseCallback responseCallback = new ResponseCallback();
      httpClient.execute(new HttpGet(this.targetUrl), responseCallback);
    }
  }

  private DateTime getLogStart(File apacheLog) throws IOException {
    CommonFormatLogRecord logRecord = CommonFormatLogRecord.parse(Files
        .readFirstLine(apacheLog, Charsets.UTF_8));
    return logRecord.getFinishedProcessingTimestamp();
  }

  /**
   * Get the request rate in the apache log file for the given burst frame.
   *
   * @param reader
   *            A reader for the apache log file.
   * @param burstStart
   *            Burst frame start (inclusive).
   * @param burstEnd
   *            Burst frame end (exclusive).
   * @return
   * @throws IOException
   */
  private double getNextBurstRate(BufferedReader reader, DateTime burstStart,
      DateTime burstEnd) throws IOException {
    int requestCount = 0;
    int readAheadLimit = 8096;
    reader.mark(readAheadLimit);

    String line = null;
    while ((line = reader.readLine()) != null) {
      CommonFormatLogRecord request = CommonFormatLogRecord.parse(line);
      DateTime requestTimestamp = request
          .getFinishedProcessingTimestamp();
      if (requestTimestamp.isBefore(burstStart)) {
        LOG.warn(String.format("ignoring out-of-order request: "
            + "request timestamp %s is before "
            + "burst start %s. log line: '%s'", requestTimestamp,
            burstStart, line));
        continue;
      }
      if (requestTimestamp.isEqual(burstEnd)
          || requestTimestamp.isAfter(burstEnd)) {
        // frame end passed: un-read the last line and return
        reader.reset();
        break;
      }
      requestCount++;
      reader.mark(readAheadLimit);
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("burst frame [{},{}) contained {} request(s).",
          burstStart, burstEnd, requestCount);
    }
    return (double) requestCount / this.burstDuration;
  }

  private CloseableHttpAsyncClient createHttpClient() throws RuntimeException {
    RequestConfig requestConfig = RequestConfig.custom()
        .setConnectionRequestTimeout(CONNECTION_REQUEST_TIMEOUT)
        .setSocketTimeout(SOCKET_TIMEOUT)
        .setConnectTimeout(CONNECT_TIMEOUT).build();
    // don't reuse connections
    NoConnectionReuseStrategy reuseStrategy = new NoConnectionReuseStrategy();

    CloseableHttpAsyncClient asyncClient = HttpAsyncClients.custom()
        .setConnectionReuseStrategy(reuseStrategy)
        .setDefaultRequestConfig(requestConfig).build();
    asyncClient.start();
    return asyncClient;
  }

  public static class ResponseCallback implements
      FutureCallback<HttpResponse> {

    @Override
    public void failed(final Exception ex) {
      LOG.error(" ! <- " + ex.getMessage());
    }

    @Override
    public void cancelled() {
      LOG.error(" ! <- cancelled");
    }

    @Override
    public void completed(HttpResponse result) {
      if (result.getStatusLine().getStatusCode() != HttpStatus.SC_OK) {
        LOG.info(" <- '{}'", result.getStatusLine());
      }
    }
  }

}
TOP

Related Classes of com.elastisys.scale.commons.logreplayer.BurstingApacheLogReplayer$ResponseCallback

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.