/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq;
import javax.jms.ConnectionConsumer;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.ServerSession;
import javax.jms.ServerSessionPool;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;
/**
* For application servers, <CODE>Connection</CODE> objects provide a special
* facility for creating a <CODE>ConnectionConsumer</CODE> (optional). The
* messages it is to consume are specified by a <CODE>Destination</CODE> and
* a message selector. In addition, a <CODE>ConnectionConsumer</CODE> must be
* given a <CODE>ServerSessionPool</CODE> to use for processing its messages.
* <p>
* Normally, when traffic is light, a <CODE>ConnectionConsumer</CODE> gets a
* <CODE>ServerSession</CODE> from its pool, loads it with a single message,
* and starts it. As traffic picks up, messages can back up. If this happens, a
* <CODE>ConnectionConsumer</CODE> can load each <CODE>ServerSession</CODE>
* with more than one message. This reduces the thread context switches and
* minimizes resource use at the expense of some serialization of message
* processing.
*
* @see javax.jms.Connection#createConnectionConsumer
* @see javax.jms.Connection#createDurableConnectionConsumer
* @see javax.jms.QueueConnection#createConnectionConsumer
* @see javax.jms.TopicConnection#createConnectionConsumer
* @see javax.jms.TopicConnection#createDurableConnectionConsumer
*/
public class ActiveMQConnectionConsumer implements ConnectionConsumer,
        ActiveMQMessageDispatcher {

    /** Connection this consumer is registered with and sends its packets through. */
    private final ActiveMQConnection connection;

    /** Pool that supplies server sessions; when null, messages are queued for {@link #receive(long)}. */
    private final ServerSessionPool sessionPool;

    /** Describes this consumer; sent to the connection on construction and on close. */
    private final ConsumerInfo consumerInfo;

    /** Set once {@link #close()} has run. */
    private boolean closed;

    /** Value supplied at construction; stored but not otherwise read within this class. */
    private final int maximumMessages;

    /** Holds dispatched messages when no session pool is configured. */
    protected MemoryBoundedQueue messageQueue;

    /**
     * Create a ConnectionConsumer, register it with the connection and send
     * the (started) consumer info through the connection.
     *
     * @param theConnection      the connection that owns this consumer
     * @param theSessionPool     the pool used to process messages, or null to
     *                           queue messages for {@link #receive(long)}
     * @param theConsumerInfo    the description of this consumer
     * @param theMaximumMessages the maximum number of messages per server session
     * @throws JMSException if sending the consumer info fails; in that case the
     *                      consumer is unregistered from the connection again
     */
    protected ActiveMQConnectionConsumer(ActiveMQConnection theConnection,
            ServerSessionPool theSessionPool, ConsumerInfo theConsumerInfo,
            int theMaximumMessages) throws JMSException {
        this.connection = theConnection;
        this.sessionPool = theSessionPool;
        this.consumerInfo = theConsumerInfo;
        this.maximumMessages = theMaximumMessages;

        // Build the queue BEFORE registering: once registered and started, a
        // message may presumably be dispatched at any moment, and
        // dispatchToQueue() must never observe a null messageQueue.
        String queueName = theConnection.clientID + ":" + theConsumerInfo.getConsumerName() + ":"
                + theConsumerInfo.getConsumerNo();
        this.messageQueue = theConnection.getMemoryBoundedQueue(queueName);

        this.connection.addConnectionConsumer(this);
        this.consumerInfo.setStarted(true);
        try {
            this.connection.syncSendPacket(this.consumerInfo);
        }
        catch (JMSException sendFailure) {
            // Do not leave a half-constructed consumer registered with the connection.
            this.connection.removeConnectionConsumer(this);
            throw sendFailure;
        }
    }

    /**
     * Tests to see if the Message Dispatcher is a target for this message
     *
     * @param message the message to test
     * @return true if the Message Dispatcher can dispatch the message
     */
    public boolean isTarget(ActiveMQMessage message) {
        return message.isConsumerTarget(this.consumerInfo.getConsumerNo());
    }

    /**
     * Dispatch an ActiveMQMessage. Messages that do not target this consumer
     * are ignored. A targeted message is stamped with this consumer's id and
     * handed to a server session when a session pool is configured, otherwise
     * it is placed on the internal queue. Any JMSException raised while
     * dispatching is reported through the connection's async exception handler.
     *
     * @param message the message to dispatch
     */
    public void dispatch(ActiveMQMessage message) {
        if (!message.isConsumerTarget(this.consumerInfo.getConsumerNo())) {
            return;
        }
        message.setConsumerIdentifer(this.consumerInfo.getConsumerId());
        // Flagged as transient-consumed only for non-durable, non-queue destinations.
        message.setTransientConsumed(!this.consumerInfo.isDurableTopic()
                && !this.consumerInfo.getDestination().isQueue());
        try {
            if (sessionPool != null) {
                dispatchToSession(message);
            }
            else {
                dispatchToQueue(message);
            }
        }
        catch (JMSException jmsEx) {
            this.connection.handleAsyncException(jmsEx);
        }
    }

    /**
     * Places the message on the internal queue.
     *
     * @param message the message to enqueue
     * @throws JMSException declared for symmetry with {@link #dispatchToSession}
     */
    private void dispatchToQueue(ActiveMQMessage message) throws JMSException {
        messageQueue.enqueue(message);
    }

    /**
     * Receives the next message that arrives within the specified timeout interval.
     *
     * @param timeout the timeout passed to the underlying queue's dequeue call
     * @return the dequeued message, or null if the calling thread was
     *         interrupted while waiting (the thread's interrupt status is
     *         restored in that case)
     * @throws JMSException
     */
    public ActiveMQMessage receive(long timeout) throws JMSException {
        try {
            return (ActiveMQMessage) messageQueue.dequeue(timeout);
        }
        catch (InterruptedException interrupted) {
            // Restore the interrupt status so callers further up the stack
            // can still observe that the thread was interrupted.
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Obtains a server session from the pool, loads it with the message and
     * starts it.
     *
     * @param message the message to hand to the server session
     * @throws JMSException if the pool or the server session fails
     */
    private void dispatchToSession(ActiveMQMessage message) throws JMSException {
        ServerSession serverSession = sessionPool.getServerSession();
        ActiveMQSession session = (ActiveMQSession) serverSession.getSession();
        session.dispatch(message);
        serverSession.start();
    }

    /**
     * Gets the server session pool associated with this connection consumer.
     *
     * @return the server session pool used by this connection consumer
     * @throws JMSException if the JMS provider fails to get the server session pool
     *                      associated with this consumer due to some internal error,
     *                      or if this consumer has been closed.
     */
    public ServerSessionPool getServerSessionPool() throws JMSException {
        if (closed) {
            throw new IllegalStateException("The Connection Consumer is closed");
        }
        return this.sessionPool;
    }

    /**
     * Closes the connection consumer. Calling this more than once has no
     * further effect.
     * <p>
     * Since a provider may allocate some resources on behalf of a connection
     * consumer outside the Java virtual machine, clients should close these
     * resources when they are not needed. Relying on garbage collection to
     * eventually reclaim these resources may not be timely enough.
     *
     * @throws JMSException
     */
    public void close() throws JMSException {
        if (!closed) {
            closed = true;
            // Send the consumer info again, now marked as stopped, then unregister.
            this.consumerInfo.setStarted(false);
            this.connection.asyncSendPacket(this.consumerInfo);
            this.connection.removeConnectionConsumer(this);
        }
    }
}