Package org.apache.hadoop.chukwa.database

Source Code of org.apache.hadoop.chukwa.database.TestDatabaseIostat

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.chukwa.database;


import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.FilenameFilter;
import java.io.IOException;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.Date;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.chukwa.conf.ChukwaConfiguration;
import org.apache.hadoop.chukwa.database.TableCreator;

import org.apache.hadoop.chukwa.datacollection.DataFactory;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent;
import org.apache.hadoop.chukwa.datacollection.agent.ChukwaAgent.AlreadyRunningException;
import org.apache.hadoop.chukwa.datacollection.connector.http.HttpConnector;
import org.apache.hadoop.chukwa.datacollection.collector.CaptureWriter;
import org.apache.hadoop.chukwa.datacollection.collector.servlet.ServletCollector;
import org.apache.hadoop.chukwa.datacollection.controller.ChukwaAgentController;
import org.apache.hadoop.chukwa.datacollection.sender.ChukwaHttpSender;
import org.apache.hadoop.chukwa.datacollection.sender.RetryListOfCollectors;
import org.apache.hadoop.chukwa.datacollection.test.ConsoleOutConnector;
import org.apache.hadoop.chukwa.datacollection.writer.PipelineStageWriter;
import org.apache.hadoop.chukwa.dataloader.MetricDataLoader;
import org.apache.hadoop.conf.Configuration;
import org.mortbay.jetty.Connector;
import org.mortbay.jetty.Server;
import org.mortbay.jetty.nio.SelectChannelConnector;
import org.mortbay.jetty.servlet.Context;
import org.mortbay.jetty.servlet.ServletHolder;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobPriority;
import org.apache.hadoop.mapred.MiniMRCluster;
import org.apache.hadoop.mapred.SequenceFileInputFormat;
import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.chukwa.ChukwaArchiveKey;
import org.apache.hadoop.chukwa.ChunkImpl;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordOutputFormat;
import org.apache.hadoop.chukwa.extraction.demux.ChukwaRecordPartitioner;
import org.apache.hadoop.chukwa.extraction.demux.Demux;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.chukwa.util.DatabaseWriter;
import org.apache.hadoop.chukwa.util.ExceptionUtil;
import org.apache.hadoop.chukwa.database.Macro;

import junit.framework.TestCase;

public class TestDatabaseIostat extends TestCase {
  private static Log log = LogFactory.getLog(TestDatabaseIostat.class);
  private HttpConnector conn = null;
  private ChukwaAgent agent = null;
  private int agentPort = 9093;
  private int collectorPort = 9990;
  private Server jettyCollector = null;
  private ChukwaHttpSender sender = null;
  private MiniDFSCluster dfs = null;
  private FileSystem fileSys = null;
  private MiniMRCluster mr = null;
  int NUM_HADOOP_SLAVES = 4;
  int LINES = 10000;
  int THREADS = 2;
  private static final String dataSink = "/demux/input";
  private static Path DEMUX_INPUT_PATH = null;
  private static Path DEMUX_OUTPUT_PATH = null;
  private ChukwaConfiguration conf = new ChukwaConfiguration();
  private static SimpleDateFormat day = new java.text.SimpleDateFormat("yyyyMMdd_HH_mm");
  private static String cluster = "demo";
  long[] timeWindow = {7, 30, 91, 365, 3650};
  long current = 1244617200000L// 2009-06-10
 
  public void setUp() {
   
    // Startup HDFS Cluster
    try {
      dfs = new MiniDFSCluster(conf, NUM_HADOOP_SLAVES, true,
          null);
      fileSys = dfs.getFileSystem();
      DEMUX_INPUT_PATH = new Path(fileSys.getUri().toString()+File.separator+dataSink);         
      DEMUX_OUTPUT_PATH = new Path(fileSys.getUri().toString()+File.separator+"/demux/output");
    } catch(Exception e) {
      e.printStackTrace();
      fail("Fail to startup HDFS cluster.");     
    }

    // Startup MR Cluster
    try {
      System.setProperty("hadoop.log.dir", System.getProperty(
          "test.build.data", "/tmp"));
      mr = new MiniMRCluster(NUM_HADOOP_SLAVES, fileSys.getUri()
          .toString(), 1);
    } catch(Exception e) {
      fail("Fail to startup Map/reduce cluster.");
    }
   
    // Startup Collector
    try {
      // Configure Collector
      conf.set("chukwaCollector.chunkSuppressBufferSize", "10");
      conf.set("writer.hdfs.filesystem",fileSys.getUri().toString());
      conf.set("chukwaCollector.outputDir",dataSink);
      conf.set("chukwaCollector.rotateInterval", "10000");
     
      // Set up jetty connector
      SelectChannelConnector jettyConnector = new SelectChannelConnector();
      jettyConnector.setLowResourcesConnections(THREADS-1);
      jettyConnector.setLowResourceMaxIdleTime(1500);
      jettyConnector.setPort(collectorPort);
     
      // Set up jetty server proper, using connector
      jettyCollector = new Server(collectorPort);
      Context root = new Context(jettyCollector, "/", Context.SESSIONS);
      root.addServlet(new ServletHolder(new ServletCollector(conf)), "/*");
      jettyCollector.start();
      jettyCollector.setStopAtShutdown(true);
      Thread.sleep(10000);
    } catch(Exception e) {
      fail("Fail to startup collector.");
    }
   
    // Startup Agent
    try {
      // Configure Agent
      conf.set("chukwaAgent.tags", "cluster=\"demo\"");
      DataFactory.getInstance().addDefaultTag(conf.get("chukwaAgent.tags", "cluster=\"unknown\""));
      conf.set("chukwaAgent.checkpoint.dir", System.getenv("CHUKWA_DATA_DIR")+File.separator+"tmp");
      conf.set("chukwaAgent.checkpoint.interval", "10000");
      int portno = conf.getInt("chukwaAgent.control.port", agentPort);
      agent = new ChukwaAgent(conf);
      conn = new HttpConnector(agent, "http://localhost:"+collectorPort+"/chukwa");
      conn.start();     
      sender = new ChukwaHttpSender(conf);
      ArrayList<String> collectorList = new ArrayList<String>();
      collectorList.add("http://localhost:"+collectorPort+"/chukwa");
      sender.setCollectors(new RetryListOfCollectors(collectorList, conf));
    } catch (AlreadyRunningException e) {
      fail("Chukwa Agent is already running");
    }
     
    System.setProperty("CLUSTER","demo");
    DatabaseWriter db = new DatabaseWriter(cluster);
    String buffer = "";
    File aFile = new File(System.getenv("CHUKWA_CONF_DIR")
                 + File.separator + "database_create_tables.sql");
    buffer = readFile(aFile);
    String tables[] = buffer.split(";");
    for(String table : tables) {
      if(table.length()>5) {
        try {
          db.execute(table);
        } catch (Exception e) {
          fail("Fail to retrieve meta data for database table: "+table);
        }
      }
    }
    db.close();
    for(int i=0;i<timeWindow.length;i++) {
      TableCreator tc = new TableCreator();
      long start = current;
      long end = current + (timeWindow[i]*1440*60*1000);
      try {
        tc.createTables(start, end);
      } catch (Exception e) {
        fail("Fail to create database tables."+ExceptionUtil.getStackTrace(e));
      }
    }   
  }

  public String readFile(File aFile) {
    StringBuffer contents = new StringBuffer();
    try {
      BufferedReader input = new BufferedReader(new FileReader(aFile));
      try {
        String line = null; // not declared within while loop
        while ((line = input.readLine()) != null) {
          contents.append(line);
          contents.append(System.getProperty("line.separator"));
        }
      } finally {
        input.close();
      }
    } catch (IOException ex) {
      ex.printStackTrace();
    }
    return contents.toString();
  }
 
  public void tearDown() {
    try {
      agent.shutdown();
      conn.shutdown();
      jettyCollector.stop();
      mr.shutdown();
      dfs.shutdown();
      Thread.sleep(2000);
    } catch(Exception e) {
      e.printStackTrace();
      fail(e.toString());
    }
    DatabaseWriter db = null;
    try {
      db = new DatabaseWriter(cluster);
      ResultSet rs = db.query("show tables");
      ArrayList<String> list = new ArrayList<String>();
      while(rs.next()) {
        String table = rs.getString(1);
        list.add(table);
      }
      for(String table : list) {
        db.execute("drop table "+table);
      }
    } catch(Throwable ex) {
    } finally {
      if(db!=null) {
        db.close();
      }
    }
  }
 
  public void testChukwaFramework() {
    try {
      // Test Chukwa Agent Controller and Agent Communication
      ChukwaAgentController cli = new ChukwaAgentController("localhost", agentPort);
      String[] source = new File(System.getenv("CHUKWA_DATA_DIR") + File.separator + "log").list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".log");
        }
      });

      for(String fname : source) {
        StringBuilder fullPath = new StringBuilder();
        fullPath.append(System.getenv("CHUKWA_DATA_DIR"));
        fullPath.append(File.separator);
        fullPath.append("log");
        fullPath.append(File.separator);       
        fullPath.append(fname);
        String recordType = fname.substring(0,fname.indexOf("."));
        String adaptorId = cli.add(
          "org.apache.hadoop.chukwa.datacollection.adaptor.filetailer.CharFileTailingAdaptorUTF8NewLineEscaped",
          recordType, "0 " + fullPath.toString(), 0);
        assertNotNull(adaptorId);
        Thread.sleep(2000);
      }
      cli.removeAll();
      Thread.sleep(30000);
    } catch (Exception e) {
      e.printStackTrace();
      fail(e.toString());
    }
   
   
    // Test Data Sink files written by Collector   
    Path demuxDir = new Path(dataSink+"/*");
    FileSystem fs;
    try {
      fs = dfs.getFileSystem();
      FileStatus[] events = fs.globStatus(demuxDir);
      log.info("Number of data sink files written:"+events.length);
      assertTrue(events.length!=0);
    } catch (IOException e) {
      e.printStackTrace();
      fail("File System Error.");
    }
   
    // Test Demux   
    log.info("Testing demux");
    try {
      //ChukwaConfiguration conf = new ChukwaConfiguration();
      System.setProperty("hadoop.log.dir", System.getProperty(
          "test.build.data", "/tmp"));
   
      String[] sortArgs = { DEMUX_INPUT_PATH.toString(), DEMUX_OUTPUT_PATH.toString() };
//      JobConf job = mr.createJobConf();
      JobConf job = new JobConf(new ChukwaConfiguration(), Demux.class);
      job.addResource(System.getenv("CHUKWA_CONF_DIR")+File.separator+"chukwa-demux-conf.xml");
      job.setJobName("Chukwa-Demux_" + day.format(new Date()));
      job.setInputFormat(SequenceFileInputFormat.class);
      job.setMapperClass(Demux.MapClass.class);
      job.setPartitionerClass(ChukwaRecordPartitioner.class);
      job.setReducerClass(Demux.ReduceClass.class);

      job.setOutputKeyClass(ChukwaRecordKey.class);
      job.setOutputValueClass(ChukwaRecord.class);
      job.setOutputFormat(ChukwaRecordOutputFormat.class);
      job.setJobPriority(JobPriority.VERY_HIGH);
      job.setNumMapTasks(2);
      job.setNumReduceTasks(1);
      Path input = new Path(fileSys.getUri().toString()+File.separator+dataSink+File.separator+"*.done");
      FileInputFormat.setInputPaths(job, input);
      FileOutputFormat.setOutputPath(job, DEMUX_OUTPUT_PATH);
      String[] jars = new File(System.getenv("CHUKWA_HOME")).list(new FilenameFilter() {
        public boolean accept(File dir, String name) {
          return name.endsWith(".jar");
        }
      });
      job.setJar(System.getenv("CHUKWA_HOME")+File.separator+jars[0]);
      //assertEquals(ToolRunner.run(job, new Demux(), sortArgs), 0);
      JobClient.runJob(job);
    } catch (Exception e) {
      fail(e.toString());
    }

    // Test DataLoader
    try {
      fs = dfs.getFileSystem();
      Path outputPath = new Path(DEMUX_OUTPUT_PATH.toString()+File.separator+"/*/*/*.evt");
      FileStatus[] demuxOutput = fs.globStatus(outputPath);
      log.info("Number of chukwa records files written:"+demuxOutput.length);
      assertTrue(demuxOutput.length!=0);
      for(FileStatus fname : demuxOutput) {
        MetricDataLoader mdl = new MetricDataLoader(conf, fs, fname.getPath().toUri().toString());
        mdl.call();
      }
    } catch (IOException e) {
      e.printStackTrace();
      fail("Metric Data Loader Error.");
    }   
   
    // Verify Data
    DatabaseWriter db = null;
    try {
      db = new DatabaseWriter(cluster);
      Macro mp = new Macro(current,current, "select * from [system_metrics]");
      String query = mp.toString();
      ResultSet rs = db.query(query);
      ResultSetMetaData rmeta = rs.getMetaData();
      int size = rmeta.getColumnCount();
      while(rs.next()) {
        for(int i=1;i<=size;i++) {
          int columnType = rmeta.getColumnType(i);
          if(columnType==java.sql.Types.BIGINT ||
             columnType==java.sql.Types.INTEGER) {
            long testValue = rs.getLong(i);
            assertTrue(testValue<1000000000L);
          } else if(columnType==java.sql.Types.FLOAT ||
              columnType==java.sql.Types.DOUBLE) {
            double testValue = rs.getDouble(i);
            assertTrue(testValue<1000000000L);
          }
        }
      }
    } catch(Throwable ex) {
      fail("Data verification failed.");
    } finally {
      if(db!=null) {
        db.close();
      }
    }     
   
  }

}
TOP

Related Classes of org.apache.hadoop.chukwa.database.TestDatabaseIostat

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.