Package org.masukomi.aspirin.core.delivery

Source Code of org.masukomi.aspirin.core.delivery.DeliveryManager

package org.masukomi.aspirin.core.delivery;

import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;

import javax.mail.MessagingException;
import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;

import org.apache.commons.pool.ObjectPool;
import org.apache.commons.pool.impl.GenericObjectPool;
import org.masukomi.aspirin.core.AspirinInternal;
import org.masukomi.aspirin.core.config.Configuration;
import org.masukomi.aspirin.core.config.ConfigurationChangeListener;
import org.masukomi.aspirin.core.config.ConfigurationMBean;
import org.masukomi.aspirin.core.dns.ResolveHost;
import org.masukomi.aspirin.core.store.mail.MailStore;
import org.masukomi.aspirin.core.store.queue.DeliveryState;
import org.masukomi.aspirin.core.store.queue.QueueInfo;
import org.masukomi.aspirin.core.store.queue.QueueStore;

/**
* This class is the manager of delivery. It is instantiated by Aspirin class.
*
* @author Laszlo Solova
*
*/
public final class DeliveryManager extends Thread implements ConfigurationChangeListener {
  private MailStore mailStore;
  private QueueStore queueStore;
  private DeliveryMaintenanceThread maintenanceThread;
  private Object mailingLock = new Object();
  private ObjectPool deliveryThreadObjectPool = null;
  private boolean running = false;
  private GenericPoolableDeliveryThreadFactory deliveryThreadObjectFactory = null;
  private Map<String, DeliveryHandler> deliveryHandlers = new HashMap<String, DeliveryHandler>();
 
  public DeliveryManager() {
    // Set up default objects.
    this.setName("Aspirin-"+getClass().getSimpleName()+"-"+getId());
   
    // Configure pool of DeliveryThread threads
    GenericObjectPool.Config gopConf = new GenericObjectPool.Config();
    gopConf.lifo = false;
    gopConf.maxActive = AspirinInternal.getConfiguration().getDeliveryThreadsActiveMax();
    gopConf.maxIdle = AspirinInternal.getConfiguration().getDeliveryThreadsIdleMax();
    gopConf.maxWait = 5000;
    gopConf.testOnReturn = true;
    gopConf.whenExhaustedAction = GenericObjectPool.WHEN_EXHAUSTED_BLOCK;
   
    // Create DeliveryThread object factory used in pool
    deliveryThreadObjectFactory = new GenericPoolableDeliveryThreadFactory();
   
    // Create pool
    deliveryThreadObjectPool = new GenericObjectPool(deliveryThreadObjectFactory,gopConf);
   
    // Initialize object factory of pool
    deliveryThreadObjectFactory.init(new ThreadGroup("DeliveryThreadGroup"),deliveryThreadObjectPool);
   
    // Set up stores and configuration listener
    queueStore = AspirinInternal.getConfiguration().getQueueStore();
    queueStore.init();
   
    mailStore = AspirinInternal.getConfiguration().getMailStore();
    mailStore.init();
   
    maintenanceThread = new DeliveryMaintenanceThread();
    maintenanceThread.start();
   
    // Set up deliveryhandlers
    // TODO create by configuration
    deliveryHandlers.put(SendMessage.class.getCanonicalName(), new SendMessage());
    deliveryHandlers.put(ResolveHost.class.getCanonicalName(), new ResolveHost());
   
    AspirinInternal.getConfiguration().addListener(this);
  }
 
  public String add(MimeMessage mimeMessage) throws MessagingException {
    String mailid = AspirinInternal.getMailID(mimeMessage);
    long expiry = AspirinInternal.getExpiry(mimeMessage);
    Collection<InternetAddress> recipients = AspirinInternal.extractRecipients(mimeMessage);
    synchronized (mailingLock) {
      mailStore.set(mailid, mimeMessage);
      queueStore.add(mailid, expiry, recipients);
    }
    return mailid;
  }
 
  public MimeMessage get(QueueInfo qi) {
    return mailStore.get(qi.getMailid());
  }
 
  public void remove(String messageName) {
    synchronized (mailingLock) {
      mailStore.remove(messageName);
      queueStore.remove(messageName);
    }
  }
 
  @Override
  public void run() {
    running = true;
    AspirinInternal.getLogger().info("DeliveryManager started.");
    while( running )
    {
      QueueInfo qi = null;
      try {
        qi = queueStore.next();
        if( qi != null )
        {
          MimeMessage message = get(qi);
          if( message == null )
          {
            AspirinInternal.getLogger().warn("No MimeMessage found for qi={}",qi);
            qi.setResultInfo("No MimeMessage found.");
            qi.setState(DeliveryState.FAILED);
            release(qi);
            continue;
          }
          DeliveryContext dCtx = new DeliveryContext()
            .setQueueInfo(qi)
            .setMessage(message);
          AspirinInternal.getLogger().trace("DeliveryManager.run(): Pool state. A{}/I{}",new Object[]{deliveryThreadObjectPool.getNumActive(),deliveryThreadObjectPool.getNumIdle()});
          try
          {
            AspirinInternal.getLogger().debug("DeliveryManager.run(): Start delivery. qi={}",qi);
            DeliveryThread dThread = (DeliveryThread)deliveryThreadObjectPool.borrowObject();
            AspirinInternal.getLogger().trace("DeliveryManager.run(): Borrow DeliveryThread object. dt={}: state '{}/{}'",new Object[]{dThread.getName(), dThread.getState().name(), dThread.isAlive()});
            dThread.setContext(dCtx);
            /*
             * On first borrow the DeliveryThread is created and
             * initialized, but not started, because the first
             * time we have to set up the QueItem to deliver.
             */
            if( !dThread.isAlive() )
              dThread.start();
          } catch ( IllegalStateException ise )
          {
            /*
             * This could be happen, if thread is running, but
             * ObjectPool is already closed. It is a normal process
             * of Aspirin sending thread shutdown.
             */
            release(qi);
          } catch ( NoSuchElementException nsee )
          {
            /*
             * This happens if there is a lot of mail to send, and
             * no idle DeliveryThread is available.
             */
            AspirinInternal.getLogger().debug("DeliveryManager.run(): No idle DeliveryThread is available: {}",nsee.getMessage());
            release(qi);
          } catch ( Exception e )
          {
            AspirinInternal.getLogger().error("DeliveryManager.run(): Failed borrow delivery thread object.",e);
            release(qi);
          }
        }
        else
        {
          if( AspirinInternal.getLogger().isTraceEnabled() && 0 < queueStore.size() )
            AspirinInternal.getLogger().trace("DeliveryManager.run(): There is no sendable item in the queue. Fallback to waiting state for a minute.");
          synchronized (this) {
            try
            {
              /*
               * We should wait for a specified time, because
               * some emails unsent could be sendable again.
               */
              wait(60000);
            }catch (InterruptedException e)
            {
              running = false;
            }
          }
        }
       
      } catch (Throwable t) {
        if( qi != null )
          release(qi);
      }
     
    }
    AspirinInternal.getLogger().info("DeliveryManager terminated.");
  }
 
  public boolean isRunning() {
    return running;
  }
 
  public void terminate() {
    running = false;
  }
 
  public void release(QueueInfo qi) {
    if( qi.hasState(DeliveryState.IN_PROGRESS) )
    {
      if( qi.isInTimeBounds() )
      {
        qi.setState(DeliveryState.QUEUED);
        AspirinInternal.getLogger().trace("DeliveryManager.release(): Releasing: QUEUED. qi={}",qi);
      }
      else
      {
        qi.setState(DeliveryState.FAILED);
        AspirinInternal.getLogger().trace("DeliveryManager.release(): Releasing: FAILED. qi={}",qi);
      }
    }
    queueStore.setSendingResult(qi);
    if( queueStore.isCompleted(qi.getMailid()) )
      queueStore.remove(qi.getMailid());
    AspirinInternal.getLogger().trace("DeliveryManager.release(): Release item '{}' with state: '{}' after {} attempts.",new Object[]{qi.getMailid(),qi.getState().name(), qi.getAttemptCount()});
  }
 
  public boolean isCompleted(QueueInfo qi) {
    return queueStore.isCompleted(qi.getMailid());
  }
 
  @Override
  public void configChanged(String parameterName) {
    synchronized (mailingLock) {
      if( parameterName.equals(Configuration.PARAM_MAILSTORE_CLASS) )
        mailStore = AspirinInternal.getConfiguration().getMailStore();
      else
      if( parameterName.equals(Configuration.PARAM_QUEUESTORE_CLASS) )
        queueStore = AspirinInternal.getConfiguration().getQueueStore();
      if( parameterName.equals(ConfigurationMBean.PARAM_DELIVERY_THREADS_ACTIVE_MAX) )
        ((GenericObjectPool)deliveryThreadObjectPool).setMaxActive(AspirinInternal.getConfiguration().getDeliveryThreadsActiveMax());
      else
      if( parameterName.equals(ConfigurationMBean.PARAM_DELIVERY_THREADS_IDLE_MAX) )
        ((GenericObjectPool)deliveryThreadObjectPool).setMaxIdle(AspirinInternal.getConfiguration().getDeliveryThreadsIdleMax());
    }
  }
 
  public DeliveryHandler getDeliveryHandler(String handlerName) {
    return deliveryHandlers.get(handlerName);
  }
 
  public void shutdown() {
    this.running = false;
    try {
      deliveryThreadObjectPool.close();
      deliveryThreadObjectPool.clear();
    } catch (Exception e) {
      AspirinInternal.getLogger().error("DeliveryManager.shutdown() failed.",e);
    }
    maintenanceThread.shutdown();
  }

}
TOP

Related Classes of org.masukomi.aspirin.core.delivery.DeliveryManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.