Package org.apache.accumulo.server.master.balancer

Source Code of org.apache.accumulo.server.master.balancer.DefaultLoadBalancer

* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.accumulo.server.master.balancer;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.Map.Entry;

import org.apache.accumulo.core.master.thrift.TableInfo;
import org.apache.accumulo.core.master.thrift.TabletServerStatus;
import org.apache.accumulo.core.tabletserver.thrift.TabletStats;
import org.apache.accumulo.server.master.state.TServerInstance;
import org.apache.accumulo.server.master.state.TabletMigration;
import org.apache.log4j.Logger;

public class DefaultLoadBalancer extends TabletBalancer {
  private static final Logger log = Logger.getLogger(DefaultLoadBalancer.class);
  Iterator<TServerInstance> assignments;
  // if tableToBalance is set, then only balance the given table
  String tableToBalance = null;
  public DefaultLoadBalancer() {
  public DefaultLoadBalancer(String table) {
    tableToBalance = table;
  public TServerInstance getAssignment(SortedMap<TServerInstance,TabletServerStatus> locations, KeyExtent extent, TServerInstance last) {
    if (locations.size() == 0)
      return null;
    if (last != null) {
      // Maintain locality
      TServerInstance simple = new TServerInstance(last.getLocation(), "");
      Iterator<TServerInstance> find = locations.tailMap(simple).keySet().iterator();
      if (find.hasNext()) {
        TServerInstance current =;
        if (
          return current;
    // The strategy here is to walk through the locations and hand them back, one at a time
    // Grab an iterator off of the set of options; use a new iterator if it hands back something not in the current list.
    if (assignments == null || !assignments.hasNext())
      assignments = locations.keySet().iterator();
    TServerInstance result =;
    if (!locations.containsKey(result)) {
      assignments = null;
      return locations.keySet().iterator().next();
    return result;
  static class ServerCounts implements Comparable<ServerCounts> {
    public final TServerInstance server;
    public final int count;
    public final TabletServerStatus status;
    ServerCounts(int count, TServerInstance server, TabletServerStatus status) {
      this.count = count;
      this.server = server;
      this.status = status;
    public int compareTo(ServerCounts obj) {
      int result = count - obj.count;
      if (result == 0)
        return server.compareTo(obj.server);
      return result;
  public boolean getMigrations(Map<TServerInstance,TabletServerStatus> current, List<TabletMigration> result) {
    boolean moreBalancingNeeded = false;
    try {
      // no moves possible
      if (current.size() < 2) {
        return false;
      // Sort by total number of online tablets, per server
      int total = 0;
      ArrayList<ServerCounts> totals = new ArrayList<ServerCounts>();
      for (Entry<TServerInstance,TabletServerStatus> entry : current.entrySet()) {
        int serverTotal = 0;
        if (entry.getValue() != null && entry.getValue().tableMap != null) {
          for (Entry<String,TableInfo> e : entry.getValue().tableMap.entrySet()) {
             * The check below was on entry.getKey(), but that resolves to a tabletserver not a tablename. Believe it should be e.getKey() which is a tablename
            if (tableToBalance == null || tableToBalance.equals(e.getKey()))
              serverTotal += e.getValue().onlineTablets;
        totals.add(new ServerCounts(serverTotal, entry.getKey(), entry.getValue()));
        total += serverTotal;
      // order from low to high
      int even = total / totals.size();
      int numServersOverEven = total % totals.size();
      // Move tablets from the servers with too many to the servers with
      // the fewest but only nominate tablets to move once. This allows us
      // to fill new servers with tablets from a mostly balanced server
      // very quickly. However, it may take several balancing passes to move
      // tablets from one hugely overloaded server to many slightly
      // under-loaded servers.
      int end = totals.size() - 1;
      int movedAlready = 0;
      for (int tooManyIndex = 0; tooManyIndex < totals.size(); tooManyIndex++) {
        ServerCounts tooMany = totals.get(tooManyIndex);
        int goal = even;
        if (tooManyIndex < numServersOverEven) {
        int needToUnload = tooMany.count - goal;
        ServerCounts tooLittle = totals.get(end);
        int needToLoad = goal - tooLittle.count - movedAlready;
        if (needToUnload < 1 && needToLoad < 1) {
        if (needToUnload >= needToLoad) {
          result.addAll(move(tooMany, tooLittle, needToLoad));
          movedAlready = 0;
        } else {
          result.addAll(move(tooMany, tooLittle, needToUnload));
          movedAlready += needToUnload;
        if (needToUnload > needToLoad)
          moreBalancingNeeded = true;
    } finally {
      log.debug("balance ended with " + result.size() + " migrations");
    return moreBalancingNeeded;
  static class TableDiff {
    int diff;
    String table;
    public TableDiff(int diff, String table) {
      this.diff = diff;
      this.table = table;
   * Select a tablet based on differences between table loads; if the loads are even, use the busiest table
  List<TabletMigration> move(ServerCounts tooMuch, ServerCounts tooLittle, int count) {
    List<TabletMigration> result = new ArrayList<TabletMigration>();
    if (count == 0)
      return result;
    Map<String,Map<KeyExtent,TabletStats>> onlineTablets = new HashMap<String,Map<KeyExtent,TabletStats>>();
    // Copy counts so we can update them as we propose migrations
    Map<String,Integer> tooMuchMap = tabletCountsPerTable(tooMuch.status);
    Map<String,Integer> tooLittleMap = tabletCountsPerTable(tooLittle.status);
    for (int i = 0; i < count; i++) {
      String table;
      Integer tooLittleCount;
      if (tableToBalance == null) {
        // find a table to migrate
        // look for an uneven table count
        int biggestDifference = 0;
        String biggestDifferenceTable = null;
        for (Entry<String,Integer> tableEntry : tooMuchMap.entrySet()) {
          String tableID = tableEntry.getKey();
          if (tooLittleMap.get(tableID) == null)
            tooLittleMap.put(tableID, 0);
          int diff = tableEntry.getValue() - tooLittleMap.get(tableID);
          if (diff > biggestDifference) {
            biggestDifference = diff;
            biggestDifferenceTable = tableID;
        if (biggestDifference < 2) {
          table = busiest(tooMuch.status.tableMap);
        } else {
          table = biggestDifferenceTable;
      } else {
        // just balance the given table
        table = tableToBalance;
      Map<KeyExtent,TabletStats> onlineTabletsForTable = onlineTablets.get(table);
      try {
        if (onlineTabletsForTable == null) {
          onlineTabletsForTable = new HashMap<KeyExtent,TabletStats>();
          for (TabletStats stat : getOnlineTabletsForTable(tooMuch.server, table))
            onlineTabletsForTable.put(new KeyExtent(stat.extent), stat);
          onlineTablets.put(table, onlineTabletsForTable);
      } catch (Exception ex) {
        log.error("Unable to select a tablet to move", ex);
        return result;
      KeyExtent extent = selectTablet(tooMuch.server, onlineTabletsForTable);
      if (extent == null)
        return result;
      tooMuchMap.put(table, tooMuchMap.get(table) - 1);
       * If a table grows from 1 tablet then tooLittleMap.get(table) can return a null, since there is only one tabletserver that holds all of the tablets. Here
       * we check to see if in fact that is the case and if so set the value to 0.
      tooLittleCount = tooLittleMap.get(table);
      if (tooLittleCount == null) {
        tooLittleCount = 0;
      tooLittleMap.put(table, tooLittleCount + 1);
      result.add(new TabletMigration(extent, tooMuch.server, tooLittle.server));
    return result;
  static Map<String,Integer> tabletCountsPerTable(TabletServerStatus status) {
    Map<String,Integer> result = new HashMap<String,Integer>();
    if (status != null && status.tableMap != null) {
      Map<String,TableInfo> tableMap = status.tableMap;
      for (Entry<String,TableInfo> entry : tableMap.entrySet()) {
        result.put(entry.getKey(), entry.getValue().onlineTablets);
    return result;
  static KeyExtent selectTablet(TServerInstance tserver, Map<KeyExtent,TabletStats> extents) {
    if (extents.size() == 0)
      return null;
    KeyExtent mostRecentlySplit = null;
    long splitTime = 0;
    for (Entry<KeyExtent,TabletStats> entry : extents.entrySet())
      if (entry.getValue().splitCreationTime >= splitTime) {
        splitTime = entry.getValue().splitCreationTime;
        mostRecentlySplit = entry.getKey();
    return mostRecentlySplit;
  // define what it means for a tablet to be busy
  private static String busiest(Map<String,TableInfo> tables) {
    String result = null;
    double busiest = Double.NEGATIVE_INFINITY;
    for (Entry<String,TableInfo> entry : tables.entrySet()) {
      TableInfo info = entry.getValue();
      double busy = info.ingestRate + info.queryRate;
      if (busy > busiest) {
        busiest = busy;
        result = entry.getKey();
    return result;
  public void getAssignments(SortedMap<TServerInstance,TabletServerStatus> current, Map<KeyExtent,TServerInstance> unassigned,
      Map<KeyExtent,TServerInstance> assignments) {
    for (Entry<KeyExtent,TServerInstance> entry : unassigned.entrySet()) {
      assignments.put(entry.getKey(), getAssignment(current, entry.getKey(), entry.getValue()));
  public long balance(SortedMap<TServerInstance,TabletServerStatus> current, Set<KeyExtent> migrations, List<TabletMigration> migrationsOut) {
    // do we have any servers?
    if (current.size() > 0) {
      // Don't migrate if we have migrations in progress
      if (migrations.size() == 0) {
        if (getMigrations(current, migrationsOut))
          return 1 * 1000;
    return 5 * 1000;

Related Classes of org.apache.accumulo.server.master.balancer.DefaultLoadBalancer

Copyright © 2018 All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact