Source Code of org.apache.hadoop.hbase.regionserver.transactional.DisabledTestHLogRecovery
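
This disabled cluster test exercises write-ahead-log (HLog) recovery of transactional data: it commits a transaction, kills the region server hosting the table, and verifies that the committed values survive log replay.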

/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.transactional;

import java.io.IOException;
import java.util.Collection;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
import org.apache.hadoop.hbase.client.transactional.TransactionManager;
import org.apache.hadoop.hbase.client.transactional.TransactionState;
import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;

public class DisabledTestHLogRecovery extends HBaseClusterTestCase {
  private static final Log LOG = LogFactory.getLog(DisabledTestHLogRecovery.class);

  private static final String TABLE_NAME = "table1";

  private static final byte[] FAMILY = Bytes.toBytes("family:");
  private static final byte[] COL_A = Bytes.toBytes("family:a");

  private static final byte[] ROW1 = Bytes.toBytes("row1");
  private static final byte[] ROW2 = Bytes.toBytes("row2");
  private static final byte[] ROW3 = Bytes.toBytes("row3");
  private static final int TOTAL_VALUE = 10;

  private HBaseAdmin admin;
  private TransactionManager transactionManager;
  private TransactionalTable table;

  /** constructor */
  public DisabledTestHLogRecovery() {
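    // Two region servers (the first constructor argument), so that after one
    // is aborted the table's region can be reassigned to the survivor.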
    super(2, false);

    conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
        .getName());
    conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
        .getName());

    // Set flush params so we don't get any flushes.
    // FIXME (defaults are probably fine)

    // Copied from TestRegionServerExit
    conf.setInt("ipc.client.connect.max.retries", 5); // reduce ipc retries
    conf.setInt("ipc.client.timeout", 10000); // and ipc timeout
    conf.setInt("hbase.client.pause", 10000); // increase client pause between retries
    conf.setInt("hbase.client.retries.number", 10); // increase HBase retries
  }

  @Override
  protected void setUp() throws Exception {
    FileSystem.getLocal(conf).delete(new Path(conf.get(HConstants.HBASE_DIR)), true);
    super.setUp();

    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
    desc.addFamily(new HColumnDescriptor(FAMILY));
    admin = new HBaseAdmin(conf);
    admin.createTable(desc);
    table = new TransactionalTable(conf, desc.getName());

    transactionManager = new TransactionManager(conf);
    writeInitialRows();
  }

  /** Seed ROW1 with TOTAL_VALUE and zero out ROW2 and ROW3. */
  private void writeInitialRows() throws IOException {
    BatchUpdate update = new BatchUpdate(ROW1);
    update.put(COL_A, Bytes.toBytes(TOTAL_VALUE));
    table.commit(update);
    update = new BatchUpdate(ROW2);
    update.put(COL_A, Bytes.toBytes(0));
    table.commit(update);
    update = new BatchUpdate(ROW3);
    update.put(COL_A, Bytes.toBytes(0));
    table.commit(update);
  }

  /**
   * Commit a transaction, abort the region server serving the table, and
   * verify that the committed data survives log recovery.
   */
  public void testWithoutFlush() throws IOException,
      CommitUnsuccessfulException {
    writeInitialRows();
    TransactionState state1 = makeTransaction(false);
    transactionManager.tryCommit(state1);
    stopOrAbortRegionServer(true);

    Thread t = startVerificationThread(1);
    t.start();
    threadDumpingJoin(t);
  }

  /**
   * Like testWithoutFlush, but force a flush after the transactional writes
   * and before the commit.
   */
  public void testWithFlushBeforeCommit() throws IOException,
      CommitUnsuccessfulException {
    writeInitialRows();
    TransactionState state1 = makeTransaction(false);
    flushRegionServer();
    transactionManager.tryCommit(state1);
    stopOrAbortRegionServer(true);

    Thread t = startVerificationThread(1);
    t.start();
    threadDumpingJoin(t);
  }

  // FIXME, TODO
  // public void testWithFlushBetweenTransactionWrites() {
  // fail();
  // }
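
  // A possible sketch of the missing test above (not the original author's
  // code): makeTransaction(true) already flushes the region server between
  // the transactional reads and the writes, so the skeleton could mirror
  // testWithoutFlush. Kept commented out, matching the original stub:
  //
  // public void testWithFlushBetweenTransactionWrites() throws IOException,
  //     CommitUnsuccessfulException {
  //   writeInitialRows();
  //   TransactionState state1 = makeTransaction(true); // flush mid-transaction
  //   transactionManager.tryCommit(state1);
  //   stopOrAbortRegionServer(true);
  //
  //   Thread t = startVerificationThread(1);
  //   t.start();
  //   threadDumpingJoin(t);
  // }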

  /** Request a flush of the TABLE_NAME region on whichever server hosts it. */
  private void flushRegionServer() {
    List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
        .getRegionThreads();

    HRegion region = null;
    int server = -1;
    for (int i = 0; i < regionThreads.size() && server == -1; i++) {
      HRegionServer s = regionThreads.get(i).getRegionServer();
      Collection<HRegion> regions = s.getOnlineRegions();
      for (HRegion r : regions) {
        if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
          server = i;
          region = r;
        }
      }
    }
    if (server == -1) {
      LOG.fatal("could not find region server serving table region");
      fail();
    }
    ((TransactionalRegionServer) regionThreads.get(server).getRegionServer())
        .getFlushRequester().request(region);
  }

  /**
   * Stop the region server serving TABLE_NAME.
   *
   * @param abort if true the region server is aborted; if false it is just
   * shut down.
   */
  private void stopOrAbortRegionServer(final boolean abort) {
    List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
        .getRegionThreads();

    int server = -1;
    for (int i = 0; i < regionThreads.size(); i++) {
      HRegionServer s = regionThreads.get(i).getRegionServer();
      Collection<HRegion> regions = s.getOnlineRegions();
      LOG.info("server: " + regionThreads.get(i).getName());
      for (HRegion r : regions) {
        LOG.info("region: " + r.getRegionInfo().getRegionNameAsString());
        if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
          server = i;
        }
      }
    }
    if (server == -1) {
      LOG.fatal("could not find region server serving table region");
      fail();
    }
    if (abort) {
      this.cluster.abortRegionServer(server);
    } else {
      this.cluster.stopRegionServer(server);
    }
    LOG.info(this.cluster.waitOnRegionServer(server) + " has been "
        + (abort ? "aborted" : "shut down"));
  }

  /** Assert that the table reflects numRuns committed transaction runs. */
  private void verify(final int numRuns) throws IOException {
    // Reads
    int row1 = Bytes.toInt(table.get(ROW1, COL_A).getValue());
    int row2 = Bytes.toInt(table.get(ROW2, COL_A).getValue());
    int row3 = Bytes.toInt(table.get(ROW3, COL_A).getValue());

    // Each committed run moves 2 out of ROW1 and adds 1 to ROW2 and ROW3.
    assertEquals(TOTAL_VALUE - 2 * numRuns, row1);
    assertEquals(numRuns, row2);
    assertEquals(numRuns, row3);
  }

  /**
   * Build a transaction that moves 2 out of ROW1 and puts 1 into each of
   * ROW2 and ROW3. The writes are buffered in the transaction; the caller
   * commits through transactionManager.tryCommit().
   *
   * @param flushMidWay if true, flush the region server between the
   * transactional reads and writes
   */
  private TransactionState makeTransaction(final boolean flushMidWay)
      throws IOException {
    TransactionState transactionState = transactionManager.beginTransaction();

    // Reads
    int row1 = Bytes.toInt(table.get(transactionState, ROW1, COL_A).getValue());
    int row2 = Bytes.toInt(table.get(transactionState, ROW2, COL_A).getValue());
    int row3 = Bytes.toInt(table.get(transactionState, ROW3, COL_A).getValue());

    row1 -= 2;
    row2 += 1;
    row3 += 1;

    if (flushMidWay) {
      flushRegionServer();
    }

    // Writes
    BatchUpdate write = new BatchUpdate(ROW1);
    write.put(COL_A, Bytes.toBytes(row1));
    table.commit(transactionState, write);

    write = new BatchUpdate(ROW2);
    write.put(COL_A, Bytes.toBytes(row2));
    table.commit(transactionState, write);

    write = new BatchUpdate(ROW3);
    write.put(COL_A, Bytes.toBytes(row3));
    table.commit(transactionState, write);

    return transactionState;
  }

  /*
   * Run verification in a thread so we can concurrently run a thread-dumper
   * while we're waiting (because in this test the meta scanner sometimes
   * appears to be stuck).
   *
   * @param numRuns number of committed transaction runs to verify.
   * @return Verification thread. Caller needs to call start on it.
   */
  private Thread startVerificationThread(final int numRuns) {
    Runnable runnable = new Runnable() {
      public void run() {
        try {
          // Now try to open a scanner on the table. This should stall until
          // the aborted server's region comes back up.
          HTable t = new HTable(conf, TABLE_NAME);
          Scanner s = t.getScanner(new byte[][] { COL_A },
              HConstants.EMPTY_START_ROW);
          s.close();
        } catch (IOException e) {
          LOG.fatal("could not re-open the table", e);
          fail();
        }
        try {
          verify(numRuns);
          LOG.info("Success!");
        } catch (Exception e) {
          e.printStackTrace();
          fail();
        }
      }
    };
    return new Thread(runnable);
  }
}
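
Note: the "Disabled" name prefix presumably keeps this class outside the Test* filename pattern that the build's test runner picks up, so the suite does not run by default. The class also relies on pre-0.20 client APIs (BatchUpdate, Scanner, HBaseClusterTestCase) that later HBase releases removed.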