Package org.apache.hadoop.hbase.master

Source Code of org.apache.hadoop.hbase.master.AssignmentManager

/**
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.master;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.catalog.RootLocationEditor;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.RegionTransitionData;
import org.apache.hadoop.hbase.ipc.ServerNotRunningYetException;
import org.apache.hadoop.hbase.master.AssignmentManager.RegionState.State;
import org.apache.hadoop.hbase.master.handler.ClosedRegionHandler;
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
import org.apache.hadoop.hbase.master.handler.OpenedRegionHandler;
import org.apache.hadoop.hbase.master.handler.ServerShutdownHandler;
import org.apache.hadoop.hbase.master.handler.SplitRegionHandler;
import org.apache.hadoop.hbase.regionserver.RegionAlreadyInTransitionException;
import org.apache.hadoop.hbase.regionserver.RegionOpeningState;
import org.apache.hadoop.hbase.regionserver.RegionServerStoppedException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKTable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.KeeperException.NodeExistsException;
import org.apache.zookeeper.data.Stat;

/**
* Manages and performs region assignment.
* <p>
* Monitors ZooKeeper for events related to regions in transition.
* <p>
* Handles existing regions in transition during master failover.
*/
public class AssignmentManager extends ZooKeeperListener {

  private static final Log LOG = LogFactory.getLog(AssignmentManager.class);

  protected Server master;

  private ServerManager serverManager;

  private CatalogTracker catalogTracker;

  private TimeoutMonitor timeoutMonitor;

  private TimerUpdater timerUpdater;

  private LoadBalancer balancer;

  /**
   * Map of regions to reopen after the schema of a table is changed. Key -
   * encoded region name, value - HRegionInfo
   */
  private final Map <String, HRegionInfo> regionsToReopen;

  /*
   * Maximum times we recurse an assignment.  See below in {@link #assign()}.
   */
  private final int maximumAssignmentAttempts;

  /**
   * Regions currently in transition.  Map of encoded region names to the master
   * in-memory state for that region.
   */
  final ConcurrentSkipListMap<String, RegionState> regionsInTransition =
    new ConcurrentSkipListMap<String, RegionState>();

  /** Plans for region movement. Key is the encoded version of a region name*/
  // TODO: When do plans get cleaned out?  Ever? In server open and in server
  // shutdown processing -- St.Ack
  // All access to this Map must be synchronized.
  final NavigableMap<String, RegionPlan> regionPlans =
    new TreeMap<String, RegionPlan>();

  private final ZKTable zkTable;

  // store all the table names in disabling state
  Set<String> disablingTables = new HashSet<String>(1);
  // store all the enabling state table names and corresponding online servers' regions.
  // This may be needed to avoid calling assign twice for the regions of the ENABLING table
  // that could have been assigned through processRIT.
  Map<String, List<HRegionInfo>> enablingTables = new HashMap<String, List<HRegionInfo>>(1);
  /**
   * Server to regions assignment map.
   * Contains the set of regions currently assigned to a given server.
   * This Map and {@link #regions} are tied.  Always update this in tandem
   * with the other under a lock on {@link #regions}.
   * @see #regions
   */
  private final NavigableMap<ServerName, Set<HRegionInfo>> servers =
    new TreeMap<ServerName, Set<HRegionInfo>>();

  /**
   * Contains the server which need to update timer, these servers will be
   * handled by {@link TimerUpdater}
   */
  private final ConcurrentSkipListSet<ServerName> serversInUpdatingTimer =
    new ConcurrentSkipListSet<ServerName>();

  /**
   * Region to server assignment map.
   * Contains the server a given region is currently assigned to.
   * This Map and {@link #servers} are tied.  Always update this in tandem
   * with the other under a lock on {@link #regions}.
   * @see #servers
   */
  private final SortedMap<HRegionInfo, ServerName> regions =
    new TreeMap<HRegionInfo, ServerName>();

  private final ExecutorService executorService;

  //Thread pool executor service for timeout monitor
  private java.util.concurrent.ExecutorService threadPoolExecutorService;
 
  private List<EventType> ignoreStatesRSOffline = Arrays.asList(new EventType[]{
      EventType.RS_ZK_REGION_FAILED_OPEN, EventType.RS_ZK_REGION_CLOSED });

  /**
   * Set when we are doing master failover processing; cleared when failover
   * completes.
   */
  private volatile boolean failover = false;

  // Set holding all the regions which got processed while RIT was not
  // populated during master failover.
  private Map<String, HRegionInfo> failoverProcessedRegions =
    new HashMap<String, HRegionInfo>();

  /**
   * Constructs a new assignment manager.
   *
   * @param master
   * @param serverManager
   * @param catalogTracker
   * @param service
   * @throws KeeperException
   * @throws IOException
   */
  public AssignmentManager(Server master, ServerManager serverManager,
      CatalogTracker catalogTracker, final LoadBalancer balancer,
      final ExecutorService service) throws KeeperException, IOException {
    super(master.getZooKeeper());
    this.master = master;
    this.serverManager = serverManager;
    this.catalogTracker = catalogTracker;
    this.executorService = service;
    this.regionsToReopen = Collections.synchronizedMap
                           (new HashMap<String, HRegionInfo> ());
    Configuration conf = master.getConfiguration();
    this.timeoutMonitor = new TimeoutMonitor(
      conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
      master, serverManager,
      conf.getInt("hbase.master.assignment.timeoutmonitor.timeout", 1800000));
    this.timerUpdater = new TimerUpdater(conf.getInt(
        "hbase.master.assignment.timerupdater.period", 10000), master);
    Threads.setDaemonThreadRunning(timerUpdater.getThread(),
        master.getServerName() + ".timerUpdater");
    this.zkTable = new ZKTable(this.master.getZooKeeper());
    this.maximumAssignmentAttempts =
      this.master.getConfiguration().getInt("hbase.assignment.maximum.attempts", 10);
    this.balancer = balancer;
    this.threadPoolExecutorService = Executors.newCachedThreadPool();
  }
 
  void startTimeOutMonitor() {
    Threads.setDaemonThreadRunning(timeoutMonitor.getThread(), master.getServerName()
        + ".timeoutMonitor");
  }

  /**
   * Compute the average load across all region servers.
   * Currently, this uses a very naive computation - just uses the number of
   * regions being served, ignoring stats about number of requests.
   * @return the average load
   */
  double getAverageLoad() {
    int totalLoad = 0;
    int numServers = 0;
    // Sync on this.regions because access to this.servers always synchronizes
    // in this order.
    synchronized (this.regions) {
      for (Map.Entry<ServerName, Set<HRegionInfo>> e: servers.entrySet()) {
        numServers++;
        totalLoad += e.getValue().size();
      }
    }
    return (double)totalLoad / (double)numServers;
  }

  /**
   * @return Instance of ZKTable.
   */
  public ZKTable getZKTable() {
    // These are 'expensive' to make involving trip to zk ensemble so allow
    // sharing.
    return this.zkTable;
  }
  /**
   * Returns the RegionServer to which hri is assigned.
   *
   * @param hri
   *          HRegion for which this function returns the region server
   * @return HServerInfo The region server to which hri belongs
   */
  public ServerName getRegionServerOfRegion(HRegionInfo hri) {
    synchronized (this.regions ) {
      return regions.get(hri);
    }
  }

  /**
   * Checks whether the region is assigned.
   * @param hri HRegion for which this function returns the result
   * @return True iff assigned.
   */
  public boolean isRegionAssigned(HRegionInfo hri) {
    synchronized (this.regions ) {
      return regions.containsKey(hri);
    }
  }

  /**
   * Gives enabling table regions.
   *
   * @param tableName
   * @return list of regionInfos
   */
  public List<HRegionInfo> getEnablingTableRegions(String tableName){
    return this.enablingTables.get(tableName);
  }

  /**
   * Add a regionPlan for the specified region.
   * @param encodedName
   * @param plan
   */
  public void addPlan(String encodedName, RegionPlan plan) {
    synchronized (regionPlans) {
      regionPlans.put(encodedName, plan);
    }
  }

  /**
   * Add a map of region plans.
   */
  public void addPlans(Map<String, RegionPlan> plans) {
    synchronized (regionPlans) {
      regionPlans.putAll(plans);
    }
  }

  /**
   * Set the list of regions that will be reopened
   * because of an update in table schema
   *
   * @param regions
   *          list of regions that should be tracked for reopen
   */
  public void setRegionsToReopen(List <HRegionInfo> regions) {
    for(HRegionInfo hri : regions) {
      regionsToReopen.put(hri.getEncodedName(), hri);
    }
  }

  /**
   * Used by the client to identify if all regions have the schema updates
   *
   * @param tableName
   * @return Pair indicating the status of the alter command
   * @throws IOException
   */
  public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
  throws IOException {
    List <HRegionInfo> hris =
      MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName);
    Integer pending = 0;
    for(HRegionInfo hri : hris) {
      String name = hri.getEncodedName();
      if (regionsToReopen.containsKey(name) || regionsInTransition.containsKey(name)) {
        pending++;
      }
    }
    return new Pair<Integer, Integer>(pending, hris.size());
  }
  /**
   * Reset all unassigned znodes.  Called on startup of master.
   * Call {@link #assignAllUserRegions()} after root and meta have been assigned.
   * @throws IOException
   * @throws KeeperException
   */
  void cleanoutUnassigned() throws IOException, KeeperException {
    // Cleanup any existing ZK nodes and start watching
    ZKAssign.deleteAllNodes(watcher);
    ZKUtil.listChildrenAndWatchForNewChildren(this.watcher,
      this.watcher.assignmentZNode);
  }

  /**
   * Called on startup.
   * Figures whether a fresh cluster start of we are joining extant running cluster.
   * @throws IOException
   * @throws KeeperException
   * @throws InterruptedException
   */
  void joinCluster() throws IOException,
      KeeperException, InterruptedException {
    // Concurrency note: In the below the accesses on regionsInTransition are
    // outside of a synchronization block where usually all accesses to RIT are
    // synchronized.  The presumption is that in this case it is safe since this
    // method is being played by a single thread on startup.

    // TODO: Regions that have a null location and are not in regionsInTransitions
    // need to be handled.

    // Scan META to build list of existing regions, servers, and assignment
    // Returns servers who have not checked in (assumed dead) and their regions
    Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers = rebuildUserRegions();

    processDeadServersAndRegionsInTransition(deadServers);

    // Recover the tables that were not fully moved to DISABLED state.
    // These tables are in DISABLING state when the master restarted/switched.
    boolean isWatcherCreated = recoverTableInDisablingState(this.disablingTables);
    recoverTableInEnablingState(this.enablingTables.keySet(), isWatcherCreated);
    this.enablingTables.clear();
    this.disablingTables.clear();
  }

  /**
   * Process all regions that are in transition up in zookeeper.  Used by
   * master joining an already running cluster.
   * @throws KeeperException
   * @throws IOException
   * @throws InterruptedException
   */
  void processDeadServersAndRegionsInTransition()
  throws KeeperException, IOException, InterruptedException {
    // Pass null to signify no dead servers in this context.
    processDeadServersAndRegionsInTransition(null);
  }

  /**
   * Process all regions that are in transition in zookeeper and also
   * processes the list of dead servers by scanning the META.
   * Used by master joining an cluster.
   * @param deadServers
   *          Map of dead servers and their regions. Can be null.
   * @throws KeeperException
   * @throws IOException
   * @throws InterruptedException
   */
  void processDeadServersAndRegionsInTransition(
      final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers)
  throws KeeperException, IOException, InterruptedException {
    List<String> nodes = ZKUtil.listChildrenAndWatchForNewChildren(watcher,
      watcher.assignmentZNode);
   
    if (nodes == null) {
      String errorMessage = "Failed to get the children from ZK";
      master.abort(errorMessage, new IOException(errorMessage));
      return;
    }
    // Run through all regions.  If they are not assigned and not in RIT, then
    // its a clean cluster startup, else its a failover.
    synchronized (this.regions) {
      for (Map.Entry<HRegionInfo, ServerName> e : this.regions.entrySet()) {
        if (!e.getKey().isMetaTable() && e.getValue() != null) {
          LOG.debug("Found " + e + " out on cluster");
          this.failover = true;
          break;
        }
        if (nodes.contains(e.getKey().getEncodedName())) {
          LOG.debug("Found " + e.getKey().getRegionNameAsString() + " in RITs");
          // Could be a meta region.
          this.failover = true;
          break;
        }
      }
    }

    // Remove regions in RIT, they are possibly being processed by
    // ServerShutdownHandler.
    synchronized (regionsInTransition) {
      nodes.removeAll(regionsInTransition.keySet());
    }

    // If some dead servers are processed by ServerShutdownHandler, we shouldn't
    // assign all user regions( some would be assigned by
    // ServerShutdownHandler), consider it as a failover
    if (!this.serverManager.getDeadServers().isEmpty()) {
      this.failover = true;
    }

    // If we found user regions out on cluster, its a failover.
    if (this.failover) {
      LOG.info("Found regions out on cluster or in RIT; failover");
      // Process list of dead servers and regions in RIT.
      // See HBASE-4580 for more information.
      processDeadServersAndRecoverLostRegions(deadServers, nodes);
      this.failover = false;
      failoverProcessedRegions.clear();
    } else {
      // Fresh cluster startup.
      LOG.info("Clean cluster startup. Assigning userregions");
      cleanoutUnassigned();
      assignAllUserRegions();
    }
  }

  /**
   * If region is up in zk in transition, then do fixup and block and wait until
   * the region is assigned and out of transition.  Used on startup for
   * catalog regions.
   * @param hri Region to look for.
   * @return True if we processed a region in transition else false if region
   * was not up in zk in transition.
   * @throws InterruptedException
   * @throws KeeperException
   * @throws IOException
   */
  boolean processRegionInTransitionAndBlockUntilAssigned(final HRegionInfo hri)
  throws InterruptedException, KeeperException, IOException {
    boolean intransistion =
      processRegionInTransition(hri.getEncodedName(), hri, null);
    if (!intransistion) return intransistion;
    LOG.debug("Waiting on " + HRegionInfo.prettyPrint(hri.getEncodedName()));
    synchronized(this.regionsInTransition) {
      while (!this.master.isStopped() &&
          this.regionsInTransition.containsKey(hri.getEncodedName())) {
        // We expect a notify, but by security we set a timout
        this.regionsInTransition.wait(100);
      }
    }
    return intransistion;
  }

  /**
   * Process failover of new master for region <code>encodedRegionName</code>
   * up in zookeeper.
   * @param encodedRegionName Region to process failover for.
   * @param regionInfo If null we'll go get it from meta table.
   * @param deadServers Can be null
   * @return True if we processed <code>regionInfo</code> as a RIT.
   * @throws KeeperException
   * @throws IOException
   */
  boolean processRegionInTransition(final String encodedRegionName,
      final HRegionInfo regionInfo,
      final Map<ServerName,List<Pair<HRegionInfo,Result>>> deadServers)
  throws KeeperException, IOException {
    Stat stat = new Stat();
    RegionTransitionData data = ZKAssign.getDataAndWatch(watcher,
        encodedRegionName, stat);
    if (data == null) return false;
    HRegionInfo hri = regionInfo;
    if (hri == null) {
      if ((hri = getHRegionInfo(data)) == null) return false;
    }
    processRegionsInTransition(data, hri, deadServers, stat.getVersion());
    return true;
  }

  void processRegionsInTransition(final RegionTransitionData data,
      final HRegionInfo regionInfo,
      final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers,
      int expectedVersion)
  throws KeeperException {
    String encodedRegionName = regionInfo.getEncodedName();
    LOG.info("Processing region " + regionInfo.getRegionNameAsString() +
      " in state " + data.getEventType());
    List<HRegionInfo> hris = this.enablingTables.get(regionInfo.getTableNameAsString());
    if (hris != null && !hris.isEmpty()) {
      hris.remove(regionInfo);
    }
    synchronized (regionsInTransition) {
      RegionState regionState = regionsInTransition.get(encodedRegionName);
      if (regionState != null ||
          failoverProcessedRegions.containsKey(encodedRegionName)) {
        // Just return
        return;
      }
      switch (data.getEventType()) {
      case M_ZK_REGION_CLOSING:
        // If zk node of the region was updated by a live server skip this
        // region and just add it into RIT.
        if (isOnDeadServer(regionInfo, deadServers) &&
            (data.getOrigin() == null || !serverManager.isServerOnline(data.getOrigin()))) {
          // If was on dead server, its closed now. Force to OFFLINE and this
          // will get it reassigned if appropriate
          forceOffline(regionInfo, data);
        } else {
          // Just insert region into RIT.
          // If this never updates the timeout will trigger new assignment
          regionsInTransition.put(encodedRegionName, new RegionState(
            regionInfo, RegionState.State.CLOSING,
            data.getStamp(), data.getOrigin()));
        }
        failoverProcessedRegions.put(encodedRegionName, regionInfo);
        break;

      case RS_ZK_REGION_CLOSED:
      case RS_ZK_REGION_FAILED_OPEN:
        // Region is closed, insert into RIT and handle it
        addToRITandCallClose(regionInfo, RegionState.State.CLOSED, data);
        failoverProcessedRegions.put(encodedRegionName, regionInfo);
        break;

      case M_ZK_REGION_OFFLINE:
        // If zk node of the region was updated by a live server skip this
        // region and just add it into RIT.
        if (isOnDeadServer(regionInfo, deadServers) &&
            (data.getOrigin() == null ||
              !serverManager.isServerOnline(data.getOrigin()))) {
          // Region is offline, insert into RIT and handle it like a closed
          addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
        } else if (data.getOrigin() != null &&
            !serverManager.isServerOnline(data.getOrigin())) {
          // to handle cases where offline node is created but sendRegionOpen
          // RPC is not yet sent
          addToRITandCallClose(regionInfo, RegionState.State.OFFLINE, data);
        } else {
          regionsInTransition.put(encodedRegionName, new RegionState(
              regionInfo, RegionState.State.PENDING_OPEN, data.getStamp(), data
                  .getOrigin()));
        }
        failoverProcessedRegions.put(encodedRegionName, regionInfo);
        break;

      case RS_ZK_REGION_OPENING:
        // TODO: Could check if it was on deadServers.  If it was, then we could
        // do what happens in TimeoutMonitor when it sees this condition.

        // Just insert region into RIT
        // If this never updates the timeout will trigger new assignment
        if (regionInfo.isMetaTable()) {
          regionsInTransition.put(encodedRegionName, new RegionState(
              regionInfo, RegionState.State.OPENING, data.getStamp(), data
                  .getOrigin()));
          // If ROOT or .META. table is waiting for timeout monitor to assign
          // it may take lot of time when the assignment.timeout.period is
          // the default value which may be very long.  We will not be able
          // to serve any request during this time.
          // So we will assign the ROOT and .META. region immediately.
          processOpeningState(regionInfo);
          break;
        }
        regionsInTransition.put(encodedRegionName, new RegionState(regionInfo,
            RegionState.State.OPENING, data.getStamp(), data.getOrigin()));
        failoverProcessedRegions.put(encodedRegionName, regionInfo);
        break;

      case RS_ZK_REGION_OPENED:
        // Region is opened, insert into RIT and handle it
        regionsInTransition.put(encodedRegionName, new RegionState(
            regionInfo, RegionState.State.OPEN,
            data.getStamp(), data.getOrigin()));
        ServerName sn = data.getOrigin() == null? null: data.getOrigin();
        // sn could be null if this server is no longer online.  If
        // that is the case, just let this RIT timeout; it'll be assigned
        // to new server then.
        if (sn == null) {
          LOG.warn("Region in transition " + regionInfo.getEncodedName() +
            " references a null server; letting RIT timeout so will be " +
            "assigned elsewhere");
        } else if (!serverManager.isServerOnline(sn)
            && (isOnDeadServer(regionInfo, deadServers)
                || regionInfo.isMetaRegion() || regionInfo.isRootRegion())) {
          forceOffline(regionInfo, data);
        } else {
          new OpenedRegionHandler(master, this, regionInfo, sn, expectedVersion)
              .process();
        }
        failoverProcessedRegions.put(encodedRegionName, regionInfo);
        break;
      }
    }
  }
 

  /**
   * Put the region <code>hri</code> into an offline state up in zk.
   * @param hri
   * @param oldData
   * @throws KeeperException
   */
  private void forceOffline(final HRegionInfo hri,
      final RegionTransitionData oldData)
  throws KeeperException {
    // If was on dead server, its closed now.  Force to OFFLINE and then
    // handle it like a close; this will get it reassigned if appropriate
    LOG.debug("RIT " + hri.getEncodedName() + " in state=" +
      oldData.getEventType() + " was on deadserver; forcing offline");
    ZKAssign.createOrForceNodeOffline(this.watcher, hri,
      this.master.getServerName());
    addToRITandCallClose(hri, RegionState.State.OFFLINE, oldData);
  }

  /**
   * Add to the in-memory copy of regions in transition and then call close
   * handler on passed region <code>hri</code>
   * @param hri
   * @param state
   * @param oldData
   */
  private void addToRITandCallClose(final HRegionInfo hri,
      final RegionState.State state, final RegionTransitionData oldData) {
    this.regionsInTransition.put(hri.getEncodedName(),
      new RegionState(hri, state, oldData.getStamp(), oldData.getOrigin()));
    new ClosedRegionHandler(this.master, this, hri).process();
  }

  /**
   * When a region is closed, it should be removed from the regionsToReopen
   * @param hri HRegionInfo of the region which was closed
   */
  public void removeClosedRegion(HRegionInfo hri) {
    if (!regionsToReopen.isEmpty()) {
      if (regionsToReopen.remove(hri.getEncodedName()) != null) {
          LOG.debug("Removed region from reopening regions because it was closed");
      }
    }
  }

  /**
   * @param regionInfo
   * @param deadServers Map of deadServers and the regions they were carrying;
   * can be null.
   * @return True if the passed regionInfo in the passed map of deadServers?
   */
  private boolean isOnDeadServer(final HRegionInfo regionInfo,
      final Map<ServerName, List<Pair<HRegionInfo, Result>>> deadServers) {
    if (deadServers == null) return false;
    for (Map.Entry<ServerName, List<Pair<HRegionInfo, Result>>> deadServer:
        deadServers.entrySet()) {
      for (Pair<HRegionInfo, Result> e: deadServer.getValue()) {
        if (e.getFirst().equals(regionInfo)) return true;
      }
    }
    return false;
  }

  /**
   * Handles various states an unassigned node can be in.
   * <p>
   * Method is called when a state change is suspected for an unassigned node.
   * <p>
   * This deals with skipped transitions (we got a CLOSED but didn't see CLOSING
   * yet).
   * @param data
   * @param expectedVersion
   */
  private void handleRegion(final RegionTransitionData data, int expectedVersion) {
    synchronized(regionsInTransition) {
      HRegionInfo hri = null;
      if (data == null || data.getOrigin() == null) {
        LOG.warn("Unexpected NULL input " + data);
        return;
      }
      ServerName sn = data.getOrigin();
      // Check if this is a special HBCK transition
      if (sn.equals(HConstants.HBCK_CODE_SERVERNAME)) {
        handleHBCK(data);
        return;
      }
      String encodedName = HRegionInfo.encodeRegionName(data.getRegionName());
      String prettyPrintedRegionName = HRegionInfo.prettyPrint(encodedName);
      // Verify this is a known server
      if (!serverManager.isServerOnline(sn) &&
          !this.master.getServerName().equals(sn)
          && !ignoreStatesRSOffline.contains(data.getEventType())) {
        LOG.warn("Attempted to handle region transition for server but " +
          "server is not online: " + prettyPrintedRegionName);
        return;
      }
      // Printing if the event was created a long time ago helps debugging
      boolean lateEvent = data.getStamp() <
          (System.currentTimeMillis() - 15000);
      LOG.debug("Handling transition=" + data.getEventType() +
        ", server=" + data.getOrigin() + ", region=" +
          (prettyPrintedRegionName == null? "null": prettyPrintedRegionName+
          (lateEvent? ", which is more than 15 seconds late" : ""));
      RegionState regionState = regionsInTransition.get(encodedName);
      switch (data.getEventType()) {
        case M_ZK_REGION_OFFLINE:
          // Nothing to do.
          break;

        case RS_ZK_REGION_SPLITTING:
          if (!isInStateForSplitting(regionState)) break;
          addSplittingToRIT(sn, encodedName);
          break;

        case RS_ZK_REGION_SPLIT:
          // RegionState must be null, or SPLITTING or PENDING_CLOSE.
          if (!isInStateForSplitting(regionState)) break;
          // If null, add SPLITTING state before going to SPLIT
          if (regionState == null) {
            regionState = addSplittingToRIT(sn, encodedName);
            String message = "Received SPLIT for region " + prettyPrintedRegionName +
              " from server " + sn;
            // If still null, it means we cannot find it and it was already processed
            if (regionState == null) {
              LOG.warn(message + " but it doesn't exist anymore," +
                  " probably already processed its split");
              break;
            }
            LOG.info(message +
                " but region was not first in SPLITTING state; continuing");
          }
          // Check it has daughters.
          byte [] payload = data.getPayload();
          List<HRegionInfo> daughters = null;
          try {
            daughters = Writables.getHRegionInfos(payload, 0, payload.length);
          } catch (IOException e) {
            LOG.error("Dropped split! Failed reading split payload for " +
              prettyPrintedRegionName);
            break;
          }
          assert daughters.size() == 2;
          // Assert that we can get a serverinfo for this server.
          if (!this.serverManager.isServerOnline(sn)) {
            LOG.error("Dropped split! ServerName=" + sn + " unknown.");
            break;
          }
          // Run handler to do the rest of the SPLIT handling.
          this.executorService.submit(new SplitRegionHandler(master, this,
            regionState.getRegion(), sn, daughters));
          break;

        case M_ZK_REGION_CLOSING:
          hri = checkIfInFailover(regionState, encodedName, data);
          if (hri != null) {
            regionState = new RegionState(hri, RegionState.State.CLOSING, data
               .getStamp(), data.getOrigin());
            regionsInTransition.put(encodedName, regionState);
            failoverProcessedRegions.put(encodedName, hri);
            break;
          }
          // Should see CLOSING after we have asked it to CLOSE or additional
          // times after already being in state of CLOSING
          if (regionState == null ||
              (!regionState.isPendingClose() && !regionState.isClosing())) {
            LOG.warn("Received CLOSING for region " + prettyPrintedRegionName +
              " from server " + data.getOrigin() + " but region was in " +
              " the state " + regionState + " and not " +
              "in expected PENDING_CLOSE or CLOSING states");
            return;
          }
          // Transition to CLOSING (or update stamp if already CLOSING)
          regionState.update(RegionState.State.CLOSING,
              data.getStamp(), data.getOrigin());
          break;

        case RS_ZK_REGION_CLOSED:
          hri = checkIfInFailover(regionState, encodedName, data);
          if (hri != null) {
            regionState = new RegionState(hri, RegionState.State.CLOSED, data
                .getStamp(), data.getOrigin());
            regionsInTransition.put(encodedName, regionState);
            remo
TOP

Related Classes of org.apache.hadoop.hbase.master.AssignmentManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.