/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import java.io.IOException;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.MultithreadedTestUtil.RepeatingTestThread;
import org.apache.hadoop.hbase.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.junit.Test;
import org.junit.experimental.categories.Category;

import com.google.common.collect.Lists;

/**
* Test case that uses multiple threads to read and write multifamily rows
* into a table, verifying that reads never see partially-complete writes.
*
* This can run as a JUnit test, or via its main() method against a real
* cluster (e.g. for testing with failures, region movement, etc.).
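*
* When launched through ToolRunner (see main() and run()), the run length and
* thread counts are read from the configuration. A sketch of an invocation,
* assuming the compiled test classes are available on the classpath of the
* "hbase" launcher script (adjust to your deployment):
*
*   hbase org.apache.hadoop.hbase.TestAcidGuarantees \
*       -Dmillis=60000 -DnumWriters=10 -DnumGetters=2 \
*       -DnumScanners=2 -DnumUniqueRows=3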
*/
@Category(MediumTests.class)
public class TestAcidGuarantees implements Tool {
  protected static final Log LOG = LogFactory.getLog(TestAcidGuarantees.class);
  public static final byte [] TABLE_NAME = Bytes.toBytes("TestAcidGuarantees");
  public static final byte [] FAMILY_A = Bytes.toBytes("A");
  public static final byte [] FAMILY_B = Bytes.toBytes("B");
  public static final byte [] FAMILY_C = Bytes.toBytes("C");
  public static final byte [] QUALIFIER_NAME = Bytes.toBytes("data");

  public static final byte[][] FAMILIES = new byte[][] {
    FAMILY_A, FAMILY_B, FAMILY_C };

  private HBaseTestingUtility util;

  public static int NUM_COLS_TO_CHECK = 50;

  // Configuration used when this class is run as a command-line Tool (see main())
  private Configuration conf;

  private void createTableIfMissing()
    throws IOException {
    try {
      util.createTable(TABLE_NAME, FAMILIES);
    } catch (TableExistsException tee) {
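      // Table already exists (e.g. left over from a previous run); reuse it.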
    }
  }

  public TestAcidGuarantees() {
    // Set small flush size for minicluster so we exercise reseeking scanners
    Configuration conf = HBaseConfiguration.create();
    conf.set(HConstants.HREGION_MEMSTORE_FLUSH_SIZE, String.valueOf(128*1024));
    util = new HBaseTestingUtility(conf);
  }
 
  /**
   * Thread that does random full-row writes into a table. Each action picks a
   * random target row and writes a single random value into every column of
   * every target family, so a fully visible row carries identical values in
   * all of its cells.
   */
  public static class AtomicityWriter extends RepeatingTestThread {
    Random rand = new Random();
    byte data[] = new byte[10];
    byte targetRows[][];
    byte targetFamilies[][];
    HTable table;
    AtomicLong numWritten = new AtomicLong();
   
    public AtomicityWriter(TestContext ctx, byte targetRows[][],
                           byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.targetRows = targetRows;
      this.targetFamilies = targetFamilies;
      table = new HTable(ctx.getConf(), TABLE_NAME);
    }
    public void doAnAction() throws Exception {
      // Pick a random row to write into
      byte[] targetRow = targetRows[rand.nextInt(targetRows.length)];
      Put p = new Put(targetRow);
      rand.nextBytes(data);

      for (byte[] family : targetFamilies) {
        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
          byte qualifier[] = Bytes.toBytes("col" + i);
          p.add(family, qualifier, data);
        }
      }
      table.put(p);
      numWritten.getAndIncrement();
    }
  }
 
  /**
   * Thread that does single-row reads in a table, looking for partially
   * completed rows.
   */
  public static class AtomicGetReader extends RepeatingTestThread {
    byte targetRow[];
    byte targetFamilies[][];
    HTable table;
    int numVerified = 0;
    AtomicLong numRead = new AtomicLong();

    public AtomicGetReader(TestContext ctx, byte targetRow[],
                           byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.targetRow = targetRow;
      this.targetFamilies = targetFamilies;
      table = new HTable(ctx.getConf(), TABLE_NAME);
    }

    public void doAnAction() throws Exception {
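      // AtomicityWriter stores one random value into every column of the row,
      // so all cells fetched below must hold identical values; a mismatch
      // indicates a partially-visible (non-atomic) write.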
      Get g = new Get(targetRow);
      Result res = table.get(g);
      byte[] gotValue = null;
      if (res.getRow() == null) {
        // Trying to verify but we didn't find the row - the writing
        // thread probably just hasn't started writing yet, so we can
        // ignore this action
        return;
      }
     
      for (byte[] family : targetFamilies) {
        for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
          byte qualifier[] = Bytes.toBytes("col" + i);
          byte thisValue[] = res.getValue(family, qualifier);
          if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
            gotFailure(gotValue, res);
          }
          numVerified++;
          gotValue = thisValue;
        }
      }
      numRead.getAndIncrement();
    }

    private void gotFailure(byte[] expected, Result res) {
      StringBuilder msg = new StringBuilder();
      msg.append("Failed after ").append(numVerified).append("!");
      msg.append("Expected=").append(Bytes.toStringBinary(expected));
      msg.append("Got:\n");
      for (KeyValue kv : res.list()) {
        msg.append(kv.toString());
        msg.append(" val= ");
        msg.append(Bytes.toStringBinary(kv.getValue()));
        msg.append("\n");
      }
      throw new RuntimeException(msg.toString());
    }
  }
 
  /**
   * Thread that does full scans of the table looking for any partially completed
   * rows.
   */
  public static class AtomicScanReader extends RepeatingTestThread {
    byte targetFamilies[][];
    HTable table;
    AtomicLong numScans = new AtomicLong();
    AtomicLong numRowsScanned = new AtomicLong();

    public AtomicScanReader(TestContext ctx,
                           byte targetFamilies[][]) throws IOException {
      super(ctx);
      this.targetFamilies = targetFamilies;
      table = new HTable(ctx.getConf(), TABLE_NAME);
    }

    public void doAnAction() throws Exception {
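      // Apply the same per-row check as AtomicGetReader to every row returned
      // by a full scan: all cells of a row must carry the same value.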
      Scan s = new Scan();
      for (byte[] family : targetFamilies) {
        s.addFamily(family);
      }
      ResultScanner scanner = table.getScanner(s);
     
      for (Result res : scanner) {
        byte[] gotValue = null;
 
        for (byte[] family : targetFamilies) {
          for (int i = 0; i < NUM_COLS_TO_CHECK; i++) {
            byte qualifier[] = Bytes.toBytes("col" + i);
            byte thisValue[] = res.getValue(family, qualifier);
            if (gotValue != null && !Bytes.equals(gotValue, thisValue)) {
              gotFailure(gotValue, res);
            }
            gotValue = thisValue;
          }
        }
        numRowsScanned.getAndIncrement();
      }
      scanner.close();
      numScans.getAndIncrement();
    }

    private void gotFailure(byte[] expected, Result res) {
      StringBuilder msg = new StringBuilder();
      msg.append("Failed after ").append(numRowsScanned).append("!");
      msg.append("Expected=").append(Bytes.toStringBinary(expected));
      msg.append("Got:\n");
      for (KeyValue kv : res.list()) {
        msg.append(kv.toString());
        msg.append(" val= ");
        msg.append(Bytes.toStringBinary(kv.getValue()));
        msg.append("\n");
      }
      throw new RuntimeException(msg.toString());
    }
  }


  /**
   * Convenience overload that runs the test with the background flusher
   * thread enabled.
   */
  public void runTestAtomicity(long millisToRun,
      int numWriters,
      int numGetters,
      int numScanners,
      int numUniqueRows) throws Exception {
      runTestAtomicity(millisToRun, numWriters, numGetters, numScanners,
           numUniqueRows, true);
  }

  /**
   * Starts the given numbers of writer, getter, and scanner threads against
   * numUniqueRows rows, lets them run for millisToRun milliseconds, then stops
   * them and logs per-thread counters.
   */
  public void runTestAtomicity(long millisToRun,
      int numWriters,
      int numGetters,
      int numScanners,
      int numUniqueRows, boolean useFlusher) throws Exception {
    createTableIfMissing();
    TestContext ctx = new TestContext(util.getConfiguration());
   
    byte rows[][] = new byte[numUniqueRows][];
    for (int i = 0; i < numUniqueRows; i++) {
      rows[i] = Bytes.toBytes("test_row_" + i);
    }
   
    List<AtomicityWriter> writers = Lists.newArrayList();
    for (int i = 0; i < numWriters; i++) {
      AtomicityWriter writer = new AtomicityWriter(
          ctx, rows, FAMILIES);
      writers.add(writer);
      ctx.addThread(writer);
    }
    // Add a flusher thread: explicit flushes (on top of the small memstore
    // flush size set in the constructor) force scanners to reseek across
    // newly written store files while the test runs.
    if (useFlusher) {
      ctx.addThread(new RepeatingTestThread(ctx) {
        public void doAnAction() throws Exception {
          util.flush();
        }
      });
    }

    List<AtomicGetReader> getters = Lists.newArrayList();
    for (int i = 0; i < numGetters; i++) {
      AtomicGetReader getter = new AtomicGetReader(
          ctx, rows[i % numUniqueRows], FAMILIES);
      getters.add(getter);
      ctx.addThread(getter);
    }
   
    List<AtomicScanReader> scanners = Lists.newArrayList();
    for (int i = 0; i < numScanners; i++) {
      AtomicScanReader scanner = new AtomicScanReader(ctx, FAMILIES);
      scanners.add(scanner);
      ctx.addThread(scanner);
    }
   
    ctx.startThreads();
    ctx.waitFor(millisToRun);
    ctx.stop();
   
    LOG.info("Finished test. Writers:");
    for (AtomicityWriter writer : writers) {
      LOG.info("  wrote " + writer.numWritten.get());
    }
    LOG.info("Readers:");
    for (AtomicGetReader reader : getters) {
      LOG.info("  read " + reader.numRead.get());
    }
    LOG.info("Scanners:");
    for (AtomicScanReader scanner : scanners) {
      LOG.info("  scanned " + scanner.numScans.get());
      LOG.info("  verified " + scanner.numRowsScanned.get() + " rows");
    }
  }

  @Test
  public void testGetAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
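      // 20 seconds, 5 writers, 5 getters, no scanners, 3 unique rows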
      runTestAtomicity(20000, 5, 5, 0, 3);
    } finally {
      util.shutdownMiniCluster();
    }   
  }

  @Test
  public void testScanAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
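      // 20 seconds, 5 writers, no getters, 5 scanners, 3 unique rows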
      runTestAtomicity(20000, 5, 0, 5, 3);
    } finally {
      util.shutdownMiniCluster();
    }   
  }

  @Test
  public void testMixedAtomicity() throws Exception {
    util.startMiniCluster(1);
    try {
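      // 20 seconds, 5 writers, 2 getters, 2 scanners, 3 unique rows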
      runTestAtomicity(20000, 5, 2, 2, 3);
    } finally {
      util.shutdownMiniCluster();
    }   
  }

  ////////////////////////////////////////////////////////////////////////////
  // Tool interface
  ////////////////////////////////////////////////////////////////////////////
  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public void setConf(Configuration c) {
    this.conf = c;
    this.util = new HBaseTestingUtility(c);
  }

  @Override
  public int run(String[] arg0) throws Exception {
    Configuration c = getConf();
    int millis = c.getInt("millis", 5000);
    int numWriters = c.getInt("numWriters", 50);
    int numGetters = c.getInt("numGetters", 2);
    int numScanners = c.getInt("numScanners", 2);
    int numUniqueRows = c.getInt("numUniqueRows", 3);
    // The flusher thread uses the minicluster test utility, so it cannot run
    // against a real cluster.
    runTestAtomicity(millis, numWriters, numGetters, numScanners, numUniqueRows, false);
    return 0;
  }

  public static void main(String args[]) throws Exception {
    Configuration c = HBaseConfiguration.create();
    int status;
    try {
      TestAcidGuarantees test = new TestAcidGuarantees();
      status = ToolRunner.run(c, test, args);
    } catch (Exception e) {
      LOG.error("Exiting due to error", e);
      status = -1;
    }
    System.exit(status);
  }


  @org.junit.Rule
  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}