Package org.apache.hadoop.hbase

Source Code of org.apache.hadoop.hbase.IntegrationTestRegionReplicaPerf$PerfEvalCallable

/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import com.google.common.base.Objects;
import com.yammer.metrics.core.Histogram;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.chaos.actions.MoveRandomRegionOfTableAction;
import org.apache.hadoop.hbase.chaos.actions.RestartRandomRsExceptMetaAction;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
import org.apache.hadoop.hbase.chaos.policies.Policy;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.ipc.RpcClient;
import org.apache.hadoop.hbase.regionserver.DisabledRegionSplitPolicy;
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
import org.apache.hadoop.hbase.util.YammerHistogramUtils;
import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.util.ToolRunner;
import org.junit.experimental.categories.Category;

import java.util.*;
import java.util.concurrent.Callable;

import static java.lang.String.format;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

/**
* Test for comparing the performance impact of region replicas. Uses
* components of {@link PerformanceEvaluation}. Does not run from
* {@code IntegrationTestsDriver} because IntegrationTestBase is incompatible
* with the JUnit runner. Hence no @Test annotations either. See {@code -help}
* for full list of options.
*/
@Category(IntegrationTests.class)
public class IntegrationTestRegionReplicaPerf extends IntegrationTestBase {

  private static final Log LOG = LogFactory.getLog(IntegrationTestRegionReplicaPerf.class);

  private static final String SLEEP_TIME_KEY = "sleeptime";
  // short default interval because tests don't run very long.
  private static final String SLEEP_TIME_DEFAULT = "" + (10 * 1000l);
  private static final String TABLE_NAME_KEY = "tableName";
  private static final String TABLE_NAME_DEFAULT = "IntegrationTestRegionReplicaPerf";
  private static final String REPLICA_COUNT_KEY = "replicas";
  private static final String REPLICA_COUNT_DEFAULT = "" + 3;
  private static final String PRIMARY_TIMEOUT_KEY = "timeout";
  private static final String PRIMARY_TIMEOUT_DEFAULT = "" + 10 * 1000; // 10 ms
  private static final String NUM_RS_KEY = "numRs";
  private static final String NUM_RS_DEFAULT = "" + 3;

  /** Extract a descriptive statistic from a {@link com.yammer.metrics.core.Histogram}. */
  private enum Stat {
    STDEV {
      @Override
      double apply(Histogram hist) {
        return hist.stdDev();
      }
    },
    FOUR_9S {
      @Override
      double apply(Histogram hist) {
        return hist.getSnapshot().getValue(0.9999);
      }
    };

    abstract double apply(Histogram hist);
  }

  private TableName tableName;
  private long sleepTime;
  private int replicaCount;
  private int primaryTimeout;
  private int clusterSize;

  /**
   * Wraps the invocation of {@link PerformanceEvaluation} in a {@code Callable}.
   */
  static class PerfEvalCallable implements Callable<TimingResult> {
    private final Queue<String> argv = new LinkedList<String>();
    private final Admin admin;

    public PerfEvalCallable(Admin admin, String argv) {
      // TODO: this API is awkward, should take HConnection, not HBaseAdmin
      this.admin = admin;
      this.argv.addAll(Arrays.asList(argv.split(" ")));
      LOG.debug("Created PerformanceEvaluationCallable with args: " + argv);
    }

    @Override
    public TimingResult call() throws Exception {
      PerformanceEvaluation.TestOptions opts = PerformanceEvaluation.parseOpts(argv);
      PerformanceEvaluation.checkTable(admin, opts);
      PerformanceEvaluation.RunResult results[] = null;
      long numRows = opts.totalRows;
      long elapsedTime = 0;
      if (opts.nomapred) {
        results = PerformanceEvaluation.doLocalClients(opts, admin.getConfiguration());
        for (PerformanceEvaluation.RunResult r : results) {
          elapsedTime = Math.max(elapsedTime, r.duration);
        }
      } else {
        Job job = PerformanceEvaluation.doMapReduce(opts, admin.getConfiguration());
        Counters counters = job.getCounters();
        numRows = counters.findCounter(PerformanceEvaluation.Counter.ROWS).getValue();
        elapsedTime = counters.findCounter(PerformanceEvaluation.Counter.ELAPSED_TIME).getValue();
      }
      return new TimingResult(numRows, elapsedTime, results);
    }
  }

  /**
   * Record the results from a single {@link PerformanceEvaluation} job run.
   */
  static class TimingResult {
    public final long numRows;
    public final long elapsedTime;
    public final PerformanceEvaluation.RunResult results[];

    public TimingResult(long numRows, long elapsedTime, PerformanceEvaluation.RunResult results[]) {
      this.numRows = numRows;
      this.elapsedTime = elapsedTime;
      this.results = results;
    }

    @Override
    public String toString() {
      return Objects.toStringHelper(this)
        .add("numRows", numRows)
        .add("elapsedTime", elapsedTime)
        .toString();
    }
  }

  @Override
  public void setUp() throws Exception {
    super.setUp();
    Configuration conf = util.getConfiguration();

    // sanity check cluster
    // TODO: this should reach out to master and verify online state instead
    assertEquals("Master must be configured with StochasticLoadBalancer",
      "org.apache.hadoop.hbase.master.balancer.StochasticLoadBalancer",
      conf.get("hbase.master.loadbalancer.class"));
    // TODO: this should reach out to master and verify online state instead
    assertTrue("hbase.regionserver.storefile.refresh.period must be greater than zero.",
      conf.getLong("hbase.regionserver.storefile.refresh.period", 0) > 0);

    // enable client-side settings
    conf.setBoolean(RpcClient.SPECIFIC_WRITE_THREAD, true);
    // TODO: expose these settings to CLI override
    conf.setLong("hbase.client.primaryCallTimeout.get", primaryTimeout);
    conf.setLong("hbase.client.primaryCallTimeout.multiget", primaryTimeout);
  }

  @Override
  public void setUpCluster() throws Exception {
    util = getTestingUtil(getConf());
    util.initializeCluster(clusterSize);
  }

  @Override
  public void setUpMonkey() throws Exception {
    Policy p = new PeriodicRandomActionPolicy(sleepTime,
      new RestartRandomRsExceptMetaAction(sleepTime),
      new MoveRandomRegionOfTableAction(tableName));
    this.monkey = new PolicyBasedChaosMonkey(util, p);
    // don't start monkey right away
  }

  @Override
  protected void addOptions() {
    addOptWithArg(TABLE_NAME_KEY, "Alternate table name. Default: '"
      + TABLE_NAME_DEFAULT + "'");
    addOptWithArg(SLEEP_TIME_KEY, "How long the monkey sleeps between actions. Default: "
      + SLEEP_TIME_DEFAULT);
    addOptWithArg(REPLICA_COUNT_KEY, "Number of region replicas. Default: "
      + REPLICA_COUNT_DEFAULT);
    addOptWithArg(PRIMARY_TIMEOUT_KEY, "Overrides hbase.client.primaryCallTimeout. Default: "
      + PRIMARY_TIMEOUT_DEFAULT + " (10ms)");
    addOptWithArg(NUM_RS_KEY, "Specify the number of RegionServers to use. Default: "
        + NUM_RS_DEFAULT);
  }

  @Override
  protected void processOptions(CommandLine cmd) {
    tableName = TableName.valueOf(cmd.getOptionValue(TABLE_NAME_KEY, TABLE_NAME_DEFAULT));
    sleepTime = Long.parseLong(cmd.getOptionValue(SLEEP_TIME_KEY, SLEEP_TIME_DEFAULT));
    replicaCount = Integer.parseInt(cmd.getOptionValue(REPLICA_COUNT_KEY, REPLICA_COUNT_DEFAULT));
    primaryTimeout =
      Integer.parseInt(cmd.getOptionValue(PRIMARY_TIMEOUT_KEY, PRIMARY_TIMEOUT_DEFAULT));
    clusterSize = Integer.parseInt(cmd.getOptionValue(NUM_RS_KEY, NUM_RS_DEFAULT));
    LOG.debug(Objects.toStringHelper("Parsed Options")
      .add(TABLE_NAME_KEY, tableName)
      .add(SLEEP_TIME_KEY, sleepTime)
      .add(REPLICA_COUNT_KEY, replicaCount)
      .add(PRIMARY_TIMEOUT_KEY, primaryTimeout)
      .add(NUM_RS_KEY, clusterSize)
      .toString());
  }

  @Override
  public int runTestFromCommandLine() throws Exception {
    test();
    return 0;
  }

  @Override
  public TableName getTablename() {
    return tableName;
  }

  @Override
  protected Set<String> getColumnFamilies() {
    return null;
  }

  /** Compute the mean of the given {@code stat} from a timing results. */
  private static double calcMean(String desc, Stat stat, List<TimingResult> results) {
    double sum = 0;
    int count = 0;

    for (TimingResult tr : results) {
      for (PerformanceEvaluation.RunResult r : tr.results) {
        assertNotNull("One of the run results is missing detailed run data.", r.hist);
        sum += stat.apply(r.hist);
        count += 1;
        LOG.debug(desc + "{" + YammerHistogramUtils.getHistogramReport(r.hist) + "}");
      }
    }
    return sum / count;
  }

  public void test() throws Exception {
    int maxIters = 3;
    String replicas = "--replicas=" + replicaCount;
    // TODO: splits disabled until "phase 2" is complete.
    String splitPolicy = "--splitPolicy=" + DisabledRegionSplitPolicy.class.getName();
    String writeOpts = format("%s --nomapred --table=%s --presplit=16 sequentialWrite 4",
      splitPolicy, tableName);
    String readOpts =
      format("--nomapred --table=%s --latency --sampleRate=0.1 randomRead 4", tableName);
    String replicaReadOpts = format("%s %s", replicas, readOpts);

    ArrayList<TimingResult> resultsWithoutReplicas = new ArrayList<TimingResult>(maxIters);
    ArrayList<TimingResult> resultsWithReplicas = new ArrayList<TimingResult>(maxIters);

    // create/populate the table, replicas disabled
    LOG.debug("Populating table.");
    new PerfEvalCallable(util.getHBaseAdmin(), writeOpts).call();

    // one last sanity check, then send in the clowns!
    assertEquals("Table must be created with DisabledRegionSplitPolicy. Broken test.",
        DisabledRegionSplitPolicy.class.getName(),
        util.getHBaseAdmin().getTableDescriptor(tableName).getRegionSplitPolicyClassName());
    startMonkey();

    // collect a baseline without region replicas.
    for (int i = 0; i < maxIters; i++) {
      LOG.debug("Launching non-replica job " + (i + 1) + "/" + maxIters);
      resultsWithoutReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), readOpts).call());
      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
      Thread.sleep(5000l);
    }

    // disable monkey, enable region replicas, enable monkey
    cleanUpMonkey("Altering table.");
    LOG.debug("Altering " + tableName + " replica count to " + replicaCount);
    IntegrationTestingUtility.setReplicas(util.getHBaseAdmin(), tableName, replicaCount);
    setUpMonkey();
    startMonkey();

    // run test with region replicas.
    for (int i = 0; i < maxIters; i++) {
      LOG.debug("Launching replica job " + (i + 1) + "/" + maxIters);
      resultsWithReplicas.add(new PerfEvalCallable(util.getHBaseAdmin(), replicaReadOpts).call());
      // TODO: sleep to let cluster stabilize, though monkey continues. is it necessary?
      Thread.sleep(5000l);
    }

    // compare the average of the stdev and 99.99pct across runs to determine if region replicas
    // are having an overall improvement on response variance experienced by clients.
    double withoutReplicasStdevMean =
        calcMean("withoutReplicas", Stat.STDEV, resultsWithoutReplicas);
    double withoutReplicas9999Mean =
        calcMean("withoutReplicas", Stat.FOUR_9S, resultsWithoutReplicas);
    double withReplicasStdevMean =
        calcMean("withReplicas", Stat.STDEV, resultsWithReplicas);
    double withReplicas9999Mean =
        calcMean("withReplicas", Stat.FOUR_9S, resultsWithReplicas);

    LOG.info(Objects.toStringHelper(this)
      .add("withoutReplicas", resultsWithoutReplicas)
      .add("withReplicas", resultsWithReplicas)
      .add("withoutReplicasStdevMean", withoutReplicasStdevMean)
      .add("withoutReplicas99.99Mean", withoutReplicas9999Mean)
      .add("withReplicasStdevMean", withReplicasStdevMean)
      .add("withReplicas99.99Mean", withReplicas9999Mean)
      .toString());

    assertTrue(
      "Running with region replicas under chaos should have less request variance than without. "
      + "withReplicas.stdev.mean: " + withReplicasStdevMean + "ms "
      + "withoutReplicas.stdev.mean: " + withoutReplicasStdevMean + "ms.",
      withReplicasStdevMean <= withoutReplicasStdevMean);
    assertTrue(
        "Running with region replicas under chaos should improve 99.99pct latency. "
            + "withReplicas.99.99.mean: " + withReplicas9999Mean + "ms "
            + "withoutReplicas.99.99.mean: " + withoutReplicas9999Mean + "ms.",
        withReplicas9999Mean <= withoutReplicas9999Mean);
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    IntegrationTestingUtility.setUseDistributedCluster(conf);
    int status = ToolRunner.run(conf, new IntegrationTestRegionReplicaPerf(), args);
    System.exit(status);
  }
}
TOP

Related Classes of org.apache.hadoop.hbase.IntegrationTestRegionReplicaPerf$PerfEvalCallable

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.