/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;
/**
* Manages and performs region assignment.
* <p>
* Monitors ZooKeeper for events related to regions in transition.
* <p>
* Handles existing regions in transition during master failover.
*/
public class AssignmentManager extends ZooKeeperListener {
private static final Log LOG = LogFactory.getLog(AssignmentManager.class);
// Master server this manager runs within; supplies config, zookeeper
// handle, server name and abort().
protected Server master;
// Tracks online/dead region servers; consulted when validating transitions.
private ServerManager serverManager;
// Tracker for the catalog (-ROOT-/.META.) region locations.
private CatalogTracker catalogTracker;
// Chore that acts on regions stuck in transition too long; started later
// via startTimeOutMonitor(), not in the constructor.
private TimeoutMonitor timeoutMonitor;
// Chore that refreshes timers for servers queued in serversInUpdatingTimer.
private TimerUpdater timerUpdater;
// Pluggable load balancer used when making region placement plans.
private LoadBalancer balancer;
/**
 * Map of regions to reopen after the schema of a table is changed. Key -
 * encoded region name, value - HRegionInfo
 */
private final Map <String, HRegionInfo> regionsToReopen;
/*
 * Maximum times we recurse an assignment. See below in {@link #assign()}.
 */
private final int maximumAssignmentAttempts;
/**
 * Regions currently in transition. Map of encoded region names to the master
 * in-memory state for that region.
 */
final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
new ConcurrentSkipListMap<String, RegionState>();
/** Plans for region movement. Key is the encoded version of a region name*/
// TODO: When do plans get cleaned out? Ever? In server open and in server
// shutdown processing -- St.Ack
// All access to this Map must be synchronized.
final NavigableMap<String, RegionPlan> regionPlans =
new TreeMap<String, RegionPlan>();
// Shared ZKTable instance; see getZKTable().
private final ZKTable zkTable;
// store all the table names in disabling state
Set<String> disablingTables = new HashSet<String>(1);
// store all the enabling state table names and corresponding online servers' regions.
// This may be needed to avoid calling assign twice for the regions of the ENABLING table
// that could have been assigned through processRIT.
Map<String, List<HRegionInfo>> enablingTables = new HashMap<String, List<HRegionInfo>>(1);
/**
 * Server to regions assignment map.
 * Contains the set of regions currently assigned to a given server.
 * This Map and {@link #regions} are tied. Always update this in tandem
 * with the other under a lock on {@link #regions}.
 * @see #regions
 */
private final NavigableMap<ServerName, Set<HRegionInfo>> servers =
new TreeMap<ServerName, Set<HRegionInfo>>();
/**
 * Contains the server which need to update timer, these servers will be
 * handled by {@link TimerUpdater}
 */
private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer =
new ConcurrentSkipListSet<ServerName>();
/**
 * Region to server assignment map.
 * Contains the server a given region is currently assigned to.
 * This Map and {@link #servers} are tied. Always update this in tandem
 * with the other under a lock on {@link #regions}.
 * @see #servers
 */
private final SortedMap<HRegionInfo, ServerName> regions =
new TreeMap<HRegionInfo, ServerName>();
// Executor used to run region event handlers (open/close/split etc.).
private final ExecutorService executorService;
//Thread pool executor service for timeout monitor
private java.util.concurrent.ExecutorService threadPoolExecutorService;
// Event types that are still handled even when the origin region server is
// no longer online; see the server-online check in handleRegion().
private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });
/**
 * Set when we are doing master failover processing; cleared when failover
 * completes.
 */
private volatile boolean failover = false;
// Set holding all the regions which got processed while RIT was not
// populated during master failover.
private Map<String, HRegionInfo> failoverProcessedRegions =
new HashMap<String, HRegionInfo>();
/**
 * Constructs a new assignment manager.
 *
 * @param master the master server this manager runs within
 * @param serverManager tracker of online region servers
 * @param catalogTracker tracker of catalog region locations
 * @param balancer load balancer used for placement plans
 * @param service executor service used to run region event handlers
 * @throws KeeperException
 * @throws IOException
 */
public AssignmentManager(Server master, ServerManager serverManager,
CatalogTracker catalogTracker, final LoadBalancer balancer,
final ExecutorService service) throws KeeperException, IOException {
super(master.getZooKeeper());
this.master = master;
this.serverManager = serverManager;
this.catalogTracker = catalogTracker;
this.executorService = service;
// Synchronized wrapper: single get/put calls are safe, but iteration over
// this map must still be synchronized manually by callers.
this.regionsToReopen = Collections.synchronizedMap
(new HashMap<String, HRegionInfo> ());
Configuration conf = master.getConfiguration();
this.timeoutMonitor = new TimeoutMonitor(
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
master, serverManager,
conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
this.timerUpdater = new TimerUpdater(conf.getInt(
"hbase.master.assignment.timerupdater.period", 10000), master);
// Note: the timer-updater daemon thread starts here, but the timeout
// monitor is only started later via startTimeOutMonitor().
Threads.setDaemonThreadRunning(timerUpdater.getThread(),
master.getServerName() + ".timerUpdater");
this.zkTable = new ZKTable(this.master.getZooKeeper());
this.maximumAssignmentAttempts =
this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
this.balancer = balancer;
this.threadPoolExecutorService = Executors.newCachedThreadPool();
}
/**
 * Starts the daemon thread backing the {@link TimeoutMonitor} chore.
 */
void startTimeOutMonitor() {
  String threadName = master.getServerName() + ".timeoutMonitor";
  Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), threadName);
}
/**
 * Compute the average load across all region servers.
 * Currently, this uses a very naive computation - just uses the number of
 * regions being served, ignoring stats about number of requests.
 * @return the average number of regions per server, or 0 if no servers
 * are known yet
 */
double getAverageLoad() {
  int totalLoad = 0;
  int numServers = 0;
  // Sync on this.regions because access to this.servers always synchronizes
  // in this order.
  synchronized (this.regions) {
    for (Map.Entry<ServerName, Set<HRegionInfo>> e: servers.entrySet()) {
      numServers++;
      totalLoad += e.getValue().size();
    }
  }
  if (numServers == 0) {
    // Avoid 0.0/0.0 which yields NaN before any server has checked in.
    return 0.0;
  }
  return (double)totalLoad / (double)numServers;
}
/**
 * @return the shared ZKTable instance.
 */
public ZKTable getZKTable() {
  // ZKTable instances are 'expensive' to make (they involve a trip to the
  // zk ensemble), so the single instance built at construction is shared.
  return this.zkTable;
}
/**
 * Returns the RegionServer to which hri is assigned.
 *
 * @param hri HRegion for which this function returns the region server
 * @return the server the region is currently assigned to, or null if
 * unassigned
 */
public ServerName getRegionServerOfRegion(HRegionInfo hri) {
  ServerName location;
  synchronized (this.regions) {
    location = this.regions.get(hri);
  }
  return location;
}
/**
 * Checks whether the region is assigned.
 * @param hri HRegion for which this function returns the result
 * @return True iff assigned.
 */
public boolean isRegionAssigned(HRegionInfo hri) {
  boolean assigned;
  synchronized (this.regions) {
    assigned = this.regions.containsKey(hri);
  }
  return assigned;
}
/**
 * Gives enabling table regions.
 *
 * @param tableName name of the table being enabled
 * @return list of regionInfos for the table, or null if the table is not
 * in the enabling set
 */
public List<HRegionInfo> getEnablingTableRegions(String tableName){
  return enablingTables.get(tableName);
}
/**
 * Add a regionPlan for the specified region.
 * @param encodedName encoded region name the plan applies to
 * @param plan plan describing where the region should go
 */
public void addPlan(String encodedName, RegionPlan plan) {
  synchronized (this.regionPlans) {
    this.regionPlans.put(encodedName, plan);
  }
}
/**
 * Add a map of region plans, keyed by encoded region name.
 * @param plans plans to add in bulk
 */
public void addPlans(Map<String, RegionPlan> plans) {
  synchronized (this.regionPlans) {
    this.regionPlans.putAll(plans);
  }
}
/**
 * Set the list of regions that will be reopened
 * because of an update in table schema
 *
 * @param regions list of regions that should be tracked for reopen
 */
public void setRegionsToReopen(List <HRegionInfo> regions) {
  for (HRegionInfo region : regions) {
    this.regionsToReopen.put(region.getEncodedName(), region);
  }
}
/**
 * Used by the client to identify if all regions have the schema updates
 *
 * @param tableName table whose regions are being reopened
 * @return Pair of (number of regions still pending reopen, total regions)
 * @throws IOException if the META scan fails
 */
public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
throws IOException {
  List<HRegionInfo> tableRegions =
    MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName);
  int pendingCount = 0;
  for (HRegionInfo region : tableRegions) {
    String encodedName = region.getEncodedName();
    // A region still counts as pending while it either awaits reopen or is
    // currently in transition.
    if (regionsToReopen.containsKey(encodedName)
        || regionsInTransition.containsKey(encodedName)) {
      pendingCount++;
    }
  }
  return new Pair<Integer, Integer>(pendingCount, tableRegions.size());
}
/**
 * Reset all unassigned znodes. Called on startup of master.
 * Call {@link #assignAllUserRegions()} after root and meta have been assigned.
 * @throws IOException
 * @throws KeeperException
 */
void cleanoutUnassigned() throws IOException, KeeperException {
// Cleanup any existing ZK nodes and start watching
ZKAssign.deleteAllNodes(watcher);
// Re-list children so a watch is set for subsequent region transitions.
ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
this.watcher.assignmentZNode);
}
/**
 * Called on startup.
 * Figures whether this is a fresh cluster start or we are joining an extant
 * running cluster.
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
void joinCluster() throws IOException,
KeeperException, InterruptedException {
// Concurrency note: In the below the accesses on regionsInTransition are
// outside of a synchronization block where usually all accesses to RIT are
// synchronized. The presumption is that in this case it is safe since this
// method is being played by a single thread on startup.
// TODO: Regions that have a null location and are not in regionsInTransitions
// need to be handled.
// Scan META to build list of existing regions, servers, and assignment
// Returns servers who have not checked in (assumed dead) and their regions
Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = rebuildUserRegions();
processDeadServersAndRegionsInTransition(deadServers);
// Recover the tables that were not fully moved to DISABLED state.
// These tables are in DISABLING state when the master restarted/switched.
boolean isWatcherCreated = recoverTableInDisablingState(this.disablingTables);
recoverTableInEnablingState(this.enablingTables.keySet(), isWatcherCreated);
// Startup recovery done; clear the transient bookkeeping sets.
this.enablingTables.clear();
this.disablingTables.clear();
}
/**
 * Process all regions that are in transition up in zookeeper. Used by
 * master joining an already running cluster.
 * @throws KeeperException
 * @throws IOException
 * @throws InterruptedException
 */
void processDeadServersAndRegionsInTransition()
throws KeeperException, IOException, InterruptedException {
// Pass null to signify no dead servers in this context.
processDeadServersAndRegionsInTransition(null);
}
/**
 * Process all regions that are in transition in zookeeper and also
 * processes the list of dead servers by scanning the META.
 * Used by master joining a cluster.
 * @param deadServers
 * Map of dead servers and their regions. Can be null.
 * @throws KeeperException
 * @throws IOException
 * @throws InterruptedException
 */
void processDeadServersAndRegionsInTransition(
final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
throws KeeperException, IOException, InterruptedException {
// Children of the assignment znode are the regions currently in transition.
List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
watcher.assignmentZNode);
if (nodes == null) {
// Without the RIT list we cannot tell failover from clean startup; abort.
String errorMessage = "Failed to get the children from ZK";
master.abort(errorMessage, new IOException(errorMessage));
return;
}
// Run through all regions. If they are not assigned and not in RIT, then
// its a clean cluster startup, else its a failover.
synchronized (this.regions) {
for (Map.Entry<HRegionInfo, ServerName> e : this.regions.entrySet()) {
if (!e.getKey().isMetaTable() && e.getValue() != null) {
LOG.debug("Found " + e + " out on cluster");
this.failover = true;
break;
}
if (nodes.contains(e.getKey().getEncodedName())) {
LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
// Could be a meta region.
this.failover = true;
break;
}
}
}
// Remove regions in RIT, they are possibly being processed by
// ServerShutdownHandler.
synchronized (regionsInTransition) {
nodes.removeAll(regionsInTransition.keySet());
}
// If some dead servers are processed by ServerShutdownHandler, we shouldn't
// assign all user regions( some would be assigned by
// ServerShutdownHandler), consider it as a failover
if (!this.serverManager.getDeadServers().isEmpty()) {
this.failover = true;
}
// If we found user regions out on cluster, its a failover.
if (this.failover) {
LOG.info("Found regions out on cluster or in RIT; failover");
// Process list of dead servers and regions in RIT.
// See HBASE-4580 for more information.
processDeadServersAndRecoverLostRegions(deadServers, nodes);
// Failover processing complete; reset the flag and bookkeeping map.
this.failover = false;
failoverProcessedRegions.clear();
} else {
// Fresh cluster startup.
LOG.info("Clean cluster startup. Assigning userregions");
cleanoutUnassigned();
assignAllUserRegions();
}
}
/**
 * If region is up in zk in transition, then do fixup and block and wait until
 * the region is assigned and out of transition. Used on startup for
 * catalog regions.
 * @param hri Region to look for.
 * @return True if we processed a region in transition else false if region
 * was not up in zk in transition.
 * @throws InterruptedException
 * @throws KeeperException
 * @throws IOException
 */
boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
throws InterruptedException, KeeperException, IOException {
  final String encodedName = hri.getEncodedName();
  boolean inTransition = processRegionInTransition(encodedName, hri, null);
  if (!inTransition) {
    return false;
  }
  LOG.debug("Waiting on " + HRegionInfo.prettyPrint(encodedName));
  synchronized (this.regionsInTransition) {
    // We expect a notify, but as a safety net poll with a short timeout.
    while (!this.master.isStopped()
        && this.regionsInTransition.containsKey(encodedName)) {
      this.regionsInTransition.wait(100);
    }
  }
  return true;
}
/**
 * Process failover of new master for region <code>encodedRegionName</code>
 * up in zookeeper.
 * @param encodedRegionName Region to process failover for.
 * @param regionInfo If null we'll go get it from meta table.
 * @param deadServers Can be null
 * @return True if we processed <code>regionInfo</code> as a RIT.
 * @throws KeeperException
 * @throws IOException
 */
boolean processRegionInTransition(final String encodedRegionName,
final HRegionInfo regionInfo,
final Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers)
throws KeeperException, IOException {
  Stat stat = new Stat();
  RegionTransitionData data =
    ZKAssign.getDataAndWatch(watcher, encodedRegionName, stat);
  if (data == null) {
    // No znode for this region: nothing in transition.
    return false;
  }
  HRegionInfo hri = regionInfo;
  if (hri == null) {
    // Caller did not supply region info; recover it from the znode data.
    hri = getHRegionInfo(data);
    if (hri == null) {
      return false;
    }
  }
  processRegionsInTransition(data, hri, deadServers, stat.getVersion());
  return true;
}
/**
 * Handles one region found in transition up in zookeeper during master
 * failover: records it in {@link #regionsInTransition} and, depending on the
 * znode event type, forces it offline, runs close handling, or runs
 * opened-region handling.
 * @param data transition data read from the region's znode
 * @param regionInfo region being processed
 * @param deadServers map of dead servers and the regions they carried; can be null
 * @param expectedVersion znode version handed to {@link OpenedRegionHandler}
 * @throws KeeperException
 */
void processRegionsInTransition(final RegionTransitionData data,
final HRegionInfo regionInfo,
final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
int expectedVersion)
throws KeeperException {
String encodedRegionName = regionInfo.getEncodedName();
LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
" in state " + data.getEventType());
// If this region's table is being enabled, drop it from the pending list so
// enable processing does not assign it a second time.
List<HRegionInfo> hris = this.enablingTables.get(regionInfo.getTableNameAsString());
if (hris != null && !hris.isEmpty()) {
hris.remove(regionInfo);
}
synchronized (regionsInTransition) {
RegionState regionState = regionsInTransition.get(encodedRegionName);
if (regionState != null ||
failoverProcessedRegions.containsKey(encodedRegionName)) {
// Already in RIT or already handled during this failover.
// Just return
return;
}
switch (data.getEventType()) {
case M_ZK_REGION_CLOSING:
// If zk node of the region was updated by a live server skip this
// region and just add it into RIT.
if (isOnDeadServer(regionInfo, deadServers) &&
(data.getOrigin() == null || !serverManager.isServerOnline(data.getOrigin()))) {
// If was on dead server, its closed now. Force to OFFLINE and this
// will get it reassigned if appropriate
forceOffline(regionInfo, data);
} else {
// Just insert region into RIT.
// If this never updates the timeout will trigger new assignment
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.CLOSING,
data.getStamp(), data.getOrigin()));
}
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
case RS_ZK_REGION_CLOSED:
case RS_ZK_REGION_FAILED_OPEN:
// Region is closed, insert into RIT and handle it
addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
case M_ZK_REGION_OFFLINE:
// If zk node of the region was updated by a live server skip this
// region and just add it into RIT.
if (isOnDeadServer(regionInfo, deadServers) &&
(data.getOrigin() == null ||
!serverManager.isServerOnline(data.getOrigin()))) {
// Region is offline, insert into RIT and handle it like a closed
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
} else if (data.getOrigin() != null &&
!serverManager.isServerOnline(data.getOrigin())) {
// to handle cases where offline node is created but sendRegionOpen
// RPC is not yet sent
addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
} else {
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.PENDING_OPEN, data.getStamp(), data
.getOrigin()));
}
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
case RS_ZK_REGION_OPENING:
// TODO: Could check if it was on deadServers. If it was, then we could
// do what happens in TimeoutMonitor when it sees this condition.
// Just insert region into RIT
// If this never updates the timeout will trigger new assignment
if (regionInfo.isMetaTable()) {
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OPENING, data.getStamp(), data
.getOrigin()));
// If ROOT or .META. table is waiting for timeout monitor to assign
// it may take lot of time when the assignment.timeout.period is
// the default value which may be very long. We will not be able
// to serve any request during this time.
// So we will assign the ROOT and .META. region immediately.
processOpeningState(regionInfo);
// NOTE(review): this break skips the failoverProcessedRegions.put
// below, so a meta region in OPENING is never marked as processed
// -- confirm this is intentional.
break;
}
regionsInTransition.put(encodedRegionName, new RegionState(regionInfo,
RegionState.State.OPENING, data.getStamp(), data.getOrigin()));
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
case RS_ZK_REGION_OPENED:
// Region is opened, insert into RIT and handle it
regionsInTransition.put(encodedRegionName, new RegionState(
regionInfo, RegionState.State.OPEN,
data.getStamp(), data.getOrigin()));
ServerName sn = data.getOrigin() == null? null: data.getOrigin();
// sn could be null if this server is no longer online. If
// that is the case, just let this RIT timeout; it'll be assigned
// to new server then.
if (sn == null) {
LOG.warn("Region in transition " + regionInfo.getEncodedName() +
" references a null server; letting RIT timeout so will be " +
"assigned elsewhere");
} else if (!serverManager.isServerOnline(sn)
&& (isOnDeadServer(regionInfo, deadServers)
|| regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
// Opened on a server that since died: force offline for reassignment.
forceOffline(regionInfo, data);
} else {
// Run the opened handler inline to finish the open bookkeeping.
new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion)
.process();
}
failoverProcessedRegions.put(encodedRegionName, regionInfo);
break;
}
}
}
/**
 * Put the region <code>hri</code> into an offline state up in zk.
 * @param hri region to force offline
 * @param oldData previous transition data, used for stamp/origin bookkeeping
 * @throws KeeperException
 */
private void forceOffline(final HRegionInfo hri,
final RegionTransitionData oldData)
throws KeeperException {
// If was on dead server, its closed now. Force to OFFLINE and then
// handle it like a close; this will get it reassigned if appropriate
LOG.debug("RIT " + hri.getEncodedName() + " in state=" +
oldData.getEventType() + " was on deadserver; forcing offline");
ZKAssign.createOrForceNodeOffline(this.watcher, hri,
this.master.getServerName());
// Record in RIT and run close handling, which queues reassignment.
addToRITandCallClose(hri, RegionState.State.OFFLINE, oldData);
}
/**
 * Add to the in-memory copy of regions in transition and then call close
 * handler on passed region <code>hri</code>
 * @param hri region being recorded and closed
 * @param state state to record the region in
 * @param oldData previous transition data supplying stamp and origin
 */
private void addToRITandCallClose(final HRegionInfo hri,
final RegionState.State state, final RegionTransitionData oldData) {
  RegionState rs =
    new RegionState(hri, state, oldData.getStamp(), oldData.getOrigin());
  this.regionsInTransition.put(hri.getEncodedName(), rs);
  new ClosedRegionHandler(this.master, this, hri).process();
}
/**
 * When a region is closed, it should be removed from the regionsToReopen
 * @param hri HRegionInfo of the region which was closed
 */
public void removeClosedRegion(HRegionInfo hri) {
  // Map.remove already handles the empty/absent case, so the previous
  // isEmpty() pre-check was redundant; behavior is unchanged.
  if (regionsToReopen.remove(hri.getEncodedName()) != null) {
    LOG.debug("Removed region from reopening regions because it was closed");
  }
}
/**
 * @param regionInfo region to look for
 * @param deadServers Map of deadServers and the regions they were carrying;
 * can be null.
 * @return True if the passed regionInfo is in the passed map of deadServers.
 */
private boolean isOnDeadServer(final HRegionInfo regionInfo,
final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers) {
  if (deadServers == null) {
    return false;
  }
  // Scan every dead server's carried-region list for a match.
  for (List<Pair<HRegionInfo, Result>> carried : deadServers.values()) {
    for (Pair<HRegionInfo, Result> regionAndResult : carried) {
      if (regionAndResult.getFirst().equals(regionInfo)) {
        return true;
      }
    }
  }
  return false;
}
/**
* Handles various states an unassigned node can be in.
* <p>
* Method is called when a state change is suspected for an unassigned node.
* <p>
* This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
* yet).
* @param data
* @param expectedVersion
*/
private void handleRegion(final RegionTransitionData data, int expectedVersion) {
synchronized(regionsInTransition) {
HRegionInfo hri = null;
if (data == null || data.getOrigin() == null) {
LOG.warn("Unexpected NULL input " + data);
return;
}
ServerName sn = data.getOrigin();
// Check if this is a special HBCK transition
if (sn.equals(HConstants.HBCK_CODE_SERVERNAME)) {
handleHBCK(data);
return;
}
String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
// Verify this is a known server
if (!serverManager.isServerOnline(sn) &&
!this.master.getServerName().equals(sn)
&& !ignoreStatesRSOffline.contains(data.getEventType())) {
LOG.warn("Attempted to handle region transition for server but " +
"server is not online: " + prettyPrintedRegionName);
return;
}
// Printing if the event was created a long time ago helps debugging
boolean lateEvent = data.getStamp() <
(System.currentTimeMillis() - 15000);
LOG.debug("Handling transition=" + data.getEventType() +
", server=" + data.getOrigin() + ", region=" +
(prettyPrintedRegionName == null? "null": prettyPrintedRegionName) +
(lateEvent? ", which is more than 15 seconds late" : ""));
RegionState regionState = regionsInTransition.get(encodedName);
switch (data.getEventType()) {
case M_ZK_REGION_OFFLINE:
// Nothing to do.
break;
case RS_ZK_REGION_SPLITTING:
if (!isInStateForSplitting(regionState)) break;
addSplittingToRIT(sn, encodedName);
break;
case RS_ZK_REGION_SPLIT:
// RegionState must be null, or SPLITTING or PENDING_CLOSE.
if (!isInStateForSplitting(regionState)) break;
// If null, add SPLITTING state before going to SPLIT
if (regionState == null) {
regionState = addSplittingToRIT(sn, encodedName);
String message = "Received SPLIT for region " + prettyPrintedRegionName +
" from server " + sn;
// If still null, it means we cannot find it and it was already processed
if (regionState == null) {
LOG.warn(message + " but it doesn't exist anymore," +
" probably already processed its split");
break;
}
LOG.info(message +
" but region was not first in SPLITTING state; continuing");
}
// Check it has daughters.
byte [] payload = data.getPayload();
List<HRegionInfo> daughters = null;
try {
daughters = Writables.getHRegionInfos(payload, 0, payload.length);
} catch (IOException e) {
LOG.error("Dropped split! Failed reading split payload for " +
prettyPrintedRegionName);
break;
}
assert daughters.size() == 2;
// Assert that we can get a serverinfo for this server.
if (!this.serverManager.isServerOnline(sn)) {
LOG.error("Dropped split! ServerName=" + sn + " unknown.");
break;
}
// Run handler to do the rest of the SPLIT handling.
this.executorService.submit(new SplitRegionHandler(master, this,
regionState.getRegion(), sn, daughters));
break;
case M_ZK_REGION_CLOSING:
hri = checkIfInFailover(regionState, encodedName, data);
if (hri != null) {
regionState = new RegionState(hri, RegionState.State.CLOSING, data
.getStamp(), data.getOrigin());
regionsInTransition.put(encodedName, regionState);
failoverProcessedRegions.put(encodedName, hri);
break;
}
// Should see CLOSING after we have asked it to CLOSE or additional
// times after already being in state of CLOSING
if (regionState == null ||
(!regionState.isPendingClose() && !regionState.isClosing())) {
LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
" from server " + data.getOrigin() + " but region was in " +
" the state " + regionState + " and not " +
"in expected PENDING_CLOSE or CLOSING states");
return;
}
// Transition to CLOSING (or update stamp if already CLOSING)
regionState.update(RegionState.State.CLOSING,
data.getStamp(), data.getOrigin());
break;
case RS_ZK_REGION_CLOSED:
hri = checkIfInFailover(regionState, encodedName, data);
if (hri != null) {
regionState = new RegionState(hri, RegionState.State.CLOSED, data
.getStamp(), data.getOrigin());
regionsInTransition.put(encodedName, regionState);
remo