Package fr.dyade.aaa.agent

Source Code of fr.dyade.aaa.agent.MessageVector

/*
* Copyright (C) 2004 - 2010 ScalAgent Distributed Technologies
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
* USA.
*
* Initial developer(s): ScalAgent Distributed Technologies
* Contributor(s):
*/
package fr.dyade.aaa.agent;

import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.common.Debug;
import fr.dyade.aaa.common.EmptyQueueException;

/**
* Class <code>MessageVector</code> represents a persistent vector of
* <tt>Message</tt> (source and target agent identifier, notification).
* As messages  have a relatively short life span, then the messages
* are kept in main memory. If possible, the list is backed by a persistent
* image on the disk for reliability needs. In this case, we can use
* <tt>SoftReference</tt> to avoid memory overflow.<p><hr>
* The stamp information in Message is used to restore the queue from
* persistent storage at initialization time, so there is no longer need
* to save <code>MessageVector</code> object state.
*/
final class MessageVector implements MessageQueue {
  private Logger logmon = null;
  private String logmsg = null;
  private long cpt1, cpt2;

  /**
   * The array buffer into which the <code>Message</code> objects are stored
   * in memory. The capacity of this array buffer is at least large enough to
   * contain all the messages of the <code>MessageVector</code>.<p>
   * Messages are stored in a circular way, first one in <tt>data[first]</tt>
   * through <tt>data[(first+count-1)%length]</tt>. Any other array elements
   * are null.
   */
  private Object data[];
  /** The index of the first message in the circular buffer. */
  private int first;
  /**
   * The number of messages in this <tt>MessageVector</tt> object. Components
   * <tt>data[first]</tt> through <tt>data[(first+count-1)%length]</tt> are the
   * actual items.
   */
  private int count;
  /** The number of validated message in this <tt>MessageQueue</tt>. */
  private int validated;

  private boolean persistent;

  MessageVector(String name, boolean persistent) {
    logmon = Debug.getLogger(getClass().getName() + '.' + name);
    logmsg = name + ".MessageVector: ";

    this.persistent = persistent;
    data = new Object[50];
    first = 0;
    count = 0;
    validated = 0;
  }

  /**
   * Insert a message in the queue, it should only be used during
   * initialization for restoring the queue state.
   *
   * @param  item   the message to be pushed onto this queue.
   */
  public synchronized void insert(Message item) {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "insert(" + item + ")");

    int i = 0;
    for (; i<validated; i++) {
      Message msg = getMessageAt(i);
      if (item.getStamp() < msg.getStamp()) break;
    }
    insertMessageAt(item, i);
    validated += 1;
  }

  /**
   * Pushes a message onto the bottom of this queue. It should only
   * be used during a transaction. The item will be really available
   * after the transaction commit and the queue validate.
   *
   * @param   item   the message to be pushed onto this queue.
   */
  public synchronized void push(Message item) {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "push(" + item + ")");
    insertMessageAt(item, count);
  }

  /**
   * Removes the message at the top of this queue.
   * It must only be used during a transaction.
   *
   * @return     The message at the top of this queue.
   * @exception  EmptyQueueException if this queue is empty.
   */
  public synchronized Message pop() throws EmptyQueueException {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "pop()");

    if (validated == 0)
      throw new EmptyQueueException();
   
    Message item = getMessageAt(0);
    removeMessageAt(0);
    validated -= 1;

    return item;
  }

  /**
   * Atomically validates all messages pushed in queue during a reaction.
   * It must only be used during a transaction.
   */
  public synchronized void validate() {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "validate()");
    validated = size();
    notify();
  }

  /**
   * Looks at the message at the top of this queue without removing
   * it from the queue.
   * It should never be used during a transaction to avoid dead-lock
   * problems.
   *
   * @return    the message at the top of this queue.
   * @exception  InterruptedException if another thread has interrupted the
   *    current thread.
   */
  public synchronized Message get() throws InterruptedException {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) {
      logmon.log(BasicLevel.DEBUG, logmsg + "get()");

      cpt1 += 1; cpt2 += validated;
      if ((cpt1 & 0xFFFFL) == 0L) {
          logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated);
      }
    }
   
    while (validated == 0) {
      wait();
    }
    Message item = getMessageAt(0);
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item);

   return item;
  }

  /**
   * Looks at the message at the top of this queue without removing
   * it from the queue. It should never be used during a transaction
   * to avoid dead-lock problems. It waits until a message is available
   * or the specified amount of time has elapsed.
   *
   * @param  timeout  the maximum time to wait in milliseconds.
   * @return      the message at the top of this queue.
   * @exception  InterruptedException if another thread has interrupted the
   *    current thread.
   * @exception  IllegalArgumentException if the value of timeout is negative.
   */
  public synchronized Message get(long timeout) throws InterruptedException {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) {
      logmon.log(BasicLevel.DEBUG, logmsg + "get(" + timeout + ")");

      cpt1 += 1; cpt2 += validated;
      if ((cpt1 & 0xFFFFL) == 0L) {
        logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated);
      }
    }
   
    Message item = null;
    if ((validated == 0) && (timeout > 0)) wait(timeout);
    if (validated > 0) item = getMessageAt(0);

    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item);

    return item;
  }

  /**
   * Looks at the first message of this queue where the destination server
   * is the specified one.
   * The message is not removed from the queue. It should never be used during
   * a transaction to avoid dead-lock problems.
   *
   * @param  to  the unique server id.
   * @return      the corresponding message or null if none .
   */
  public synchronized Message getMessageTo(short to) {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG)) {
      logmon.log(BasicLevel.DEBUG, logmsg + "getFrom(" + to + ")");

      cpt1 += 1; cpt2 += validated;
      if ((cpt1 & 0xFFFFL) == 0L) {
        logmon.log(BasicLevel.DEBUG, logmsg + (cpt2/cpt1) + '/' + validated);
      }
    }
   
    Message item = null;
    for (int i=0; i<validated; i++) {
      Message msg = getMessageAt(i);
      if (msg.getDest() == to) {
        item = msg;
        break;
      }
    }

    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "get() -> " + item);

    return item;
  }

  /**
   * Removes the specified message from the queue if exists.
   *
   * @param  msg  the message to remove.
   */
  synchronized void removeMessage(Message msg) {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 logmsg + "removeMessage #" + msg.getStamp());

    for (int i = 0; i<validated; i++) {
      if (getMessageAt(i) ==  msg) {

        if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG,
                     logmsg + "removeMessage #" + msg.getStamp() + " -> " + i);

        removeMessageAt(i);
        validated -= 1;
        return;
      }
    }

    logmon.log(BasicLevel.ERROR,
               logmsg + "removeMessage #" + msg.getStamp() + " not found");

    return;
  }

  /**
   *  Removes all messages with a stamp less than the specified one.
   * Be careful with the use of this method, in particular it does not
   * take in account the multiples incoming nodes.
   */
  synchronized int remove(int stamp) {
    if (validated == 0) return 0;
   
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "remove #" + stamp);

    int i = 0;
    for (; i<validated; i++) {
      Message msg = getMessageAt(i);
      if (stamp < msg.getStamp()) break;
    }

    for (int j=0; j<i; j++) {
      removeMessageAt(0);
    }
    validated -= i;
   
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "remove #" + stamp + " ->" +i);

    return i;
  }

  /**
   *  Removes the first messages with a timestamp less than the specified one.
   * Be careful with the use of this method, in particular it does not take in
   * account the multiples incoming nodes.
   */
  synchronized Message removeExpired(long currentTimeMillis) {
    if (validated == 0) return null;
   
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 logmsg + "removeExpired - " + currentTimeMillis);

    for (int i = 0; i<validated; i++) {
      Message msg = getMessageAt(i);
      if ((msg.not != null) &&
          (msg.not.expiration > 0) &&
          (currentTimeMillis >= msg.not.expiration)) {
        removeMessageAt(i);
        validated -= 1;
   
        if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
          logmon.log(BasicLevel.DEBUG, logmsg + "remove #" + msg.getStamp());

        return msg;
      }
    }

    return null;
  }

  /**
   * Inserts the specified message to this <code>MessageVector</code> at
   * the specified index. Each component in this vector with an index greater
   * or equal to the specified index is shifted upward.
   *
   * @param item  the message to be pushed onto this queue.
   * @param index  where to insert the new message.
   */
  private void insertMessageAt(Message item, int index) {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 logmsg + "insertMessageAt(" + item + ", " + index + ")");

    if (count == data.length) {
      Object newData[] = new Object[data.length *2];
      if ((first + count) < data.length) {
        System.arraycopy(data, first, newData, 0, count);
      } else {
        int j = data.length - first;
        System.arraycopy(data, first, newData, 0, j);
        System.arraycopy(data, 0, newData, j, count - j);
      }
      first = 0;
      data = newData;
    }
    if (index != count)
      System.arraycopy(data, index, data, index + 1, count - index);
    if (persistent)
      data[(first + index)%data.length] = new MessageSoftRef(item);
    else
      data[(first + index)%data.length] = item;
    count += 1;

    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 logmsg + "insertMessageAt() -> " + this);
  }

  /**
   * Returns the message at the specified index.
   *
   * @param index  the index of the message.
   * @return       The message at the top of this queue.
   */
  private Message getMessageAt(int index) {
    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG, logmsg + "getMessageAt(" + index + ")");

    int idx = (first + index)%data.length;
    if (persistent) {
      Message msg = ((MessageSoftRef) data[idx]).getMessage();
      if (msg == null) {
        msg = ((MessageSoftRef) data[idx]).loadMessage();
      }
      return msg;
    }
   
    return (Message) data[idx];
  }

  /**
   * Deletes the message at the specified index.
   *
   * @param index  the index of the message to remove.
   */
  private void removeMessageAt(int index) {
    if (index == 0) {
      // It is the first element, just move the start of the list.
      data[first] = null; /* let gc do its work */
      first = (first +1)%data.length;
    } else if (index == (count -1)) {
      // It is the last element, just move the end of the list.
      data[(first + index) %data.length] = null; /* let gc do its work */
    } else if ((first + index) < data.length) {
      // Moves the start of the box to the empty 'box'
      System.arraycopy(data, first,
                       data, first +1, index);
      // Erase the old first 'box'
      data[first] = null; /* let gc do its work */
      // Move the first ptr +1, and decrease counter
      first = (first +1)%data.length;
    } else {
      // Moves the end of the vector -1 to the empty 'box'
      System.arraycopy(data, (first + index)%data.length +1,
                       data, (first + index)%data.length, count - index -1);
      // Erase the old last 'box'
      data[(first + count -1)%data.length] = null; /* let gc do its work */
    }

    // Decrease the counter
    count -= 1;
    // If there is no more element, moves the empty list to the beginning of
    // the vector.
    if (count == 0) first = 0;

    if (Debug.debug && logmon.isLoggable(BasicLevel.DEBUG))
      logmon.log(BasicLevel.DEBUG,
                 logmsg + "removeMessageAt(" + index + ") -> " + this);
  }

  /**
   * Returns the number of messages in this vector.
   *
   * @return  the number of messages in this vector.
   */
  public int size() {
    return count;
  }

  /**
   * Returns a string representation of this <code>MessageVector</code>
   * object. Be careful we scan the vector without synchronization, so the
   * result can be incoherent.
   *
   * @return  A string representation of this object.
   */
  public String toString() {
    StringBuffer strbuf = new StringBuffer();
   
    strbuf.append('(').append(super.toString());
    strbuf.append(",first=").append(first);
    strbuf.append(",count=").append(count);
    strbuf.append(",validated=").append(validated).append(",(");
    for (int i=0; i<data.length; i++) {
      strbuf.append(data[i]).append(',');
    }
    strbuf.append("))");
   
    return strbuf.toString();
  }

}
TOP

Related Classes of fr.dyade.aaa.agent.MessageVector

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.