Package org.apache.james.queue.jms

Source Code of org.apache.james.queue.jms.JMSMailQueue

/****************************************************************
* Licensed to the Apache Software Foundation (ASF) under one   *
* or more contributor license agreements.  See the NOTICE file *
* distributed with this work for additional information        *
* regarding copyright ownership.  The ASF licenses this file   *
* to you under the Apache License, Version 2.0 (the            *
* "License"); you may not use this file except in compliance   *
* with the License.  You may obtain a copy of the License at   *
*                                                              *
*   http://www.apache.org/licenses/LICENSE-2.0                 *
*                                                              *
* Unless required by applicable law or agreed to in writing,   *
* software distributed under the License is distributed on an  *
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
* KIND, either express or implied.  See the License for the    *
* specific language governing permissions and limitations      *
* under the License.                                           *
****************************************************************/
package org.apache.james.queue.jms;

import com.google.common.io.Closeables;
import org.apache.james.core.MailImpl;
import org.apache.james.core.MimeMessageCopyOnWriteProxy;
import org.apache.james.queue.api.MailPrioritySupport;
import org.apache.james.queue.api.MailQueue;
import org.apache.james.queue.api.ManageableMailQueue;
import org.apache.mailet.Mail;
import org.apache.mailet.MailAddress;
import org.slf4j.Logger;

import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.mail.MessagingException;
import javax.mail.internet.AddressException;
import javax.mail.internet.MimeMessage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Date;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;

/**
* <p>
* {@link MailQueue} implementation which use a JMS Queue for the<br>
* {@link MailQueue}. This implementation should work with every JMS 1.1.0
* implementation
* </p>
* <p>
* It use {@link ObjectMessage} with a byte array as payload to store the
* {@link Mail} objects.
* </p>
*/
public class JMSMailQueue implements ManageableMailQueue, JMSSupport, MailPrioritySupport {

    protected final String queueName;
    protected final ConnectionFactory connectionFactory;
    protected final Logger logger;
    public final static String FORCE_DELIVERY = "FORCE_DELIVERY";

    public JMSMailQueue(final ConnectionFactory connectionFactory, final String queueName, final Logger logger) {
        this.connectionFactory = connectionFactory;
        this.queueName = queueName;
        this.logger = logger;
    }

    /**
     * <p>
     * Dequeues a mail when it is ready to process. As JMS does not support delay scheduling out-of-the box,
     * we use a messageselector to check if a mail is ready. For this a
     * {@link MessageConsumer#receive(long)} is used with a timeout of 10
     * seconds.
     * </p>
     * <p>
     * Many JMS implementations support better solutions for this, so this
     * should get overridden by these implementations
     * </p>
     */
    @Override
    public MailQueueItem deQueue() throws MailQueueException {
        Connection connection = null;
        Session session = null;
        Message message;
        MessageConsumer consumer = null;

        while (true) {
            try {
                connection = connectionFactory.createConnection();
                connection.start();

                session = connection.createSession(true, Session.SESSION_TRANSACTED);
                Queue queue = session.createQueue(queueName);
                consumer = session.createConsumer(queue, getMessageSelector());

                message = consumer.receive(10000);

                if (message != null) {
                    return createMailQueueItem(connection, session, consumer, message);
                } else {
                    session.commit();

                    if (consumer != null) {

                        try {
                            consumer.close();
                        } catch (JMSException e1) {
                            // ignore on rollback
                        }
                    }
                    try {
                        if (session != null)
                            session.close();
                    } catch (JMSException e1) {
                        // ignore here
                    }

                    try {
                        if (connection != null)
                            connection.close();
                    } catch (JMSException e1) {
                        // ignore here
                    }
                }

            } catch (Exception e) {
                if (session != null) {
                    try {
                        session.rollback();
                    } catch (JMSException e1) {
                        // ignore on rollback
                    }
                }

                if (consumer != null) {

                    try {
                        consumer.close();
                    } catch (JMSException e1) {
                        // ignore on rollback
                    }
                }
                try {
                    if (session != null)
                        session.close();
                } catch (JMSException e1) {
                    // ignore here
                }

                try {
                    if (connection != null)
                        connection.close();
                } catch (JMSException e1) {
                    // ignore here
                }
                throw new MailQueueException("Unable to dequeue next message", e);
            }
        }

    }

    @Override
    public void enQueue(Mail mail, long delay, TimeUnit unit) throws MailQueueException {
        Connection connection = null;
        Session session = null;

        long mydelay = 0;

        if (delay > 0) {
            mydelay = TimeUnit.MILLISECONDS.convert(delay, unit);
        }

        try {

            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

            int msgPrio = NORMAL_PRIORITY;
            Object prio = mail.getAttribute(MAIL_PRIORITY);
            if (prio instanceof Integer) {
                msgPrio = (Integer) prio;
            }

            Map<String, Object> props = getJMSProperties(mail, mydelay);

            produceMail(session, props, msgPrio, mail);

        } catch (Exception e) {
            if (session != null) {
                try {
                    session.rollback();
                } catch (JMSException e1) {
                    // ignore on rollback
                }
            }
            throw new MailQueueException("Unable to enqueue mail " + mail, e);

        } finally {
            try {
                if (session != null)
                    session.close();
            } catch (JMSException e) {
                // ignore here
            }

            try {
                if (connection != null)
                    connection.close();
            } catch (JMSException e) {
                // ignore here
            }
        }
    }

    @Override
    public void enQueue(Mail mail) throws MailQueueException {
        enQueue(mail, NO_DELAY, TimeUnit.MILLISECONDS);
    }

    /**
     * Produce the mail to the JMS Queue
     */
    protected void produceMail(Session session, Map<String, Object> props, int msgPrio, Mail mail) throws JMSException, MessagingException, IOException {
        MessageProducer producer = null;

        try {
            Queue queue = session.createQueue(queueName);

            producer = session.createProducer(queue);
            ObjectMessage message = session.createObjectMessage();

            for (Map.Entry<String, Object> entry : props.entrySet()) {
                message.setObjectProperty(entry.getKey(), entry.getValue());
            }

            long size = mail.getMessageSize();
            ByteArrayOutputStream out;
            if (size > -1) {
                out = new ByteArrayOutputStream((int) size);
            } else {
                out = new ByteArrayOutputStream();
            }
            mail.getMessage().writeTo(out);

            // store the byte array in a ObjectMessage so we can use a
            // SharedByteArrayInputStream later
            // without the need of copy the day
            message.setObject(out.toByteArray());

            producer.send(message, Message.DEFAULT_DELIVERY_MODE, msgPrio, Message.DEFAULT_TIME_TO_LIVE);

        } finally {

            try {
                if (producer != null)
                    producer.close();
            } catch (JMSException e) {
                // ignore here
            }
        }

    }

    /**
     * Get JMS Message properties with values
     *
     * @param mail
     * @param delayInMillis
     * @throws JMSException
     * @throws MessagingException
     */
    @SuppressWarnings("unchecked")
    protected Map<String, Object> getJMSProperties(Mail mail, long delayInMillis) throws MessagingException {
        Map<String, Object> props = new HashMap<String, Object>();
        long nextDelivery = -1;
        if (delayInMillis > 0) {
            nextDelivery = System.currentTimeMillis() + delayInMillis;

        }
        props.put(JAMES_NEXT_DELIVERY, nextDelivery);
        props.put(JAMES_MAIL_ERROR_MESSAGE, mail.getErrorMessage());
        props.put(JAMES_MAIL_LAST_UPDATED, mail.getLastUpdated().getTime());
        props.put(JAMES_MAIL_MESSAGE_SIZE, mail.getMessageSize());
        props.put(JAMES_MAIL_NAME, mail.getName());

        StringBuilder recipientsBuilder = new StringBuilder();

        Iterator<MailAddress> recipients = mail.getRecipients().iterator();
        while (recipients.hasNext()) {
            String recipient = recipients.next().toString();
            recipientsBuilder.append(recipient.trim());
            if (recipients.hasNext()) {
                recipientsBuilder.append(JAMES_MAIL_SEPARATOR);
            }
        }
        props.put(JAMES_MAIL_RECIPIENTS, recipientsBuilder.toString());
        props.put(JAMES_MAIL_REMOTEADDR, mail.getRemoteAddr());
        props.put(JAMES_MAIL_REMOTEHOST, mail.getRemoteHost());

        String sender;
        MailAddress s = mail.getSender();
        if (s == null) {
            sender = "";
        } else {
            sender = mail.getSender().toString();
        }

        StringBuilder attrsBuilder = new StringBuilder();
        Iterator<String> attrs = mail.getAttributeNames();
        while (attrs.hasNext()) {
            String attrName = attrs.next();
            attrsBuilder.append(attrName);

            Object value = convertAttributeValue(mail.getAttribute(attrName));
            props.put(attrName, value);

            if (attrs.hasNext()) {
                attrsBuilder.append(JAMES_MAIL_SEPARATOR);
            }
        }
        props.put(JAMES_MAIL_ATTRIBUTE_NAMES, attrsBuilder.toString());
        props.put(JAMES_MAIL_SENDER, sender);
        props.put(JAMES_MAIL_STATE, mail.getState());
        return props;
    }

    /**
     * Create the complete Mail from the JMS Message. So the created
     * {@link Mail} is completely populated
     *
     * @param message
     * @return the complete mail
     * @throws MessagingException
     * @throws JMSException
     */
    protected final Mail createMail(Message message) throws MessagingException, JMSException {
        MailImpl mail = new MailImpl();
        populateMail(message, mail);
        populateMailMimeMessage(message, mail);

        return mail;
    }

    /**
     * Populat the given {@link Mail} instance with a {@link MimeMessage}. The
     * {@link MimeMessage} is read from the JMS Message. This implementation use
     * a {@link BytesMessage}
     *
     * @param message
     * @param mail
     * @throws MessagingException
     */
    protected void populateMailMimeMessage(Message message, Mail mail) throws MessagingException, JMSException {
        if (message instanceof ObjectMessage) {
            mail.setMessage(new MimeMessageCopyOnWriteProxy(new MimeMessageObjectMessageSource((ObjectMessage) message)));
        } else {
            throw new MailQueueException("Not supported JMS Message received " + message);
        }

    }

    /**
     * Populate Mail with values from Message. This exclude the
     * {@link MimeMessage}
     *
     * @param message
     * @param mail
     * @throws JMSException
     */
    protected void populateMail(Message message, MailImpl mail) throws JMSException {
        mail.setErrorMessage(message.getStringProperty(JAMES_MAIL_ERROR_MESSAGE));
        mail.setLastUpdated(new Date(message.getLongProperty(JAMES_MAIL_LAST_UPDATED)));
        mail.setName(message.getStringProperty(JAMES_MAIL_NAME));

        List<MailAddress> rcpts = new ArrayList<MailAddress>();
        String recipients = message.getStringProperty(JAMES_MAIL_RECIPIENTS);
        StringTokenizer recipientTokenizer = new StringTokenizer(recipients, JAMES_MAIL_SEPARATOR);
        while (recipientTokenizer.hasMoreTokens()) {
            String token = recipientTokenizer.nextToken();
            try {
                MailAddress rcpt = new MailAddress(token);
                rcpts.add(rcpt);
            } catch (AddressException e) {
                // Should never happen as long as the user does not modify the
                // the header by himself
                logger.error("Unable to parse the recipient address " + token + " for mail " + mail.getName() + ", so we ignore it", e);
            }
        }
        mail.setRecipients(rcpts);
        mail.setRemoteAddr(message.getStringProperty(JAMES_MAIL_REMOTEADDR));
        mail.setRemoteHost(message.getStringProperty(JAMES_MAIL_REMOTEHOST));

        String attributeNames = message.getStringProperty(JAMES_MAIL_ATTRIBUTE_NAMES);
        StringTokenizer namesTokenizer = new StringTokenizer(attributeNames, JAMES_MAIL_SEPARATOR);
        while (namesTokenizer.hasMoreTokens()) {
            String name = namesTokenizer.nextToken();

            // Now cast the property back to Serializable and set it as attribute.
            // See JAMES-1241
            Object attrValue = message.getObjectProperty(name);

            // ignore null values. See JAMES-1294
            if (attrValue != null) {
                if (attrValue instanceof Serializable) {
                    mail.setAttribute(name, (Serializable) attrValue);
                } else {
                    logger.error("Not supported mail attribute " + name + " of type " + attrValue + " for mail " + mail.getName());
                }
            }
        }

        String sender = message.getStringProperty(JAMES_MAIL_SENDER);
        if (sender == null || sender.trim().length() <= 0) {
            mail.setSender(null);
        } else {
            try {
                mail.setSender(new MailAddress(sender));
            } catch (AddressException e) {
                // Should never happen as long as the user does not modify the
                // the header by himself
                logger.error("Unable to parse the sender address " + sender + " for mail " + mail.getName() + ", so we fallback to a null sender", e);
                mail.setSender(null);
            }
        }

        mail.setState(message.getStringProperty(JAMES_MAIL_STATE));

    }

    /**
     * Convert the attribute value if necessary.
     *
     * @param value
     * @return convertedValue
     */
    protected Object convertAttributeValue(Object value) {
        if (value == null || value instanceof String || value instanceof Byte || value instanceof Long || value instanceof Double || value instanceof Boolean || value instanceof Integer || value instanceof Short || value instanceof Float) {
            return value;
        }
        return value.toString();
    }

    @Override
    public String toString() {
        return "MailQueue:" + queueName;
    }

    /**
     * Create a {@link org.apache.james.queue.api.MailQueue.MailQueueItem} for the given parameters
     *
     * @param connection
     * @param session
     * @param consumer
     * @param message
     * @return item
     * @throws JMSException
     * @throws MessagingException
     */
    protected MailQueueItem createMailQueueItem(Connection connection, Session session, MessageConsumer consumer, Message message) throws JMSException, MessagingException {
        final Mail mail = createMail(message);
        return new JMSMailQueueItem(mail, connection, session, consumer);
    }

    protected String getMessageSelector() {
        return JAMES_NEXT_DELIVERY + " <= " + System.currentTimeMillis() + " OR " + FORCE_DELIVERY + " = true";
    }

    @SuppressWarnings("unchecked")
    @Override
    public long getSize() throws MailQueueException {
        Connection connection = null;
        Session session = null;
        QueueBrowser browser = null;
        int size = 0;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);

            browser = session.createBrowser(queue);

            Enumeration<Message> messages = browser.getEnumeration();

            while (messages.hasMoreElements()) {
                messages.nextElement();
                size++;
            }
            return size;
        } catch (Exception e) {
            logger.error("Unable to get size of queue " + queueName, e);
            throw new MailQueueException("Unable to get size of queue " + queueName, e);
        } finally {
            try {
                if (browser != null)
                    browser.close();
            } catch (JMSException e1) {
                // ignore here
            }

            try {
                if (session != null)
                    session.close();
            } catch (JMSException e1) {
                // ignore here
            }

            try {
                if (connection != null)
                    connection.close();
            } catch (JMSException e1) {
                // ignore here
            }
        }
    }

    @Override
    public long flush() throws MailQueueException {
        Connection connection = null;
        Session session = null;
        Message message = null;
        MessageConsumer consumer = null;
        MessageProducer producer = null;
        boolean first = true;
        long count = 0;
        try {
            connection = connectionFactory.createConnection();
            connection.start();

            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(queueName);
            consumer = session.createConsumer(queue);
            producer = session.createProducer(queue);

            while (first || message != null) {
                if (first) {
                    // give the consumer 2000 ms to receive messages
                    message = consumer.receive(2000);
                } else {
                    message = consumer.receiveNoWait();
                }
                first = false;

                if (message != null) {
                    Message m = copy(session, message);
                    m.setBooleanProperty(FORCE_DELIVERY, true);
                    producer.send(m, message.getJMSDeliveryMode(), message.getJMSPriority(), message.getJMSExpiration());
                    count++;
                }
            }
            session.commit();
            return count;
        } catch (Exception e) {
            logger.error("Unable to flush mail", e);
            try {
                session.rollback();
            } catch (JMSException e1) {
                // ignore on rollback
            }
            throw new MailQueueException("Unable to get size of queue " + queueName, e);
        } finally {
            if (consumer != null) {

                try {
                    consumer.close();
                } catch (JMSException e1) {
                    // ignore on rollback
                }
            }
            if (producer != null) {

                try {
                    producer.close();
                } catch (JMSException e1) {
                    // ignore on rollback
                }
            }

            try {
                if (session != null)
                    session.close();
            } catch (JMSException e1) {
                // ignore here
            }

            try {
                if (connection != null)
                    connection.close();
            } catch (JMSException e1) {
                // ignore here
            }
        }
    }

    @Override
    public long clear() throws MailQueueException {
        return count(removeWithSelector(null));
    }

    protected long count(List<Message> msgs) {
        if (msgs == null) {
            return -1;
        } else {
            return msgs.size();
        }
    }

    /**
     * Remove messages with the given selector
     *
     * @param selector
     * @return messages
     */
    public List<Message> removeWithSelector(String selector) throws MailQueueException {
        Connection connection = null;
        Session session = null;
        Message message = null;
        MessageConsumer consumer = null;
        boolean first = true;
        List<Message> messages = new ArrayList<Message>();

        try {
            connection = connectionFactory.createConnection();
            connection.start();

            session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Queue queue = session.createQueue(queueName);
            consumer = session.createConsumer(queue, selector);
            while (first || message != null) {
                if (first) {
                    // give the consumer 2000 ms to receive messages
                    message = consumer.receive(2000);
                } else {
                    message = consumer.receiveNoWait();
                }
                first = false;
                if (message != null) {
                    messages.add(message);
                }
            }
            session.commit();
            return messages;
        } catch (Exception e) {
            try {
                session.rollback();
            } catch (JMSException e1) {
                // ignore on rollback
            }
            throw new MailQueueException("Unable to remove mails", e);

        } finally {
            if (consumer != null) {

                try {
                    consumer.close();
                } catch (JMSException e1) {
                    // ignore on rollback
                }
            }

            try {
                if (session != null)
                    session.close();
            } catch (JMSException e1) {
                // ignore here
            }

            try {
                if (connection != null)
                    connection.close();
            } catch (JMSException e1) {
                // ignore here
            }
        }
    }

    /**
     * Create a copy of the given {@link Message}. This includes the properties
     * and the payload
     *
     * @param session
     * @param m
     * @return copy
     * @throws JMSException
     */
    @SuppressWarnings("unchecked")
    protected Message copy(Session session, Message m) throws JMSException {
        ObjectMessage message = (ObjectMessage) m;
        ObjectMessage copy = session.createObjectMessage(message.getObject());

        Enumeration<String> properties = message.getPropertyNames();
        while (properties.hasMoreElements()) {
            String name = properties.nextElement();
            copy.setObjectProperty(name, message.getObjectProperty(name));
        }

        return copy;
    }

    @Override
    public long remove(Type type, String value) throws MailQueueException {
        switch (type) {
            case Name:
                return count(removeWithSelector(JAMES_MAIL_NAME + " = '" + value + "'"));
            case Sender:
                return count(removeWithSelector(JAMES_MAIL_SENDER + " = '" + value + "'"));
            case Recipient:
                return count(removeWithSelector(JAMES_MAIL_RECIPIENTS + " = '" + value + "' or " + JAMES_MAIL_RECIPIENTS + " = '%," + value + "' or " + JAMES_MAIL_RECIPIENTS + " = '%," + value + "%'"));
            default:
                break;
        }
        return -1;
    }

    @Override
    @SuppressWarnings("unchecked")
    public MailQueueIterator browse() throws MailQueueException {
        Connection connection = null;
        Session session = null;
        QueueBrowser browser = null;
        try {
            connection = connectionFactory.createConnection();
            connection.start();
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Queue queue = session.createQueue(queueName);

            browser = session.createBrowser(queue);

            final Enumeration<Message> messages = browser.getEnumeration();

            final Connection myConnection = connection;
            final Session mySession = session;
            final QueueBrowser myBrowser = browser;

            return new MailQueueIterator() {

                @Override
                public void remove() {
                    throw new UnsupportedOperationException("Read-only");
                }

                @Override
                public MailQueueItemView next() {
                    while (hasNext()) {
                        try {
                            Message m = messages.nextElement();
                            final Mail mail = createMail(m);
                            final long nextDelivery = m.getLongProperty(JAMES_NEXT_DELIVERY);
                            return new MailQueueItemView() {

                                @Override
                                public long getNextDelivery() {
                                    return nextDelivery;
                                }

                                @Override
                                public Mail getMail() {
                                    return mail;
                                }
                            };
                        } catch (MessagingException e) {
                            logger.error("Unable to browse queue", e);
                        } catch (JMSException e) {
                            logger.error("Unable to browse queue", e);
                        }
                    }

                    throw new NoSuchElementException();
                }

                @Override
                public boolean hasNext() {
                    return messages.hasMoreElements();
                }

                @Override
                public void close() {

                    try {
                        if (myBrowser != null)
                            myBrowser.close();
                    } catch (JMSException e1) {
                        // ignore here
                    }

                    try {
                        if (mySession != null)
                            mySession.close();
                    } catch (JMSException e1) {
                        // ignore here
                    }

                    try {
                        if (myConnection != null)
                            myConnection.close();
                    } catch (JMSException e1) {
                        // ignore here
                    }
                }
            };

        } catch (Exception e) {

            try {
                if (browser != null)
                    browser.close();
            } catch (JMSException e1) {
                // ignore here
            }

            try {
                if (session != null)
                    session.close();
            } catch (JMSException e1) {
                // ignore here
            }

            try {
                if (connection != null)
                    connection.close();
            } catch (JMSException e1) {
                // ignore here
            }
            logger.error("Unable to browse queue " + queueName, e);
            throw new MailQueueException("Unable to browse queue " + queueName, e);
        }
    }

}
TOP

Related Classes of org.apache.james.queue.jms.JMSMailQueue

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.