Package org.apache.hadoop.hbase.stargate.util

Source Code of org.apache.hadoop.hbase.stargate.util.HTableTokenBucket

/*
* Copyright 2010 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.hbase.stargate.util;

import java.io.IOException;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.RowLock;
import org.apache.hadoop.hbase.stargate.Constants;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.StringUtils;

/**
* A HTable-backed token bucket.
* <p>
* Can be configured with <t>rate</t>, the number of tokens to add to the
* bucket each second, and <t>size</t>, the maximum number of tokens allowed
* to burst. Configuration is stored in the HTable adjacent to the token
* count and is periodically refreshed.
* <p>
* Expected columns:
* <p>
* <ul>
*   <li>user:
*   <ul>
*     <li>user:tokens</li>
*     <li>user:tokens.rate</li>
*     <li>user:tokens.size</li>
*   </ul></li>
* </ul>
*/
public class HTableTokenBucket implements Constants {

  static final Log LOG = LogFactory.getLog(HTableTokenBucket.class);

  static final byte[] USER = Bytes.toBytes("user");
  static final byte[] TOKENS = Bytes.toBytes("tokens");
  static final byte[] TOKENS_RATE = Bytes.toBytes("tokens.rate");
  static final byte[] TOKENS_SIZE = Bytes.toBytes("tokens.size");

  HBaseConfiguration conf;
  String tableName;
  HTable table;
  byte[] row;
  int tokens;
  double rate = 20.0; // default, 20 ops added per second
  int size = 100;     // burst
  long lastUpdated = System.currentTimeMillis();
  long configUpdateInterval;
  long lastConfigUpdated = System.currentTimeMillis();

  void updateConfig() throws IOException {
    Get get = new Get(row);
    get.addColumn(USER, TOKENS_RATE);
    get.addColumn(USER, TOKENS_SIZE);
    Result result = table.get(get);
    byte[] value = result.getValue(USER, TOKENS_RATE);
    if (value != null) {
      this.rate = (int)Bytes.toDouble(value);
    }
    value = result.getValue(USER, TOKENS_SIZE);
    if (value != null) {
      this.size = (int)Bytes.toLong(value);
    }
  }

  /**
   * Constructor
   * @param conf configuration
   * @param row row key for user
   * @throws IOException
   */
  public HTableTokenBucket(HBaseConfiguration conf, byte[] row)
      throws IOException {
    this(conf, conf.get("stargate.tb.htable.name", USERS_TABLE), row);
  }

  /**
   * Constructor
   * @param conf configuration
   * @param tableName the table to use
   * @param row row key for user
   * @throws IOException
   */
  public HTableTokenBucket(HBaseConfiguration conf, String tableName,
      byte[] row) throws IOException {
    this.conf = conf;
    this.tableName = tableName;
    this.row = row;
    this.table = new HTable(conf, tableName);
    this.configUpdateInterval =
      conf.getLong("stargate.tb.update.interval", 1000 * 60);
    updateConfig();
  }

  /**
   * @return the number of remaining tokens in the bucket (roughly)
   * @throws IOException
   */
  public int available() throws IOException {
    long now = System.currentTimeMillis();
    if (now - lastConfigUpdated > configUpdateInterval) {
      try {
        updateConfig();
      } catch (IOException e) {
        LOG.warn(StringUtils.stringifyException(e));
      }
      lastConfigUpdated = now;
    }

    // We can't simply use incrementColumnValue here because the timestamp of
    // the keyvalue will not be changed as long as it remains in memstore, so
    // there will be some unavoidable contention on the row if multiple
    // Stargate instances are concurrently serving the same user, and three
    // more round trips than otherwise.
    RowLock rl = table.lockRow(row);
    try {
      Get get = new Get(row, rl);
      get.addColumn(USER, TOKENS);
      List<KeyValue> kvs = table.get(get).list();
      if (kvs != null && !kvs.isEmpty()) {
        KeyValue kv = kvs.get(0);
        tokens = (int)Bytes.toLong(kv.getValue());
        lastUpdated = kv.getTimestamp();
      } else {
        tokens = (int)rate;
      }
      long elapsed = now - lastUpdated;
      int i = (int)((elapsed / 1000) * rate); // convert sec <-> ms
      if (tokens + i > size) {
        i = size - tokens;
      }
      if (i > 0) {
        tokens += i;
        Put put = new Put(row, rl);
        put.add(USER, TOKENS, Bytes.toBytes((long)tokens));
        put.setWriteToWAL(false);
        table.put(put);
        table.flushCommits();
      }
    } finally {
      table.unlockRow(rl);
    }
    return tokens;
  }

  /**
   * @param t the number of tokens to consume from the bucket
   * @throws IOException
   */
  public void remove(int t) throws IOException {
    // Here we don't care about timestamp changes; actually it's advantageous
    // if they are not updated, otherwise available() and remove() must be
    // used as near to each other in time as possible.
    table.incrementColumnValue(row, USER, TOKENS, (long) -t, false);
  }

  public double getRate() {
    return rate;
  }

  public int getSize() {
    return size;
  }

}
TOP

Related Classes of org.apache.hadoop.hbase.stargate.util.HTableTokenBucket

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.