Package com.sun.faban.driver.engine

Source Code of com.sun.faban.driver.engine.AgentThread

/* The contents of this file are subject to the terms
* of the Common Development and Distribution License
* (the License). You may not use this file except in
* compliance with the License.
*
* You can obtain a copy of the License at
* http://www.sun.com/cddl/cddl.html or
* install_dir/legal/LICENSE
* See the License for the specific language governing
* permission and limitations under the License.
*
* When distributing Covered Code, include this CDDL
* Header Notice in each file and include the License file
* at install_dir/legal/LICENSE.
* If applicable, add the following below the CDDL Header,
* with the fields enclosed by brackets [] replaced by
* your own identifying information:
* "Portions Copyrighted [year] [name of copyright owner]"
*
* $Id$
*
* Copyright 2005-2009 Sun Microsystems Inc. All Rights Reserved
*/
package com.sun.faban.driver.engine;

import com.sun.faban.driver.BenchmarkOperation;
import com.sun.faban.driver.FatalException;
import com.sun.faban.driver.Timing;
import com.sun.faban.driver.ExpectedException;
import com.sun.faban.driver.util.Random;
import com.sun.faban.driver.util.Timer;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.concurrent.TimeUnit;
import java.io.InterruptedIOException;


/**
* Abstract superclass for all driver threads. It provides a factory for
* instantiating the right implementation depending on the driver specification.
* Subclasses execute the provided driver, keeps track of response times,
* think times, transaction counts, etc.
*
* @author Akara Sucharitakul
*/
public abstract class AgentThread extends Thread {

  /**
   * The various run states for an {@link AgentThread}.
   *
   * @author ncampbell
   */
  public static enum RunState {
    /* !!!
     * Please note the order is significant for this enumeration.  A
     * Enum#compareTo is used which is based on the order of these RunStates.
     * !!!
     */

    /**
     * The {@link AgentThread} thread has not started.
     */
      NOT_STARTED,
      /**
       * The {@link AgentThread} has initialized.
       */
      INITIALIZING,
      /**
       * The {@link AgentThread} is executing {@link BenchmarkOperation}s to warm
       * the SUT.
       */
      PRE_RUN,
      /**
       * The {@link AgentThread} is running in steady state.
       */
      RUNNING,
      /**
       * The {@link AgentThread} is doing a post run.
       */
      POST_RUN,
      /**
       *  The {@link AgentThread} has completed.
       */
      ENDED
  }

    String type;
    String name;
    int id;
    int currentOperation = -1; // Global index into the current operation.
    int[] previousOperation; // Index into the previous operation.
    int mixId = 0; // 0 for foreground and 1 for background
    Mix.Selector[] selector; // The selector array, size 1 if no bg, 2 if bg

    DriverContext driverContext;
    Metrics metrics;
    Random random = new Random();
    Timer timer;
    AgentImpl agent;
    RunInfo.DriverConfig driverConfig;
    Class<?> driverClass;
    Object driver;
    boolean inRamp = true; // indicator for rampup or rampdown, initially true
    long[] delayTime;  // recently calculated cycle times
    long[] startTime; // start times for previous tx
    long[] endTime; // end time for the recent tx ended

    private RunState threadState = RunState.NOT_STARTED;

    Logger logger;
    String className;
    long endRampUp = Long.MAX_VALUE;
    long endStdyState = Long.MAX_VALUE;
    long endRampDown = Long.MAX_VALUE;
    int cycleCount = 0; // The cycles executed so far

    /** Run configuration from the Master. */
    RunInfo runInfo;

    boolean startTimeSet = false;

    boolean stopped = false;

    /** Constant value for specifying that the time is not set. */
    public static final long TIME_NOT_SET = Long.MIN_VALUE;

    /**
     * Factory method for instantiating the right type of AgentThread.
     * @param type The type of this agent
     * @param agentId The display id of this agent
     * @param id The id of this agent
     * @param driverClass The driver class
     * @param timer The timer object reference
     * @param agent The agent calling this thread
     * @return An instance of the AgentThread subclass.
     */
    public static AgentThread getInstance(String type, String agentId, int id,
                                Class<?> driverClass, Timer timer,
                                AgentImpl agent) {
        RunInfo.DriverConfig driverConfig = RunInfo.getInstance().driverConfig;
        AgentThread agentThread = null;
        switch (driverConfig.runControl) {
           case TIME : if (driverConfig.mix[1] != null) {
      agentThread = new TimeThreadWithBackground();
    } else {
      agentThread = new TimeThread();
    }
                       break;
           case CYCLES : agentThread = new CycleThread();
        }

        agentThread.configure(type, agentId, id, driverClass, timer, agent);
        return agentThread;
    }

    /**
     * Configures this AgentThread.
     *
     * @param type The type of this agent
     * @param agentId The display id of this agent
     * @param id The id of this agent
     * @param driverClass The driver class
     * @param timer The timer object reference
     * @param agent The agent calling this thread
     */
    private void configure(String type, String agentId, int id,
                                Class<?> driverClass, Timer timer,
                                AgentImpl agent) {
        this.type = type;
        this.id = id;
        this.driverClass = driverClass;
        this.timer = timer;
        this.runInfo = RunInfo.getInstance();
        this.agent = agent;
        random = new Random(System.nanoTime() + hashCode());
        className = getClass().getName();
        driverConfig = runInfo.driverConfig;
        name = type + '[' + agentId + "]." + id;
        setName(name);
        logger = Logger.getLogger(className + '.' + id);
        metrics = new Metrics(this);
        initTimes();
    }

    /**
     *  Allocates and initializes the timing structures which is specific
     *  to the pseudo-thread dimensions.
     */
    abstract void initTimes();

    /**
     * Entry point for starting the thread. Subclasses do not override this
     * method but override doRun instead. Run implicitly calls doRun.
     */
    @Override
  public final void run() {
        try {
            setThreadState(RunState.INITIALIZING);
            doRun();
        } catch (FatalException e) {
            // A fatal exception thrown by the driver is already caught
            // in the run methods and logged there.
            // A fatal exception otherwise thrown by the run
            // methods signals termination of this thread.
            if (!e.wasLogged())  {
                Throwable t = e.getCause();
                if (t != null) {
          logger.log(Level.SEVERE, name + ": " + t.getMessage(), t);
        } else {
          logger.log(Level.SEVERE, name + ": " + e.getMessage(), e);
        }
                e.setLogged();
                agent.abortRun();
            }
        } catch (Throwable t) {
            logger.log(Level.SEVERE, name + ": " + t.getMessage(), t);
            agent.abortRun();
        } finally {
            postRun();
        }
    }

    /**
     * Each thread executes in the doRun method until the benchmark time is up
     * The main loop chooses a tx. type according to the mix specified in
     * the parameter file and calls the appropriate transaction
     * method to do the job.
      * The stats for the entire run are stored in a Metrics object
      * which is returned to the Agent via the getResult() method.
     * @see Metrics
     */
    abstract void doRun();

    /**
     * Checks for a fatal exception. This is called from the invocation loop.
     * @param e The throwable
     * @param op The operation
     */
    void checkFatal(Throwable e, BenchmarkDefinition.Operation op) {
        checkFatal(e, op.m);
    }

    /**
     * Checks for a fatal exception. This is called from the pre/post.
     * @param e The throwable
     * @param m The method
     */
    void checkFatal(Throwable e, Method m) {
        if (e instanceof FatalException) {
            FatalException fatal = (FatalException) e;
            e = fatal.getCause();
            if (e != null) {
        logger.log(Level.SEVERE, name + '.' + m.getName() +
                        ": " + e.getMessage(), e);
      } else {
        logger.log(Level.SEVERE, name + '.' + m.getName() +
                        ": " + fatal.getMessage(), fatal);
      }
            fatal.setLogged();
            agent.abortRun();
            throw fatal; // Also don't continue with current thread.
        }
    }

    /**
     * Logs the normal errors occuring in operations. The operation is marked
     * as failed.
     * @param e  The throwable received
     * @param op The operation being executed.
     */
    void logError(Throwable e, BenchmarkDefinition.Operation op) {
        String message = e.getMessage();
        if (message == null) { // Find the message in the highest level exception.
            Throwable t = e.getCause();
            while (message == null && t != null) {
                message = t.getMessage();
                t = t.getCause();
            }
        }
        if (message == null) // If still null, artificially create message.
            message = "Exception in operation.";
        message = name + "." + op.m.getName() + ": " + message;

        if (inRamp) {
      message += "\nNote: Error not counted in result." +
                    "\nEither transaction start or end time is not " +
                    "within steady state.";
    }
        Level level;
        if (e instanceof ExpectedException) {
      level = Level.FINER;
    } else {
      level = Level.WARNING;
    }
        logger.log(level, message, e);
    }

    private synchronized void setThreadState(RunState state) {
        threadState = state;
        notifyAll();
    }

    private synchronized boolean compareAndSetThreadState(RunState orig, RunState state) {
        boolean set = threadState == orig;
        if (set) {
            threadState = state;
            notifyAll();
        }
        return set;
    }


    /**
     * Obtains the state of the current thread.
     * @return The state of the current thread.
     */
    public synchronized RunState getThreadState() {
        return threadState;
    }

    /**
     * Waits for a given state of the thread to arrive.
     * @param state The state to wait for.
     */
    public synchronized void waitThreadState(RunState state) {
        while (threadState.compareTo(state) < 0) {
            try {
                wait(10000);
            } catch (InterruptedException e) {
              logger.log(Level.FINE, e.getMessage(), e);
            }
        }
    }

    /**
     * Executes the method market with @OnceBefore in thread 0.
     */
    void preRun() {
        // Thread 0 needs to do the preRun
        if (id == 0 && driverConfig.preRun != null) {
            setThreadState(RunState.PRE_RUN);
            logger.fine(name + ": Invoking preRun @OnceBefore");
            try {
                invokePrePost(driverConfig.preRun.m);
            } catch (InterruptedIOException e) {
                // Should not happen unless run is cancelled. And if so,
                // we don't really care to redo this.
            } finally {
                agent.preRunLatch.countDown();
            }
            try {
                while (!stopped) {
                    if (agent.startLatch.await(500, TimeUnit.MILLISECONDS))
                        break;
                }
            } catch (InterruptedException e) {
                logger.log(Level.WARNING, name +
                        ": Start latch await interrupted!");
            }
            agent.startLatch = null;
            logger.finest(name + ": Thread 0 got startLatch, now executing.");
        }
        setThreadState(RunState.RUNNING);
    }

    /**
     * Executes the method marked with @OnceAfter in thread 0.
     */
    void postRun() {
        if (id == 0 && driverConfig.postRun != null &&
                compareAndSetThreadState(RunState.RUNNING, RunState.POST_RUN)) {
            // Tell the world we're finished.
            agent.finishLatch.countDown();
            logger.finest(name +
                    ": Thread 0 finished, awaiting postRunLatch");

            // Then wait for a signal (everybody else finished) to run postrun.
            while (agent.postRunLatch.getCount() > 0l) {
        try {
                    while (!stopped) {
                        if (agent.postRunLatch.await(500,
                                TimeUnit.MILLISECONDS))
                            break;
                    }
                } catch (InterruptedException e) {
                  logger.log(Level.FINE, e.getMessage(), e);
                }
      }

            if (stopped)
                return;
            // We need to make sure this method is re-run if I/O is interrupted.
            // This may happen if terminate gets called while thread is
            // switching to POST_RUN state.
            boolean interrupted = false;
            logger.fine(name + ": Invoking postRun @OnceAfter");
            do {
                try {
                    invokePrePost(driverConfig.postRun.m);
                } catch (InterruptedIOException e) {
                    interrupted = true;
                }
            } while (interrupted);
            logger.finest(name + ": Thread 0 finished postRun.");
        }
        setThreadState(RunState.ENDED);
    }

    private void invokePrePost(Method m) throws InterruptedIOException {
        try {
            m.invoke(driver);
        } catch (InvocationTargetException e) {
            Throwable cause = e.getCause();
            if (cause == null) {
        cause = e;
      }
            checkFatal(cause, m);
            logger.log(Level.WARNING, name + "." + m.getName() + ": " +
                    cause.getMessage(), cause);
            if (cause instanceof InterruptedIOException) {
        throw (InterruptedIOException) cause;
      }
        } catch (IllegalAccessException e) {
            logger.log(Level.SEVERE, name + "." + m.getName() + ": " +
                    e.getMessage(), e);
        }

    }

    /**
     * Obtains the invoke time of the next operation.
     *
     * @param op The operation
     * @param mixId The mix
     * @return The targeted invoke time.
     */
    long getInvokeTime(BenchmarkDefinition.Operation op, int mixId) {
        Cycle cycle;
        if (op == null) {
      // No op, this is the initial cycle, use initialDelay
            cycle = runInfo.driverConfig.initialDelay[mixId];
    } else {
      // Set the start time based on the operation selected
            cycle = op.cycle;
    }

        long invokeTime = TIME_NOT_SET;
        delayTime[mixId] = cycle.getDelay(random);

        switch (cycle.cycleType) {
            case CYCLETIME :
                invokeTime = startTime[mixId] + delayTime[mixId];
                break;
            case THINKTIME :
                invokeTime = endTime[mixId] + delayTime[mixId];
                break;
        }
        return invokeTime;
    }

    /**
     * This method blocks until the start time is set by the master.
     * Called by AgentThread implementations.
     */
    void waitStartTime() {
        try {
            agent.timeSetLatch.await();
            startTimeSet = true;
            long delay = agent.startTime - System.nanoTime();
            if (delay <= 0) {
                logger.severe(name + ": Start time is set " + (-delay) +
                        " nanosecs too late. Please file a bug.");
                agent.abortRun();
            } else {
                timer.wakeupAt(agent.startTime);
            }
        } catch (InterruptedException e) { // Run is killed.
            throw new FatalException(e);
        }
    }

    /**
     * Validates whether the times for a successful operation are properly
     * captured. Called by AgentThread implementations.
     * @param op The operation.
     */
    void validateTimeCompletion(BenchmarkDefinition.Operation op) {
        DriverContext.TimingInfo timingInfo = driverContext.timingInfo;

        // If there is no error, we still check for the
        // unusual case of time not recorded and issue the
        // proper message.
        if (timingInfo.invokeTime == TIME_NOT_SET) {
            String msg = null;
            if (driverConfig.operations[
                    currentOperation].timing == Timing.AUTO) {
                msg = name + '.' + op.m.getName() +
                        ": Transport not called! " +
                        "Please ensure transport instantiation " +
                        "before making any remote calls!";
            } else {
                msg = name + '.' + op.m.getName() +
                        ": Cannot determine time! " +
                        "DriverContext.recordTime() not called " +
                        "before critical section in operation.";

            }
            logger.severe(msg);
            agent.abortRun();
            throw new FatalException(msg);
        } else if (timingInfo.respondTime == TIME_NOT_SET &&
                   timingInfo.lastRespondTime == TIME_NOT_SET) {
            String msg = null;
            if (driverConfig.operations[
                    currentOperation].timing == Timing.AUTO) {
                msg = name + '.' + op.m.getName() +
                        ": Transport incomplete! " +
                        "Please ensure transport exception is " +
                        "thrown from operation.";
            } else {
                msg = name + '.' + op.m.getName() +
                        ": Cannot determine end time! " +
                        "DriverContext.recordTime() not called " +
                        "after critical section in operation.";

            }
            logger.severe(msg);
            agent.abortRun();
            throw new FatalException(msg);
        } else if (timingInfo.respondTime == TIME_NOT_SET) {
            timingInfo.respondTime = timingInfo.lastRespondTime;
            logger.fine("Potential open request in operation " +
                    op.m.getName() + ".");
        }
    }

    /**
     * Checks whether the last operation is in the ramp-up or ramp-down or
     * not. Updates the inRamp parameter accordingly.  This is only supposed
     * to be called from a subclass.
     */
    abstract void checkRamp();

    /**
     * Tests whether the last operation is in steady state or not. This is
     * called through the context from the driver from within the operation
     * so we need to be careful not to change run control parameters. This
     * method only reads the stats.
     * @return True if the last operation is in steady state, false otherwise.
     */
    abstract boolean isSteadyState();

    /**
     * Tests whether the time between start and end is in steady state or not.
     * For non time-based steady state, this will depend on the current cycle
     * count. Otherwise time is used.
     * @param start The start of a time span
     * @param end The end of a time span
     * @return true if this time span is in steady state, false otherwise.
     */
    abstract boolean isSteadyState(long start, long end);

    /**
     * Return results of this thread.
     * @return Final stats
     */
    public Metrics getResult() {
        metrics.wrap();
        return metrics;
    }

    /**
     * Triggers stopping and exiting of this thread.
     */
    public void stopExecution() {
        stopped = true;
        interrupt();
    }
}
TOP

Related Classes of com.sun.faban.driver.engine.AgentThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.