Package org.activeio.journal

Source Code of org.activeio.journal.JournalPerfToolSupport

/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activeio.journal;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Random;

import org.activeio.packet.ByteArrayPacket;

/**
* Provides the base class used to run performance tests against a Journal.
* Should be subclassed to customize it for a specific journal implementation.
*
* @version $Revision: 1.1 $
*/
abstract public class JournalPerfToolSupport implements JournalEventListener {

  private JournalStatsFilter journal;
  private Random random = new Random();
  private byte data[];
  private int workerCount=0
  private PrintWriter statWriter;
  // Performance test Options
 
  // The output goes here:
  protected File journalDirectory = new File("journal-logs");
  protected File statCSVFile = new File("stats.csv");;

  // Controls how often we start a new batch of workers.
  protected int workerIncrement=20;
  protected long incrementDelay=1000*20;
  protected boolean verbose=true;

  // Worker configuration.
  protected int recordSize=1024;
  protected int syncFrequency=15
  protected int workerThinkTime=100;

    private final class Worker implements Runnable {
    public void run() {
      int i=random.nextInt()%syncFrequency;
      while(true) {
        boolean sync=false;
       
        if( syncFrequency>=0 && (i%syncFrequency)==0 ) {
          sync=true;
        }       
        try {
          journal.write(new ByteArrayPacket(data), sync);
          Thread.sleep(workerThinkTime);
        } catch (Exception e) {
          e.printStackTrace();
          return;
        }
        i++;           
      }         
    }
  } 
 
    /**
     * @throws IOException
   *
   */
  protected void exec() throws Exception {
   
    System.out.println("Client threads write records using: Record Size: "+recordSize+", Sync Frequency: "+syncFrequency+", Worker Think Time: "+workerThinkTime);

    // Create the record and fill it with some values.
    data = new byte[recordSize];
    for (int i = 0; i < data.length; i++) {
      data[i] = (byte)i;
    }
   
    if( statCSVFile!=null ) {
      statWriter = new PrintWriter(new FileOutputStream(statCSVFile));
      statWriter.println("Threads,Throughput (k/s),Forcd write latency (ms),Throughput (records/s)");
    }
   
        if( journalDirectory.exists() ) {
          deleteDir(journalDirectory);
        }   
        journal = new JournalStatsFilter(createJournal()).enableDetailedStats(verbose);
        journal.setJournalEventListener(this);
   
        try {         
         
          // Wait a little to see the worker affect the stats.
          // Increment the number of workers every few seconds.
          while(true) {
            System.out.println("Starting "+workerIncrement+" Workers...");
              for(int i=0;i <workerIncrement;i++) {
                  new Thread(new Worker()).start();
                  workerCount++;
              }
                   
              // Wait a little to see the worker affect the stats.
              System.out.println("Waiting "+(incrementDelay/1000)+" seconds before next Stat sample.");
              Thread.sleep(incrementDelay);
              displayStats();
              journal.reset();
          }
         
         
        } finally {
          journal.close();
        }
  }

  private void displayStats() {   
    System.out.println("Stats at "+workerCount+" workers.");
    System.out.println(journal);         
    if( statWriter!= null ) {
      statWriter.println(""+workerCount+","+journal.getThroughputKps()+","+journal.getAvgSyncedLatencyMs()+","+journal.getThroughputRps());
      statWriter.flush();
    }
  }

  /**
   * @return
   */
  abstract public Journal createJournal() throws Exception;

  static private void deleteDir(File f) {
    File[] files = f.listFiles();
    for (int i = 0; i < files.length; i++) {
      File file = files[i];
      file.delete();
    }
    f.delete();
  }
   
   
  public void overflowNotification(RecordLocation safeLocation) {
    try {
      System.out.println("Mark set: "+safeLocation);
      journal.setMark(safeLocation, false);
    } catch (InvalidRecordLocationException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }   
  }
}
TOP

Related Classes of org.activeio.journal.JournalPerfToolSupport

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.