Package flex.messaging.services.messaging.adapters

Source Code of flex.messaging.services.messaging.adapters.JMSProducer

/*************************************************************************
*
* ADOBE CONFIDENTIAL
* __________________
*
*  Copyright 2002 - 2007 Adobe Systems Incorporated
*  All Rights Reserved.
*
* NOTICE:  All information contained herein is, and remains
* the property of Adobe Systems Incorporated and its suppliers,
* if any.  The intellectual and technical concepts contained
* herein are proprietary to Adobe Systems Incorporated
* and its suppliers and may be covered by U.S. and Foreign Patents,
* patents in process, and are protected by trade secret or copyright law.
* Dissemination of this information or reproduction of this material
* is strictly forbidden unless prior written permission is obtained
* from Adobe Systems Incorporated.
**************************************************************************/
package flex.messaging.services.messaging.adapters;

import flex.messaging.MessageException;
import flex.messaging.config.ConfigurationException;
import flex.messaging.log.Log;
import flex.messaging.messages.MessagePerformanceUtils;

import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.naming.NamingException;

import java.io.Serializable;
import java.lang.reflect.Field;
import java.util.Iterator;
import java.util.Map;

/**
* A JMSProxy subclass for <code>javax.jms.MessageProducer</code> instances.
*
* @exclude
*/
public abstract class JMSProducer extends JMSProxy
{
    /* JMS related variables */
    // Underlying JMS producer. This class only reads its time-to-live (getTimeToLive)
    // and closes it (stop); it is never assigned here, so presumably concrete
    // subclasses create it -- TODO confirm against the subclasses.
    protected MessageProducer producer;

    // Delivery mode as a javax.jms int constant; set to
    // javax.jms.Message.DEFAULT_DELIVERY_MODE by the constructor and possibly
    // replaced in initialize() or setDeliveryMode().
    protected int deliveryMode;
    // Message priority; set to javax.jms.Message.DEFAULT_PRIORITY by the constructor
    // and replaced by the configured value in initialize() or setMessagePriority().
    protected int messagePriority;
    // Configured JMS message type name; validate() rejects anything other than
    // JMSConfigConstants.TEXT_MESSAGE or JMSConfigConstants.OBJECT_MESSAGE.
    protected String messageType;

    //--------------------------------------------------------------------------
    //
    // Constructor
    //
    //--------------------------------------------------------------------------

    /**
     * Create a new JMSProducer with default delivery mode of <code>javax.jms.Message.DEFAULT_DELIVERY_MODE</code>
     * and default message priority of <code>javax.jms.Message.DEFAULT_PRIORITY</code>.
     */
    public JMSProducer()
    {
        super();
        deliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
        messagePriority = javax.jms.Message.DEFAULT_PRIORITY;
    }

    //--------------------------------------------------------------------------
    //
    // Initialize, validate, start, and stop methods.
    //
    //--------------------------------------------------------------------------

    /**
     * Initialize with settings from the JMS adapter.
     *
     * @param settings JMS settings to use for initialization.
     */
    public void initialize(JMSSettings settings)
    {
        super.initialize(settings);

        String deliveryString = settings.getDeliveryMode();
        if (deliveryString.equals(JMSConfigConstants.DEFAULT_DELIVERY_MODE))
            deliveryMode = javax.jms.Message.DEFAULT_DELIVERY_MODE;
        else if (deliveryString.equals(JMSConfigConstants.PERSISTENT))
            deliveryMode = javax.jms.DeliveryMode.PERSISTENT;
        else if (deliveryString.equals(JMSConfigConstants.NON_PERSISTENT))
            deliveryMode = javax.jms.DeliveryMode.NON_PERSISTENT;

        messagePriority = settings.getMessagePriority();
        messageType = settings.getMessageType();
    }

    /**
     * Verifies that the <code>JMSProducer</code> is in valid state before
     * it is started. For <code>JMSProducer</code> to be in valid state, it needs
     * to have a message type assigned.
     */
    protected void validate()
    {
        super.validate();

        if (messageType == null || (!(messageType.equals(JMSConfigConstants.TEXT_MESSAGE)
                || messageType.equals(JMSConfigConstants.OBJECT_MESSAGE))) )
        {
            // Unsupported JMS Message Type ''{0}''. Valid values are javax.jms.TextMessage and javax.jms.ObjectMessage.
            ConfigurationException ce = new ConfigurationException();
            ce.setMessage(JMSConfigConstants.INVALID_JMS_MESSAGE_TYPE, new Object[] {messageType});
            throw ce;
        }
    }

    /**
     * Starts the <code>JMSProducer</code>. Subclasses should call <code>super.start</code>.
     */
    public void start() throws NamingException, JMSException
    {
        super.start();

        if (Log.isInfo())
            Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS producer for JMS destination '"
                    + destinationJndiName +"' is starting.");
    }

    /**
     * Stops the <code>JMSProducer</code> by closing its underlying
     * <code>MessageProducer</code>. It then calls <code>JMSProxy.close</code>
     * for session and connection closure.
     */
    public void stop()
    {
        if (Log.isInfo())
            Log.getLogger(JMSAdapter.LOG_CATEGORY).info("JMS producer for JMS destination '" +
                    destinationJndiName + "' is stopping.");

        try
        {
            if (producer != null)
                producer.close();
        }
        catch (JMSException e)
        {
            if (Log.isWarn())
                Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMS producer for JMS destination '" +
                        destinationJndiName + "' received an error while closing"
                        + " its underlying MessageProducer: " + e.getMessage());
        }

        super.stop();
    }

    //--------------------------------------------------------------------------
    //
    // Public Methods
    //
    //--------------------------------------------------------------------------

    /**
     * Returns the delivery mode used by the <code>JMSProducer</code>.
     *
     * @return The delivery mode used by the <code>JMSProducer</code>, as an int.
     * This class itself only ever assigns
     * <code>javax.jms.Message.DEFAULT_DELIVERY_MODE</code>,
     * <code>javax.jms.DeliveryMode.PERSISTENT</code> or
     * <code>javax.jms.DeliveryMode.NON_PERSISTENT</code> to it.
     */
    public int getDeliveryMode()
    {
        return deliveryMode;
    }

    /**
     * Sets the delivery mode used by the <code>JMSProducer</code>. Valid values
     * javax.jms.DeliveryMode.PERSISTENT and javax.jms.DeliveryMode.NON_PERSISTENT.
     * This propery is optional and defaults to javax.jms.Message.DEFAULT_DELIVERY_MODE.
     *
     * @param deliveryMode
     */
    public void setDeliveryMode(int deliveryMode)
    {
        if (deliveryMode == javax.jms.Message.DEFAULT_DELIVERY_MODE
                || deliveryMode == javax.jms.DeliveryMode.NON_PERSISTENT
                || deliveryMode == javax.jms.DeliveryMode.PERSISTENT)
            this.deliveryMode = deliveryMode;
    }

    /**
     * Returns the message priority used by the <code>JMSProducer</code>.
     * The value is <code>javax.jms.Message.DEFAULT_PRIORITY</code> unless it was
     * replaced through <code>initialize</code> or <code>setMessagePriority</code>.
     *
     * @return an int specifying the message priority.
     */
    public int getMessagePriority()
    {
        return messagePriority;
    }

    /**
     * Sets the message priority used by the <code>JMSProducer</code>.
     * This property is optional and defaults to <code>javax.jms.Message.DEFAULT_PRIORITY</code>.
     * The value is stored as given; no range check is performed here.
     *
     * @param messagePriority an int specifying the message priority.
     */
    public void setMessagePriority(int messagePriority)
    {
        this.messagePriority = messagePriority;
    }

    /**
     * Returns the message type used by the <code>JMSProducer</code>.
     *
     * @return The message type used by the <code>JMSProducer</code>; may be
     * <code>null</code> when no message type has been assigned yet.
     */
    public String getMessageType()
    {
        return messageType;
    }

    /**
     * Sets the message type used by the <code>JMSProducer</code>. Supported
     * types are <code>javax.jms.TextMessage</code> and <code>javax.jms.ObjectMessage</code>.
     * This property is mandatory. The value is not checked here; an unsupported
     * or missing type is rejected by <code>validate</code>.
     *
     * @param messageType String representing the message type used.
     */
    public void setMessageType(String messageType)
    {
        this.messageType = messageType;
    }

    //--------------------------------------------------------------------------
    //
    // Protected and Private Methods
    //
    //--------------------------------------------------------------------------

    protected void copyHeadersToProperties(Map properties, javax.jms.Message message) throws JMSException
    {
        // Generic Flex headers become JMS properties, named Flex headers become JMS headers
        for (Iterator iter = properties.keySet().iterator(); iter.hasNext();)
        {
            String propName = (String)iter.next();
            Object propValue = properties.get(propName);

            // For now, only named property is TTL.
            if (!propName.equals(JMSConfigConstants.TIME_TO_LIVE))
            {
                // MPI header contains a MessagePerformaceInfo object that cannot
                // be set as a JMS header property. Instead, it is broken down
                // to its primitive types and each primitive is individually
                // set as a JMS header property.
                if (propName.equals(MessagePerformanceUtils.MPI_HEADER_IN))
                {
                    Field[] fields = propValue.getClass().getFields();
                    for (int i = 0; i < fields.length; i++)
                    {
                        Field field = fields[i];
                        // Use MPI_HEADER_IN as prefix to the property name so that
                        // they can be distinguished later when the MessagePerformanceInfo
                        // object gets built from the headers.
                        String mpiPropertyName = MessagePerformanceUtils.MPI_HEADER_IN + field.getName();
                        Object mpiPropertyValue = null;
                        try
                        {
                            mpiPropertyValue = field.get(propValue);
                            message.setObjectProperty(mpiPropertyName, mpiPropertyValue);
                        }
                        catch (Exception e)
                        {
                            if (Log.isWarn())
                                Log.getLogger(JMSAdapter.LOG_CATEGORY).warn("JMSProducer could not retrieve the value of MessagePerformanceUtils property '"
                                        + propValue + "' from the Flex message, therefore it will not be set on the JMS message.");
                        }
                    }
                }
                else if (propValue != null)
                {
                    message.setObjectProperty(propName, propValue);
                }
            }
        }
    }

    protected long getTimeToLive(Map properties) throws JMSException
    {
        long timeToLive = producer.getTimeToLive();
        if (properties.containsKey(JMSConfigConstants.TIME_TO_LIVE))
        {
            long l = ((Long)properties.get(JMSConfigConstants.TIME_TO_LIVE)).longValue();
            if (l != 0)
            {
                // Don't let Flex default override JMS implementation default,
                // only explicit ActionScript TTL usage overrides JMS TTL.
                timeToLive = l;
            }
        }
        return timeToLive;
    }

    void sendMessage(flex.messaging.messages.Message flexMessage) throws JMSException
    {
        if (messageType.equals(JMSConfigConstants.TEXT_MESSAGE))
        {
            MessagePerformanceUtils.markServerPreAdapterExternalTime(flexMessage);
            sendTextMessage(flexMessage.getBody().toString(), flexMessage.getHeaders());
        }
        else if (messageType.equals(JMSConfigConstants.OBJECT_MESSAGE))
        {
            try
            {
                MessagePerformanceUtils.markServerPreAdapterExternalTime(flexMessage);
                sendObjectMessage((Serializable)flexMessage.getBody(), flexMessage.getHeaders());
            }
            catch (ClassCastException ce)
            {
                // The body of the Flex Message could not be converted to a Serializable Java Object.
                MessageException me = new MessageException();
                me.setMessage(JMSConfigConstants.NONSERIALIZABLE_MESSAGE_BODY);
                throw me;
            }
        }
    }

    /**
     * Sends the given object through JMS; implemented by concrete producers.
     * Within this class it is only invoked by <code>sendMessage</code> when the
     * message type is <code>JMSConfigConstants.OBJECT_MESSAGE</code>.
     *
     * @param obj The Flex message body, cast to <code>Serializable</code> by the caller.
     * @param properties The Flex message headers supplied by the caller.
     * @throws JMSException if the implementation fails to send the message.
     */
    abstract void sendObjectMessage(Serializable obj, Map properties) throws JMSException;

    /**
     * Sends the given text through JMS; implemented by concrete producers.
     * Within this class it is only invoked by <code>sendMessage</code> when the
     * message type is <code>JMSConfigConstants.TEXT_MESSAGE</code>.
     *
     * @param text The <code>toString()</code> form of the Flex message body.
     * @param properties The Flex message headers supplied by the caller.
     * @throws JMSException if the implementation fails to send the message.
     */
    abstract void sendTextMessage(String text, Map properties) throws JMSException;
}
TOP

Related Classes of flex.messaging.services.messaging.adapters.JMSProducer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.