Source Code of org.apache.hadoop.chukwa.datacollection.writer.hbase.HBaseWriter

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.chukwa.datacollection.writer.hbase;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.datacollection.writer.ChukwaWriter;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineableWriter;
import org.apache.hadoop.chukwa.datacollection.writer.WriterException;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessor;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.MapProcessorFactory;
import org.apache.hadoop.chukwa.extraction.demux.processor.mapper.UnknownRecordTypeException;
import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.util.ClassUtils;
import org.apache.hadoop.chukwa.util.DaemonWatcher;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.client.HTablePool;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Table;
import org.apache.hadoop.chukwa.datacollection.writer.hbase.Annotation.Tables;
import org.apache.log4j.Logger;

public class HBaseWriter extends PipelineableWriter {
  static Logger log = Logger.getLogger(HBaseWriter.class);
  boolean reportStats;
  volatile long dataSize = 0;
  final Timer statTimer;
  private OutputCollector output;
  private Reporter reporter;
  private ChukwaConfiguration conf;
  String defaultProcessor;
  private HTablePool pool;
  private Configuration hconf;
 
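  /**
   * Periodic task that logs the writer's throughput. The rate is the
   * number of chunk-data bytes accepted since the previous run, scaled
   * to bytes per second over the elapsed interval.
   */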
  private class StatReportingTask extends TimerTask {
    private long lastTs = System.currentTimeMillis();
    private long lastDataSize = 0;

    public void run() {
      long time = System.currentTimeMillis();
      long interval = time - lastTs;
      lastTs = time;

      long ds = dataSize;
      // dataSize counts only the chunk data field, excluding HTTP and Chukwa headers
      long dataRate = 1000 * (ds - lastDataSize) / interval; // bytes/sec
      lastDataSize = ds;

      log.info("stat=HBaseWriter|dataRate=" + dataRate);
    }
  }

  public HBaseWriter() {
    this(true);
  }

  public HBaseWriter(boolean reportStats) {
    /* HBase Version >= 0.89.x */
    this(reportStats, new ChukwaConfiguration(), HBaseConfiguration.create());
  }

  public HBaseWriter(ChukwaConfiguration conf, Configuration hconf) {
    this(true, conf, hconf);
  }

  private HBaseWriter(boolean reportStats, ChukwaConfiguration conf, Configuration hconf) {
    this.reportStats = reportStats;
    this.conf = conf;
    this.hconf = hconf;
    this.statTimer = new Timer();
    this.defaultProcessor = conf.get(
      "chukwa.demux.mapper.default.processor",
      "org.apache.hadoop.chukwa.extraction.demux.processor.mapper.DefaultProcessor");
    Demux.jobConf = conf;
    log.info("hbase.zookeeper.quorum: " + hconf.get("hbase.zookeeper.quorum"));
  }

  public void close() {
    if (reportStats) {
      statTimer.cancel();
    }
  }

  public void init(Configuration conf) throws WriterException {
    if (reportStats) {
      statTimer.schedule(new StatReportingTask(), 1000, 10 * 1000);
    }
    output = new OutputCollector();
    reporter = new Reporter();
    if(conf.getBoolean("hbase.writer.verify.schema", false)) {
      verifyHbaseSchema();     
    }
    pool = new HTablePool(hconf, 60);
  }

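  /**
   * Returns true when the given table exists in HBase and contains the
   * column family named by the annotation; logs the failure and returns
   * false otherwise.
   */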
  private boolean verifyHbaseTable(HBaseAdmin admin, Table table) {
    boolean status = false;
    try {
      if(admin.tableExists(table.name())) {
        HTableDescriptor descriptor = admin.getTableDescriptor(table.name().getBytes());
        HColumnDescriptor[] columnDescriptors = descriptor.getColumnFamilies();
        for(HColumnDescriptor cd : columnDescriptors) {
          if(cd.getNameAsString().equals(table.columnFamily())) {
            log.info("Verified schema - table: "+table.name()+" column family: "+table.columnFamily());
            status = true;
          }
        }
      } else {
        throw new Exception("HBase table: "+table.name()+ " does not exist.");
      }
    } catch(Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      status = false;
    }
    return status;   
  }
 
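  /**
   * Scans the configured demux parser package (hbase.demux.package) for
   * classes carrying Table/Tables annotations and checks each declared
   * table and column family against the live HBase schema. On mismatch,
   * optionally halts the process (hbase.writer.halt.on.schema.mismatch).
   */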
  private void verifyHbaseSchema() {
    log.debug("Verify Demux parser with HBase schema");
    boolean schemaVerified = true;
    try {
      HBaseAdmin admin = new HBaseAdmin(hconf);
      List<Class> demuxParsers = ClassUtils.getClassesForPackage(conf.get("hbase.demux.package"));
      for(Class<?> x : demuxParsers) {
        if(x.isAnnotationPresent(Tables.class)) {
          Tables list = x.getAnnotation(Tables.class);
          for(Table table : list.annotations()) {
            if(!verifyHbaseTable(admin, table)) {
              schemaVerified = false;
              log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");             
            }
          }
        } else if(x.isAnnotationPresent(Table.class)) {
          Table table = x.getAnnotation(Table.class);
          if(!verifyHbaseTable(admin, table)) {
            schemaVerified = false;
            log.warn("Validation failed - table: "+table.name()+" column family: "+table.columnFamily()+" does not exist.");
          }
        }
      }
    } catch (Exception e) {
      schemaVerified = false;
      log.error(ExceptionUtil.getStackTrace(e));
    }
    if(!schemaVerified) {
      log.error("Hbase schema mismatch with demux parser.");
      if(conf.getBoolean("hbase.writer.halt.on.schema.mismatch", true)) {
        log.error("Exiting...");
        DaemonWatcher.bailout(-1);
      }
    }
  }

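  /**
   * For each chunk: resolve the demux processor for the chunk's data type,
   * run it to produce HBase puts, and write them to the annotated table.
   * Failures for an individual chunk are logged and skipped. Chunks then
   * flow to the next writer in the pipeline, if any.
   */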
  @Override
  public CommitStatus add(List<Chunk> chunks) throws WriterException {
    CommitStatus rv = ChukwaWriter.COMMIT_OK;
    try {
      for(Chunk chunk : chunks) {
        synchronized (this) {
          try {
            Table table = findHBaseTable(chunk.getDataType());

            if(table!=null) {
              HTableInterface hbase = pool.getTable(table.name().getBytes());
              MapProcessor processor = getProcessor(chunk.getDataType());
              processor.process(new ChukwaArchiveKey(), chunk, output, reporter);

              hbase.put(output.getKeyValues());
              pool.putTable(hbase);
            }
          } catch (Exception e) {
            log.warn(output.getKeyValues());
            log.warn(ExceptionUtil.getStackTrace(e));
          }
          dataSize += chunk.getData().length;
          output.clear();
          reporter.clear();
        }
      }
    } catch (Exception e) {
      log.error(ExceptionUtil.getStackTrace(e));
      throw new WriterException("Failed to store data to HBase.");
    }   
    if (next != null) {
      rv = next.add(chunks); //pass data through
    }
    return rv;
  }

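  /**
   * Resolves the target table from the processor's Table/Tables annotation.
   * Note that with a Tables annotation the loop keeps overwriting its local
   * variable, so only the last declared table is returned. May return null
   * when the processor carries neither annotation.
   */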
  public Table findHBaseTable(String dataType) throws UnknownRecordTypeException {
    MapProcessor processor = getProcessor(dataType);

    Table table = null;
    if(processor.getClass().isAnnotationPresent(Table.class)) {
      return processor.getClass().getAnnotation(Table.class);
    } else if(processor.getClass().isAnnotationPresent(Tables.class)) {
      Tables tables = processor.getClass().getAnnotation(Tables.class);
      for(Table t : tables.annotations()) {
        table = t;
      }
    }

    return table;
  }

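  /**
   * Convenience lookup of the column family for a data type. Assumes the
   * resolved processor carries a table annotation; a processor without one
   * would surface here as a NullPointerException.
   */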
  public String findHBaseColumnFamilyName(String dataType)
          throws UnknownRecordTypeException {
    Table table = findHBaseTable(dataType);
    return table.columnFamily();
  }

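  /**
   * Looks up the processor class mapped to the data type in the Chukwa
   * configuration, falling back to the configured default processor, and
   * instantiates it through MapProcessorFactory.
   */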
  private MapProcessor getProcessor(String dataType) throws UnknownRecordTypeException {
    String processorClass = findProcessor(conf.get(dataType, defaultProcessor), defaultProcessor);
    return MapProcessorFactory.getProcessor(processorClass);
  }

  /**
   * Looks up the mapper parser class in the demux configuration.
   * Since CHUKWA-581 the configuration value may name both a mapper
   * and a reducer as a comma-separated pair; this helper extracts and
   * returns only the mapper class. For example, "MyMapper,MyReducer"
   * yields "MyMapper", ",MyReducer" yields the default processor, and
   * a bare "MyMapper" is returned unchanged.
   */
  private String findProcessor(String processors, String defaultProcessor) {
    if(processors.startsWith(",")) {
      // No mapper class defined.
      return defaultProcessor;
    } else if(processors.contains(",")) {
      // Both mapper and reducer defined.
      String[] parsers = processors.split(",");
      return parsers[0];
    }
    // No reducer defined.
    return processors;
  }
}
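
A minimal lifecycle sketch of the writer used as a standalone pipeline stage. This is illustrative only, not part of the original source: it assumes an HBase quorum reachable through HBaseConfiguration.create() and the Chukwa configuration files on the classpath, and it leaves the chunk list empty instead of wiring a real agent queue. The class name HBaseWriterDemo is hypothetical.

import java.util.Collections;
import java.util.List;

import org.apache.hadoop.chukwa.Chunk;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class HBaseWriterDemo {
  public static void main(String[] args) throws Exception {
    ChukwaConfiguration conf = new ChukwaConfiguration();   // loads chukwa-*.xml
    Configuration hconf = HBaseConfiguration.create();      // loads hbase-site.xml

    // Constructs with stats reporting on; schema verification is gated by
    // the hbase.writer.verify.schema property inside init().
    HBaseWriter writer = new HBaseWriter(conf, hconf);
    writer.init(conf);

    // A real collector feeds chunks from the agent queue; an empty list
    // keeps this sketch self-contained.
    List<Chunk> chunks = Collections.emptyList();
    writer.add(chunks);

    writer.close();
  }
}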