Package org.activemq

Source Code of org.activemq.ActiveMQConnectionConsumer

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.activemq;

import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;

import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.util.MemoryBoundedQueue;

/**
* For application servers, <CODE>Connection</CODE> objects provide a special
* facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
* messages it is to consume are specified by a <CODE>Destination</CODE> and
* a message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
* given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
* <p/>
* <P>
* Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
* <CODE>ServerSession</CODE> from its pool, loads it with a single message,
* and starts it. As traffic picks up, messages can back up. If this happens, a
* <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
* with more than one message. This reduces the thread context switches and
* minimizes resource use at the expense of some serialization of message
* processing.
*
* @see javax.jms.Connection#createConnectionConsumer
* @see javax.jms.Connection#createDurableConnectionConsumer
* @see javax.jms.QueueConnection#createConnectionConsumer
* @see javax.jms.TopicConnection#createConnectionConsumer
* @see javax.jms.TopicConnection#createDurableConnectionConsumer
*/

public class ActiveMQConnectionConsumer implements ConnectionConsumer,
        ActiveMQMessageDispatcher {

    private ActiveMQConnection connection;

    private ServerSessionPool sessionPool;

    private ConsumerInfo consumerInfo;

    private boolean closed;

    private int maximumMessages;
   
    protected MemoryBoundedQueue messageQueue;
   

    /**
     * Create a ConnectionConsumer
     *
     * @param theConnection
     * @param theSessionPool
     * @param theConsumerInfo
     * @param theMaximumMessages
     * @throws JMSException
     */
    protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection,
                                         ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo,
                                         int theMaximumMessages) throws JMSException {
        this.connection = theConnection;
        this.sessionPool = theSessionPool;
        this.consumerInfo = theConsumerInfo;
        this.maximumMessages = theMaximumMessages;
        this.connection.addConnectionConsumer(this);
        this.consumerInfo.setStarted(true);
        this.connection.syncSendPacket(this.consumerInfo);

        String queueName = connection.clientID + ":" + theConsumerInfo.getConsumerName()+":"+
      theConsumerInfo.getConsumerNo();
        this.messageQueue = connection.getMemoryBoundedQueue(queueName);
    }

    /**
     * Tests to see if the Message Dispatcher is a target for this message
     *
     * @param message the message to test
     * @return true if the Message Dispatcher can dispatch the message
     */
    public boolean isTarget(ActiveMQMessage message) {
        return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
    }

    /**
     * Dispatch an ActiveMQMessage
     *
     * @param message
     */
    public void dispatch(ActiveMQMessage message) {
        if (message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
            message.setConsumerIdentifer(this.consumerInfo.getConsumerId());
            message.setTransientConsumed( !this.consumerInfo.isDurableTopic() && !this.consumerInfo.getDestination().isQueue() );
            try {
              if( sessionPool != null )
                dispatchToSession(message);
              else
                dispatchToQueue(message);
            }
            catch (JMSException jmsEx) {
                this.connection.handleAsyncException(jmsEx);
            }
        }
    }
       
    /**
   * @param message
   * @throws JMSException
   */
  private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
    messageQueue.enqueue(message);
    }

    /**
     * Receives the next message that arrives within the specified timeout interval.
     * @throws JMSException
     */
    public ActiveMQMessage receive(long timeout) throws JMSException {
        try {
            ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
            return message;
        }
        catch (InterruptedException ioe) {
            return null;
        }
    }
 
 
    /**
   * @param message
   * @throws JMSException
   */
  private void dispatchToSession(ActiveMQMessage message) throws JMSException {
    ServerSession serverSession = sessionPool.getServerSession();
    ActiveMQSession session = (ActiveMQSession) serverSession
            .getSession();
    session.dispatch(message);
    serverSession.start();
  }

  /**
     * Gets the server session pool associated with this connection consumer.
     *
     * @return the server session pool used by this connection consumer
     * @throws JMSException if the JMS provider fails to get the server session pool
     *                      associated with this consumer due to some internal error.
     */

    public ServerSessionPool getServerSessionPool() throws JMSException {
        if (closed) {
            throw new IllegalStateException("The Connection Consumer is closed");
        }
        return this.sessionPool;
    }

    /**
     * Closes the connection consumer.
     * <p/>
     * <P>
     * Since a provider may allocate some resources on behalf of a connection
     * consumer outside the Java virtual machine, clients should close these
     * resources when they are not needed. Relying on garbage collection to
     * eventually reclaim these resources may not be timely enough.
     *
     * @throws JMSException
     */

    public void close() throws JMSException {
        if (!closed) {
            closed = true;
            this.consumerInfo.setStarted(false);
            this.connection.asyncSendPacket(this.consumerInfo);
            this.connection.removeConnectionConsumer(this);
        }

    }
}
TOP

Related Classes of org.activemq.ActiveMQConnectionConsumer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.