Package org.apache.james.transport

Source Code of org.apache.james.transport.LinearProcessor

/*
* Copyright (C) The Apache Software Foundation. All rights reserved.
*
* This software is published under the terms of the Apache Software License
* version 1.1, a copy of which has been included with this distribution in
* the LICENSE file.
*/
package org.apache.james.transport;

import org.apache.avalon.framework.activity.Initializable;
import org.apache.avalon.framework.activity.Disposable;
import org.apache.avalon.framework.logger.AbstractLogEnabled;
import org.apache.avalon.framework.logger.Logger;
import org.apache.james.core.MailImpl;
import org.apache.james.services.SpoolRepository;
import org.apache.mailet.*;

import javax.mail.MessagingException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.Vector;
import java.util.Iterator;

/**
* @author Serge Knystautas <sergek@lokitech.com>
* @author Federico Barbieri <scoobie@systemy.it>
*
*  SAMPLE CONFIGURATION
<processor name="try" onerror="return,log">
*      <mailet match="RecipientIsLocal" class="LocalDelivery">
*      </mailet>
*      <mailet match="All" class="RemoteDelivery">
*          <delayTime>21600000</delayTime>
*          <maxRetries>5</maxRetries>
*      </mailet>
</processor>
*
* Note that the 'onerror' attribute is not yet supported.
*/
public class LinearProcessor
    extends AbstractLogEnabled
    implements Initializable, Disposable {

    private List mailets;
    private List matchers;
    private List[] unprocessed;
    private Collection tempUnprocessed;
    private Random random;
    private Logger logger;
    private SpoolRepository spool;

    public void setSpool(SpoolRepository spool) {
        this.spool = spool;
    }


    public void initialize() {
        this.matchers = new Vector();
        this.mailets = new Vector();
        tempUnprocessed = new Vector();
        tempUnprocessed.add(new Vector(2, 2));
        random = new Random();
    }

    // Shutdown mailets
    public void dispose() {
        Iterator it = mailets.iterator();
        while (it.hasNext()) {
            Mailet mailet = (Mailet)it.next();
            getLogger().debug("Shutdown mailet " + mailet.getMailetInfo());
            mailet.destroy();
        }
    }

    public void add(Matcher matcher, Mailet mailet) {
        matchers.add(matcher);
        mailets.add(mailet);
        //Make the collections array one larger
        tempUnprocessed.add(new Vector(2, 2));
    }


    public synchronized void service(MailImpl mail) throws MessagingException {
        getLogger().debug("Servicing mail: " + mail.getName());
        //unprocessed is an array of Lists of Mail objects
        //  the array indicates which matcher/mailet (stage in the linear
        //  processor) that this Mail needs to be processed.
        //  e.g., a Mail in unprocessed[0] needs to be
        //  processed by the first matcher/mailet.
        //
        //It is a List of Mail objects at each array spot as multiple Mail
        //  objects could be at the same stage.

        //make sure we have the array built
        if (unprocessed == null) {
            //Need to construct that object
            unprocessed = (List[])tempUnprocessed.toArray(new List[0]);
            tempUnprocessed = null;
        }
        //Wipe all the data (just to be sure)
        for (int i = 0; i < unprocessed.length; i++) {
            unprocessed[i].clear();
        }

        //Add the object to the bottom of the list
        unprocessed[0].add(mail);

        //This is the original state of the message
        String originalState = mail.getState();

        //We'll use these as temporary variables in the loop
        mail = null// the message we're currently processing
        int i = 0;    // where in the stage we're looking
        while (true) {
            //The last element in the unprocessed array has mail messages
            //  that have completed all stages.  We want them to just die,
            //  so we clear that spot.
            unprocessed[unprocessed.length - 1].clear();

            //initialize the mail reference we will be searching on
            mail = null;

            //Scan through all stages, trying to find a message to process
            for (i = 0; i < unprocessed.length; i++) {
                if (unprocessed[i].size() > 0) {
                    //Get the first element from the queue, and remove it from there
                    mail = (MailImpl)unprocessed[i].get(0);
                    unprocessed[i].remove(mail);
                    break;
                }
            }

            //Check it we found anything
            if (mail == null) {
                //We found no messages to process... we're done servicing the mail object
                return;
            }


            //Call the matcher and find what recipients match
            Collection recipients = null;
            Matcher matcher = (Matcher) matchers.get(i);
            getLogger().debug("Checking " + mail.getName() + " with " + matcher);
            try {
                recipients = matcher.match(mail);
                if (recipients == null) {
                    //In case the matcher returned null, create an empty Vector
                    recipients = new Vector();
                }
                //Make sure all the objects are MailAddress objects
                verifyMailAddresses(recipients);
            } catch (MessagingException me) {
                handleException(me, mail, matcher.getMatcherConfig().getMatcherName());
            }
            //Split the recipients into two pools.  notRecipients will contain the
            //  recipients on the message that the matcher did not return.
            Collection notRecipients = new Vector();
            notRecipients.addAll(mail.getRecipients());
            notRecipients.removeAll(recipients);

            if (recipients.size() == 0) {
                //Everything was not a match... store it in the next spot in the array
                unprocessed[i + 1].add(mail);
                continue;
            }
            if (notRecipients.size() != 0) {
                //There are a mix of recipients and not recipients.
                //We need to clone this message, put the notRecipients on the clone
                //  and store it in the next spot
                MailImpl notMail = (MailImpl)mail.duplicate(newName(mail));
                notMail.setRecipients(notRecipients);
                unprocessed[i + 1].add(notMail);
                //We have to set the reduce possible recipients on the old message
                mail.setRecipients(recipients);
            }
            //We have messages that need to process... time to run the mailet.
            Mailet mailet = (Mailet) mailets.get(i);
            getLogger().debug("Servicing " + mail.getName() + " by " + mailet.getMailetInfo());
            try {
                mailet.service(mail);
                //Make sure all the recipients are still MailAddress objects
                verifyMailAddresses(mail.getRecipients());
            } catch (MessagingException me) {
                handleException(me, mail, mailet.getMailetConfig().getMailetName());
            }

            //See if the state was changed by the mailet
            if (!mail.getState().equals(originalState)) {
                //If this message was ghosted, we just want to let it die
                if (mail.getState().equals(mail.GHOST)) {
                    //let this instance die...
                    mail = null;
                    continue;
                }
                //This was just set to another state... store this back in the spool
                //  and it will get picked up and run in that processor

                //Note we need to store this with a new mail name, otherwise it
                //  will get deleted upon leaving this processor
                mail.setName(newName(mail));
                spool.store(mail);
                mail = null;
                continue;
            } else {
                //Ok, we made it through with the same state... move it to the next
                //  spot in the array
                unprocessed[i + 1].add(mail);
            }

        }
    }

    /**
     * Create a unique new primary key name
     */
    private String newName(MailImpl mail) {
        String name = mail.getName();
        return name + "-!" + Math.abs(random.nextInt());
    }



    /**
     * Checks that all objects in this class are of the form MailAddress
     */
    private void verifyMailAddresses(Collection col) throws MessagingException {
        MailAddress addresses[] = (MailAddress[])col.toArray(new MailAddress[0]);
        if (addresses.length != col.size()) {
            throw new MailetException("The recipient list contains objects other than MailAddress objects");
        }
    }

    private void handleException(MessagingException me, Mail mail, String offendersName) throws MessagingException {
        System.err.println("exception! " + me);
        mail.setState(Mail.ERROR);
        StringWriter sout = new StringWriter();
        PrintWriter out = new PrintWriter(sout, true);
        out.println("Exception calling " + offendersName + ": " + me.getMessage());
        Exception e = me;
        while (e != null) {
            e.printStackTrace(out);
            if (e instanceof MessagingException) {
                e = ((MessagingException)e).getNextException();
            } else {
                e = null;
            }
        }
        mail.setErrorMessage(sout.toString());
        getLogger().error(sout.toString());
        throw me;
    }
}
TOP

Related Classes of org.apache.james.transport.LinearProcessor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.