Package org.apache.james.transport

Source Code of org.apache.james.transport.JamesSpoolManager

/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one   *
* or more contributor license agreements.  See the NOTICE file *
* distributed with this work for additional information        *
* regarding copyright ownership.  The ASF licenses this file   *
* to you under the Apache License, Version 2.0 (the            *
* "License"); you may not use this file except in compliance   *
* with the License.  You may obtain a copy of the License at   *
*                                                              *
*   http://www.apache.org/licenses/LICENSE-2.0                 *
*                                                              *
* Unless required by applicable law or agreed to in writing,   *
* software distributed under the License is distributed on an  *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
* KIND, either express or implied.  See the License for the    *
* specific language governing permissions and limitations      *
* under the License.                                           *
****************************************************************/



package org.apache.james.transport;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.annotation.Resource;

import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.HierarchicalConfiguration;
import org.apache.commons.logging.Log;
import org.apache.james.lifecycle.Configurable;
import org.apache.james.lifecycle.LifecycleUtil;
import org.apache.james.lifecycle.LogEnabled;
import org.apache.james.mailetcontainer.api.MailProcessor;
import org.apache.james.mailetcontainer.api.MailProcessorList;
import org.apache.james.mailetcontainer.api.MailetContainer;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.MailQueueFactory;
import org.apache.james.queue.api.MailQueue.MailQueueItem;
import org.apache.james.services.SpoolManager;
import org.apache.mailet.Mail;
import org.apache.mailet.Mailet;
import org.apache.mailet.Matcher;

/**
* Manages the mail spool.  This class is responsible for retrieving
* messages from the spool, directing messages to the appropriate
* processor, and removing them from the spool when processing is
* complete.
*
* @version CVS $Revision$ $Date$
*
* TODO: We should better use a ExecutorService here and only spawn a new Thread if needed
*/
public class JamesSpoolManager implements Runnable, SpoolManager, Configurable, LogEnabled {

   
    private MailQueue queue;

    /**
     * The number of threads used to move mail through the spool.
     */
    private int numThreads;

 
    /**
     * Number of active threads
     */
    private AtomicInteger numActive = new AtomicInteger(0);;

    /**
     * Spool threads are active
     */
    private AtomicBoolean active = new AtomicBoolean(false);

    /**
     * Spool threads
     */
    private Collection<Thread> spoolThreads;

    /**
     * The mail processor
     */
    private MailProcessorList mailProcessor;

    private Log logger;

    private MailQueueFactory queueFactory;


    @Resource(name="mailQueueFactory")
    public void setMailQueueFactory(MailQueueFactory queueFactory) {
        this.queueFactory = queueFactory;
    }

    @Resource(name="mailProcessor")
    public void setMailProcessorList(MailProcessorList mailProcessor) {
        this.mailProcessor = mailProcessor;
    }
   
    /*
     * (non-Javadoc)
     * @see org.apache.james.lifecycle.Configurable#configure(org.apache.commons.configuration.HierarchicalConfiguration)
     */
    public void configure(HierarchicalConfiguration config) throws ConfigurationException {
        numThreads = config.getInt("threads",100);
    }

    /**
     * Initialises the spool manager.
     */
    @PostConstruct
    public void init() throws Exception {
        logger.info("JamesSpoolManager init...");
       
        queue = queueFactory.getQueue(MailQueueFactory.SPOOL);

        if (logger.isInfoEnabled()) {
            StringBuffer infoBuffer =
                new StringBuffer(64)
                    .append("Spooler Manager uses ")
                    .append(numThreads)
                    .append(" Thread(s)");
            logger.info(infoBuffer.toString());
        }

        active.set(true);
        spoolThreads = new java.util.ArrayList<Thread>(numThreads);
        for ( int i = 0 ; i < numThreads ; i++ ) {
            Thread reader = new Thread(this, "Spool Thread #" + i);
            spoolThreads.add(reader);
            reader.start();
        }
    }

    /**
     * This routinely checks the message spool for messages, and processes
     * them as necessary
     */
    public void run() {

        if (logger.isInfoEnabled()) {
            logger.info("Run JamesSpoolManager: "
                             + Thread.currentThread().getName());
            logger.info("Spool=" + queue.getClass().getName());
        }

        while(active.get()) {
            numActive.incrementAndGet();

            try {
                MailQueueItem queueItem = queue.deQueue();
                Mail mail = queueItem.getMail();
                if (logger.isDebugEnabled()) {
                    StringBuffer debugBuffer =
                        new StringBuffer(64)
                                .append("==== Begin processing mail ")
                                .append(mail.getName())
                                .append("====");
                    logger.debug(debugBuffer.toString());
                }

                try {
                    mailProcessor.service(mail);
                    queueItem.done(true);
                } catch (Exception e) {
                    if (active.get() && logger.isErrorEnabled()) {
                        logger.error("Exception processing mail in JamesSpoolManager.run " + e.getMessage(), e);
                    }
                    queueItem.done(false);

                } finally {
                    LifecycleUtil.dispose(mail);
                    mail = null;
                }
              
            } catch (Throwable e) {
                if (active.get() && logger.isErrorEnabled()) {
                    logger.error("Exception processing mail in JamesSpoolManager.run "
                                      + e.getMessage(), e);
                }
            } finally {
                numActive.decrementAndGet();
            }

        }
        if (logger.isInfoEnabled()){
            logger.info("Stop JamesSpoolManager: " + Thread.currentThread().getName());
        }
    }

    /**
     * The dispose operation is called at the end of a components lifecycle.
     * Instances of this class use this method to release and destroy any
     * resources that they own.
     *
     * This implementation shuts down the LinearProcessors managed by this
     * JamesSpoolManager
     *
     * @see org.apache.avalon.framework.activity.Disposable#dispose()
     */
    @PreDestroy
    public void dispose() {
        logger.info("JamesSpoolManager dispose...");
        active.set(false); // shutdown the threads
        for (Thread thread: spoolThreads) {
            thread.interrupt(); // interrupt any waiting accept() calls.
        }

        long stop = System.currentTimeMillis() + 60000;
        // give the spooler threads one minute to terminate gracefully     
        while (numActive.get() != 0 && stop > System.currentTimeMillis()) {
            try {
                Thread.sleep(1000);
            } catch (Exception ignored) {}
        }
       
        logger.info("JamesSpoolManager thread shutdown completed.");
    }

    /**
     * @see org.apache.james.services.SpoolManager#getProcessorNames()
     */
    public String[] getProcessorNames() {
        return mailProcessor.getProcessorNames();
    }

    /*
     * (non-Javadoc)
     * @see org.apache.james.services.SpoolManager#getMailets(java.lang.String)
     */
    public List<Mailet> getMailets(String processorName) {
        MailetContainer mailetContainer = getMailetContainerByName(processorName);
        if (mailetContainer == null) return new ArrayList<Mailet>();
        return mailetContainer.getMailets();
    }

    /*
     * (non-Javadoc)
     * @see org.apache.james.services.SpoolManager#getMatchers(java.lang.String)
     */
    public List<Matcher> getMatchers(String processorName) {
        MailetContainer mailetContainer = getMailetContainerByName(processorName);
        if (mailetContainer == null) return new ArrayList<Matcher>();
        return mailetContainer.getMatchers();
    }

    private MailetContainer getMailetContainerByName(String processorName) {       
        MailProcessor processor = mailProcessor.getProcessor(processorName);
        if (!(processor instanceof MailetContainer)) return null;
        // TODO: decide, if we have to visit all sub-processors for being ProcessorLists
        // on their very own and collecting the processor names deeply.
        return (MailetContainer)processor;
    }



    /*
     * (non-Javadoc)
     * @see org.apache.james.lifecycle.LogEnabled#setLog(org.apache.commons.logging.Log)
     */
    public void setLog(Log log) {
        this.logger = log;
    }
}
TOP

Related Classes of org.apache.james.transport.JamesSpoolManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.