Source Code of org.apache.hadoop.contrib.failmon.LocalStore

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.contrib.failmon;

import java.io.BufferedOutputStream;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.zip.CRC32;
import java.util.zip.CheckedOutputStream;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**********************************************************
* This class takes care of the temporary local storage of
* gathered metrics before they get uploaded into HDFS. It writes
* SerializedRecords as lines in a temporary file, then
* compresses the file and uploads it into HDFS.
*
**********************************************************/

public class LocalStore {

  public final static char FIELD_SEPARATOR = '|';

  public final static char RECORD_SEPARATOR = '\n';

  public final static String COMPRESSION_SUFFIX = ".zip";

  public final static int UPLOAD_INTERVAL = 600;

  String filename;        // path of the temporary local record file
  String hdfsDir;         // HDFS directory to upload into

  boolean compress;       // whether to zip the file before uploading

  FileWriter fw;

  BufferedWriter writer;  // buffered so that small appends stay cheap

  /**
   * Create an instance of the class and read the configuration
   * file to determine some output parameters. Then, initialize the
   * structures needed for the buffered I/O (so that small appends
   * can be handled efficiently).
   *
   */

  public LocalStore() {
    // determine the local output file name
    if (Environment.getProperty("local.tmp.filename") == null)
      Environment.setProperty("local.tmp.filename", "failmon.dat");
   
    // local.tmp.dir has been set by the Executor
    if (Environment.getProperty("local.tmp.dir") == null)
      Environment.setProperty("local.tmp.dir", System.getProperty("java.io.tmpdir"));
   
    filename = Environment.getProperty("local.tmp.dir") + "/" +
      Environment.getProperty("local.tmp.filename");

    // determine the upload location
    hdfsDir = Environment.getProperty("hdfs.upload.dir");
    if (hdfsDir == null)
      hdfsDir = "/failmon";

    // determine if compression is enabled
    compress = true;
    if ("false".equalsIgnoreCase(Environment
        .getProperty("local.tmp.compression")))
      compress = false;

    try {
      fw = new FileWriter(filename, true);
      writer = new BufferedWriter(fw);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
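
  // A configuration sketch (the property names are taken from the
  // constructor above; the values are hypothetical): callers may
  // override the defaults before construction, e.g.
  //
  //   Environment.setProperty("local.tmp.dir", "/var/tmp");
  //   Environment.setProperty("hdfs.upload.dir", "/failmon");
  //   Environment.setProperty("local.tmp.compression", "false");
  //   LocalStore store = new LocalStore();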

  /**
   * Insert an EventRecord into the local storage, after it
   * gets serialized and anonymized.
   *
   * @param er the EventRecord to be inserted
   */
 
  public void insert(EventRecord er) {
    SerializedRecord sr = new SerializedRecord(er);
    try {
      Anonymizer.anonymize(sr);
    } catch (Exception e) {
      e.printStackTrace();
    }
    append(sr);
  }

  /**
   * Insert an array of EventRecords into the local storage, after they
   * get serialized and anonymized.
   *
   * @param ers the array of EventRecords to be inserted
   */
  public void insert(EventRecord[] ers) {
    for (EventRecord er : ers)
      insert(er);
  }

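  /**
   * Append a serialized record as one line of the temporary local file.
   * The writer is flushed only on upload() or close(), so small appends
   * remain cheap.
   */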
  private void append(SerializedRecord sr) {
    try {
      writer.write(pack(sr).toString());
      writer.write(RECORD_SEPARATOR);
      // writer.flush();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }

  /**
   * Pack a SerializedRecord into a StringBuffer
   * <p>
   * This method is deprecated.
   * @param sr the SerializedRecord to be packed
   * @return packed representation of the SerializedRecord
   * @see #packConcurrent(SerializedRecord)
   * @deprecated
   */
  public static StringBuffer pack(SerializedRecord sr) {
    return new StringBuffer(packConcurrent(sr));
  }

  /**
   * Pack a SerializedRecord into a CharSequence
   *
   * @param sr the SerializedRecord to be packed
   * @return packed representation of the SerializedRecord
   */
  public static CharSequence packConcurrent(SerializedRecord sr) {
    StringBuilder sb = new StringBuilder();

    ArrayList<String> keys = new ArrayList<String>(sr.fields.keySet());

    if (sr.isValid())
      SerializedRecord.arrangeKeys(keys);

    for (int i = 0; i < keys.size(); i++) {
      String value = sr.fields.get(keys.get(i));
      sb.append(keys.get(i)).append(':').append(value);
      sb.append(FIELD_SEPARATOR);
    }
    return sb;
  }
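
  // For illustration (the field names here are assumed, not from the
  // original source): a record with fields {hostname -> node1, type -> cpu}
  // packs to the line
  //
  //   hostname:node1|type:cpu|
  //
  // i.e. key:value pairs, each terminated by FIELD_SEPARATOR.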
 
  /**
   * Upload the local file store into HDFS, after
   * compressing it. Then a new local file is created
   * as a temporary record store.
   *
   */
  public void upload() {
    try {
      writer.flush();
      if (compress)
        zipCompress(filename);
      String remoteName = "failmon-";
      if ("true".equalsIgnoreCase(Environment.getProperty("anonymizer.hash.hostnames")))
        remoteName += Anonymizer.getMD5Hash(InetAddress.getLocalHost().getCanonicalHostName()) + "-";
      else
        remoteName += InetAddress.getLocalHost().getCanonicalHostName() + "-";
      remoteName += Calendar.getInstance().getTimeInMillis();
      if (compress)
        copyToHDFS(filename + COMPRESSION_SUFFIX, hdfsDir + "/" + remoteName + COMPRESSION_SUFFIX);
      else
        copyToHDFS(filename, hdfsDir + "/" + remoteName);
    } catch (IOException e) {
      e.printStackTrace();
    }

    // truncate the local file and re-open it for the next batch
    try {
      fw.close();
      fw = new FileWriter(filename);
      writer = new BufferedWriter(fw);
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
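
  // Note, derived from upload() above: the remote file lands in
  // hdfs.upload.dir under a name of the form
  //
  //   failmon-<hostname or MD5(hostname)>-<millis>[.zip]
  //
  // depending on the anonymizer.hash.hostnames and compression settings.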
 
  /**
   * Compress a text file using the ZIP compression algorithm.
   *
   * @param filename the path to the file to be compressed
   * @throws IOException if reading the file or writing the archive fails
   */
  public static void zipCompress(String filename) throws IOException {
    FileOutputStream fos = new FileOutputStream(filename + COMPRESSION_SUFFIX);
    CheckedOutputStream csum = new CheckedOutputStream(fos, new CRC32());
    ZipOutputStream out = new ZipOutputStream(new BufferedOutputStream(csum));
    out.setComment("Failmon records.");

    BufferedReader in = new BufferedReader(new FileReader(filename));
    out.putNextEntry(new ZipEntry(new File(filename).getName()));
    int c;
    while ((c = in.read()) != -1)
      out.write(c);
    in.close();

    out.finish();
    out.close();
  }
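
  // A minimal usage sketch (the path below is hypothetical):
  //
  //   LocalStore.zipCompress("/tmp/failmon.dat");
  //
  // would write "/tmp/failmon.dat.zip" next to the original file, which
  // is left in place.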

  /**
   * Copy a local file to HDFS
   *
   * @param localFile the filename of the local file
   * @param hdfsFile the HDFS filename to copy to
   * @throws IOException if reading the configuration or copying the file fails
   */
  public static void copyToHDFS(String localFile, String hdfsFile) throws IOException {

    String hadoopConfPath;

    if (Environment.getProperty("hadoop.conf.path") == null)
      hadoopConfPath = "../../../conf";
    else
      hadoopConfPath = Environment.getProperty("hadoop.conf.path");

    // Read the configuration for the Hadoop environment
    Configuration hadoopConf = new Configuration();
    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-default.xml"));
    hadoopConf.addResource(new Path(hadoopConfPath + "/hadoop-site.xml"));

    // System.out.println(hadoopConf.get("hadoop.tmp.dir"));
    // System.out.println(hadoopConf.get("fs.default.name"));
    FileSystem fs = FileSystem.get(hadoopConf);

    // HadoopDFS deals with Path
    Path inFile = new Path("file://" + localFile);
    Path outFile = new Path(hadoopConf.get("fs.default.name") + hdfsFile);

    // Copy from the local file to the new HDFS file
    Environment.logInfo("Uploading to HDFS (file " + outFile + ") ...");
    fs.copyFromLocalFile(false, inFile, outFile);
  }
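
  // A minimal usage sketch (both paths are hypothetical; the value of
  // fs.default.name is prepended to hdfsFile by the method above):
  //
  //   LocalStore.copyToHDFS("/tmp/failmon.dat.zip",
  //                         "/failmon/failmon-node1-1234567890.zip");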

  /**
   * Close the temporary local file
   *
   */
  public void close() {
    try {
      writer.flush();
      writer.close();
    } catch (IOException e) {
      e.printStackTrace();
    }
  }
}
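
// A minimal end-to-end usage sketch (an assumption, not part of the
// original source). EventRecord construction is elided since its API is
// not shown here; gatherRecords() is a hypothetical helper standing in
// for the monitors that produce records.
//
//   LocalStore store = new LocalStore();
//   for (EventRecord er : gatherRecords())
//     store.insert(er);   // serialize, anonymize, append
//   store.upload();       // compress (if enabled) and copy into HDFS
//   store.close();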