/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.chaos.actions;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;
import org.apache.commons.lang.math.RandomUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseCluster;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.IntegrationTestingUtility;
import org.apache.hadoop.hbase.ServerLoad;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;
/**
* A (possibly mischievous) action that the ChaosMonkey can perform.
*/
public class Action {
protected static Log LOG = LogFactory.getLog(Action.class);
protected ActionContext context;
protected HBaseCluster cluster;
protected ClusterStatus initialStatus;
protected ServerName[] initialServers;
public void init(ActionContext context) throws IOException {
this.context = context;
cluster = context.getHBaseCluster();
initialStatus = cluster.getInitialClusterStatus();
Collection<ServerName> regionServers = initialStatus.getServers();
initialServers = regionServers.toArray(new ServerName[regionServers.size()]);
}
public void perform() throws Exception { };
/** Returns current region servers */
protected ServerName[] getCurrentServers() throws IOException {
Collection<ServerName> regionServers = cluster.getClusterStatus().getServers();
if (regionServers == null || regionServers.size() <= 0) return new ServerName [] {};
return regionServers.toArray(new ServerName[regionServers.size()]);
}
protected void killMaster(ServerName server) throws IOException {
LOG.info("Killing master:" + server);
cluster.killMaster(server);
cluster.waitForMasterToStop(server, PolicyBasedChaosMonkey.TIMEOUT);
LOG.info("Killed master server:" + server);
}
protected void startMaster(ServerName server) throws IOException {
LOG.info("Starting master:" + server.getHostname());
cluster.startMaster(server.getHostname());
cluster.waitForActiveAndReadyMaster(PolicyBasedChaosMonkey.TIMEOUT);
LOG.info("Started master: " + server);
}
protected void killRs(ServerName server) throws IOException {
LOG.info("Killing region server:" + server);
cluster.killRegionServer(server);
cluster.waitForRegionServerToStop(server, PolicyBasedChaosMonkey.TIMEOUT);
LOG.info("Killed region server:" + server + ". Reported num of rs:"
+ cluster.getClusterStatus().getServersSize());
}
protected void startRs(ServerName server) throws IOException {
LOG.info("Starting region server:" + server.getHostname());
cluster.startRegionServer(server.getHostname());
cluster.waitForRegionServerToStart(server.getHostname(), PolicyBasedChaosMonkey.TIMEOUT);
LOG.info("Started region server:" + server + ". Reported num of rs:"
+ cluster.getClusterStatus().getServersSize());
}
protected void unbalanceRegions(ClusterStatus clusterStatus,
List<ServerName> fromServers, List<ServerName> toServers,
double fractionOfRegions) throws Exception {
List<byte[]> victimRegions = new LinkedList<byte[]>();
for (ServerName server : fromServers) {
ServerLoad serverLoad = clusterStatus.getLoad(server);
// Ugh.
List<byte[]> regions = new LinkedList<byte[]>(serverLoad.getRegionsLoad().keySet());
int victimRegionCount = (int)Math.ceil(fractionOfRegions * regions.size());
LOG.debug("Removing " + victimRegionCount + " regions from " + server.getServerName());
for (int i = 0; i < victimRegionCount; ++i) {
int victimIx = RandomUtils.nextInt(regions.size());
String regionId = HRegionInfo.encodeRegionName(regions.remove(victimIx));
victimRegions.add(Bytes.toBytes(regionId));
}
}
LOG.info("Moving " + victimRegions.size() + " regions from " + fromServers.size()
+ " servers to " + toServers.size() + " different servers");
HBaseAdmin admin = this.context.getHaseIntegrationTestingUtility().getHBaseAdmin();
for (byte[] victimRegion : victimRegions) {
int targetIx = RandomUtils.nextInt(toServers.size());
admin.move(victimRegion, Bytes.toBytes(toServers.get(targetIx).getServerName()));
}
}
protected void forceBalancer() throws Exception {
HBaseAdmin admin = this.context.getHaseIntegrationTestingUtility().getHBaseAdmin();
boolean result = admin.balancer();
if (!result) {
LOG.error("Balancer didn't succeed");
}
}
/**
* Context for Action's
*/
public static class ActionContext {
private IntegrationTestingUtility util;
public ActionContext(IntegrationTestingUtility util) {
this.util = util;
}
public IntegrationTestingUtility getHaseIntegrationTestingUtility() {
return util;
}
public HBaseCluster getHBaseCluster() {
return util.getHBaseClusterInterface();
}
}
}