Package org.activemq

Source Code of org.activemq.ActiveMQConnectionConsumer

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.activemq;

import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;

import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;

/**
* For application servers, <CODE>Connection</CODE> objects provide a special
* facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
* messages it is to consume are specified by a <CODE>Destination</CODE> and a
* message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
* given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
* <p/>
* <P>
* Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
* <CODE>ServerSession</CODE> from its pool, loads it with a single message,
* and starts it. As traffic picks up, messages can back up. If this happens, a
* <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
* with more than one message. This reduces the thread context switches and
* minimizes resource use at the expense of some serialization of message
* processing.
*
* @see javax.jms.Connection#createConnectionConsumer
* @see javax.jms.Connection#createDurableConnectionConsumer
* @see javax.jms.QueueConnection#createConnectionConsumer
* @see javax.jms.TopicConnection#createConnectionConsumer
* @see javax.jms.TopicConnection#createDurableConnectionConsumer
*/

public class ActiveMQConnectionConsumer implements ConnectionConsumer, ActiveMQMessageDispatcher {

    private ActiveMQConnection connection;

    private ServerSessionPool sessionPool;

    private ConsumerInfo consumerInfo;

    private boolean closed;

    protected MemoryBoundedQueue messageQueue;

    /**
     * Create a ConnectionConsumer
     *
     * @param theConnection
     * @param theSessionPool
     * @param theConsumerInfo
     * @param theMaximumMessages
     * @throws JMSException
     */
    protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection, ServerSessionPool theSessionPool,
            ConsumerInfo theConsumerInfo, int theMaximumMessages) throws JMSException {
        this.connection = theConnection;
        this.sessionPool = theSessionPool;
        this.consumerInfo = theConsumerInfo;
        this.connection.addConnectionConsumer(this);
        this.consumerInfo.setStarted(true);
        this.consumerInfo.setPrefetchNumber(theMaximumMessages);
        this.connection.syncSendPacket(this.consumerInfo);

        String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName() + ":"
                + theConsumerInfo.getConsumerNo();
        this.messageQueue = connection.getMemoryBoundedQueue(queueName);
    }

    /**
     * Tests to see if the Message Dispatcher is a target for this message
     *
     * @param message
     *            the message to test
     * @return true if the Message Dispatcher can dispatch the message
     */
    public boolean isTarget(ActiveMQMessage message) {
        return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
    }

    /**
     * Dispatch an ActiveMQMessage
     *
     * @param message
     */
    public void dispatch(ActiveMQMessage message) {
        if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
            message.setConsumerIdentifer(this.consumerInfo.getConsumerId());
            message.setTransientConsumed(!this.consumerInfo.isDurableTopic()
                    && !this.consumerInfo.getDestination().isQueue());
            try {
                if (sessionPool != null)
                    dispatchToSession(message);
                else
                    dispatchToQueue(message);
            } catch (JMSException jmsEx) {
                this.connection.handleAsyncException(jmsEx);
            }
        }
    }

    /**
     * @param message
     * @throws JMSException
     */
    private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
        messageQueue.enqueue(message);
    }

    /**
     * Receives the next message that arrives within the specified timeout
     * interval.
     *
     * @throws JMSException
     */
    public ActiveMQMessage receive(long timeout) throws JMSException {
        try {
            ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
            return message;
        } catch (InterruptedException ioe) {
            return null;
        }
    }

    /**
     * @param message
     * @throws JMSException
     */
    private void dispatchToSession(ActiveMQMessage message) throws JMSException {

        ServerSession serverSession = sessionPool.getServerSession();
        Session nestedSession = serverSession.getSession();
        ActiveMQSession session = null;
        if (nestedSession instanceof ActiveMQSession) {
            session = (ActiveMQSession) nestedSession;
        } else if (nestedSession instanceof ActiveMQTopicSession) {
            ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession;
            session = (ActiveMQSession) topicSession.getNext();
        } else if (nestedSession instanceof ActiveMQQueueSession) {
            ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession;
            session = (ActiveMQSession) queueSession.getNext();
        } else {
            throw new JMSException("Invalid instance of session obtained from server session." +
            "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " +
            "Found instance of " + nestedSession.getClass().getName());
        }
        session.dispatch(message);
        serverSession.start();
    }

    /**
     * Gets the server session pool associated with this connection consumer.
     *
     * @return the server session pool used by this connection consumer
     * @throws JMSException
     *             if the JMS provider fails to get the server session pool
     *             associated with this consumer due to some internal error.
     */

    public ServerSessionPool getServerSessionPool() throws JMSException {
        if (closed) {
            throw new IllegalStateException("The Connection Consumer is closed");
        }
        return this.sessionPool;
    }

    /**
     * Closes the connection consumer. <p/>
     * <P>
     * Since a provider may allocate some resources on behalf of a connection
     * consumer outside the Java virtual machine, clients should close these
     * resources when they are not needed. Relying on garbage collection to
     * eventually reclaim these resources may not be timely enough.
     *
     * @throws JMSException
     */

    public void close() throws JMSException {
        if (!closed) {
            closed = true;
            this.consumerInfo.setStarted(false);
            this.connection.asyncSendPacket(this.consumerInfo);
            this.connection.removeConnectionConsumer(this);
        }

    }
}
TOP

Related Classes of org.activemq.ActiveMQConnectionConsumer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.