/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.capacity.CapacityMonitorEvent;
import org.codehaus.activemq.capacity.CapacityMonitorEventListener;
import org.codehaus.activemq.management.JMSConnectionStatsImpl;
import org.codehaus.activemq.management.JMSStatsImpl;
import org.codehaus.activemq.management.StatsCapable;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.CapacityInfo;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.SessionInfo;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;
import org.codehaus.activemq.message.util.MemoryBoundedQueueManager;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportStatusEvent;
import org.codehaus.activemq.transport.TransportStatusEventListener;
import org.codehaus.activemq.util.IdGenerator;
import org.codehaus.activemq.util.JMSExceptionHelper;
import javax.jms.*;
import javax.jms.IllegalStateException;
import javax.management.j2ee.statistics.Stats;
import java.util.Iterator;
/**
* A <CODE>Connection</CODE> object is a client's active connection to its JMS provider. It typically allocates
* provider resources outside the Java virtual machine (JVM).
* <P>
* Connections support concurrent use.
* <P>
* A connection serves several purposes:
* <UL>
* <LI>It encapsulates an open connection with a JMS provider. It typically represents an open TCP/IP socket between a
* client and the service provider software.
* <LI>Its creation is where client authentication takes place.
* <LI>It can specify a unique client identifier.
* <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
* <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
* </UL>
* <P>
* Because the creation of a connection involves setting up authentication and communication, a connection is a
* relatively heavyweight object. Most clients will do all their messaging with a single connection. Other more advanced
* applications may use several connections. The JMS API does not architect a reason for using multiple connections;
* however, there may be operational reasons for doing so.
* <P>
* A JMS client typically creates a connection, one or more sessions, and a number of message producers and consumers.
* When a connection is created, it is in stopped mode. That means that no messages are being delivered.
* <P>
* It is typical to leave the connection in stopped mode until setup is complete (that is, until all message consumers
* have been created). At that point, the client calls the connection's <CODE>start</CODE> method, and messages begin
* arriving at the connection's consumers. This setup convention minimizes any client confusion that may result from
* asynchronous message delivery while the client is still in the process of setting itself up.
* <P>
* A connection can be started immediately, and the setup can be done afterwards. Clients that do this must be prepared
* to handle asynchronous message delivery while they are still in the process of setting up.
* <P>
* A message producer can send messages while a connection is stopped. <p/>This class is also a <CODE>TopicConnection
* </CODE>. A <CODE>TopicConnection</CODE> object is an active connection to a publish/subscribe JMS provider. A
* client uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE> objects for
* producing and consuming messages.
* <P>
* A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>, from which specialized
* topic-related objects can be created. A more general, and recommended approach is to use the <CODE>Connection
* </CODE> object.
* <P>
* <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A A <CODE>QueueConnection</CODE> object is an active
* connection to a point-to-point JMS provider. A client uses a <CODE>QueueConnection</CODE> object to create one or
* more <CODE>QueueSession</CODE> objects for producing and consuming messages.
* <P>
* A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>, from which specialized
* queue-related objects can be created. A more general, and recommended, approach is to use the <CODE>Connection
* </CODE> object.
* <P>
* A <CODE>QueueConnection</CODE> cannot be used to create objects specific to the publish/subscribe domain. The
* <CODE>createDurableConnectionConsumer</CODE> method inherits from <CODE>Connection</CODE>, but must throw an
* <CODE>IllegalStateException</CODE> if used from <CODE>QueueConnection</CODE>. // *
*
* @version $Revision: 1.11 $
* @see javax.jms.Connection
* @see javax.jms.ConnectionFactory
* @see javax.jms.QueueConnection
* @see javax.jms.TopicConnection
* @see javax.jms.TopicConnectionFactory
* @see javax.jms.QueueConnection
* @see javax.jms.QueueConnectionFactory
*/
public class ActiveMQConnection
implements
Connection,
PacketListener,
ExceptionListener,
TopicConnection,
QueueConnection,
StatsCapable,
CapacityMonitorEventListener,
TransportStatusEventListener,
Closeable {
/**
* Default UserName for the Connection
*/
public static final String DEFAULT_USER = "defaultUser";
/**
* Default URL for the ActiveMQ Broker
*/
public static final String DEFAULT_URL = "tcp://localhost:61616";
/**
* Default Password for the Connection
*/
public static final String DEFAULT_PASSWORD = "defaultPassword";
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
// properties
private ActiveMQConnectionFactory factory;
private String userName;
private String password;
protected String clientID;
private int sendCloseTimeout = 2000;
private TransportChannel transportChannel;
private ExceptionListener exceptionListener;
private ActiveMQPrefetchPolicy prefetchPolicy;
private JMSStatsImpl factoryStats;
private MemoryBoundedQueueManager boundedQueueManager;
protected IdGenerator consumerIdGenerator;
private IdGenerator clientIdGenerator;
protected IdGenerator packetIdGenerator;
private IdGenerator sessionIdGenerator;
private JMSConnectionStatsImpl stats;
// internal state
private CopyOnWriteArrayList sessions;
private CopyOnWriteArrayList messageDispatchers;
private CopyOnWriteArrayList connectionConsumers;
private SynchronizedInt consumerNumberGenerator;
private ActiveMQConnectionMetaData connectionMetaData;
private SynchronizedBoolean closed;
private SynchronizedBoolean started;
private boolean clientIDSet;
private boolean isConnectionInfoSentToBroker;
private boolean isTransportOK;
private boolean startedTransport;
private long startTime;
private long flowControlSleepTime = 0;
private boolean userSpecifiedClientID;
/**
* Should we use an async send for persistent non transacted messages ?
*/
protected boolean useAsyncSend = true;
private int sendConnectionInfoTimeout = 30000;
private boolean J2EEcompliant = true;
/**
* A static helper method to create a new connection
*
* @return an ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection() throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
return (ActiveMQConnection) factory.createConnection();
}
/**
* A static helper method to create a new connection
*
* @param uri
* @return and ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection(String uri) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
return (ActiveMQConnection) factory.createConnection();
}
/**
* A static helper method to create a new connection
*
* @param user
* @param password
* @param uri
* @return an ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection(String user, String password, String uri) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user, password, uri);
return (ActiveMQConnection) factory.createConnection();
}
/**
* Constructs a connection from an existing TransportChannel and user/password.
*
* @param factory
* @param theUserName the users name
* @param thePassword the password
* @param transportChannel the transport channel to communicate with the server
* @throws JMSException
*/
public ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName, String thePassword,
TransportChannel transportChannel) throws JMSException {
this(factory, theUserName, thePassword);
this.transportChannel = transportChannel;
this.transportChannel.setPacketListener(this);
this.transportChannel.setExceptionListener(this);
this.transportChannel.addTransportStatusEventListener(this);
this.isTransportOK = true;
}
protected ActiveMQConnection(ActiveMQConnectionFactory factory, String theUserName, String thePassword) {
this.factory = factory;
this.userName = theUserName;
this.password = thePassword;
this.clientIdGenerator = new IdGenerator();
this.packetIdGenerator = new IdGenerator();
this.consumerIdGenerator = new IdGenerator();
this.sessionIdGenerator = new IdGenerator();
this.consumerNumberGenerator = new SynchronizedInt(0);
this.sessions = new CopyOnWriteArrayList();
this.messageDispatchers = new CopyOnWriteArrayList();
this.connectionConsumers = new CopyOnWriteArrayList();
this.connectionMetaData = new ActiveMQConnectionMetaData();
this.closed = new SynchronizedBoolean(false);
this.started = new SynchronizedBoolean(false);
this.startTime = System.currentTimeMillis();
this.prefetchPolicy = new ActiveMQPrefetchPolicy();
this.boundedQueueManager = new MemoryBoundedQueueManager(clientID, DEFAULT_CONNECTION_MEMORY_LIMIT);
this.boundedQueueManager.addCapacityEventListener(this);
boolean transactional = this instanceof XAConnection;
factoryStats = factory.getFactoryStats();
factoryStats.addConnection(this);
stats = new JMSConnectionStatsImpl(sessions, transactional);
factory.onConnectionCreate(this);
}
/**
* @return statistics for this Connection
*/
public Stats getStats() {
return stats;
}
/**
* @return a number unique for this connection
*/
public JMSConnectionStatsImpl getConnectionStats() {
return stats;
}
/**
* Creates a <CODE>Session</CODE> object.
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
* ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created session
* @throws JMSException if the <CODE>Connection</CODE> object fails to create a session due to some internal error
* or lack of support for the specific transaction and acknowledgement mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
* @since 1.1
*/
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosed();
ensureClientIDInitialised();
return new ActiveMQSession(this, (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode));
}
/**
* Gets the client identifier for this connection.
* <P>
* This value is specific to the JMS provider. It is either preconfigured by an administrator in a <CODE>
* ConnectionFactory</CODE> object or assigned dynamically by the application by calling the
* <code>setClientID</code> method.
*
* @return the unique client identifier
* @throws JMSException if the JMS provider fails to return the client ID for this connection due to some internal
* error.
*/
public String getClientID() throws JMSException {
checkClosed();
return this.clientID;
}
/**
* Sets the client identifier for this connection.
* <P>
* The preferred way to assign a JMS client's client identifier is for it to be configured in a client-specific
* <CODE>ConnectionFactory</CODE> object and transparently assigned to the <CODE>Connection</CODE> object it
* creates.
* <P>
* Alternatively, a client can set a connection's client identifier using a provider-specific value. The facility to
* set a connection's client identifier explicitly is not a mechanism for overriding the identifier that has been
* administratively configured. It is provided for the case where no administratively specified identifier exists.
* If one does exist, an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>. If
* a client sets the client identifier explicitly, it must do so immediately after it creates the connection and
* before any other action on the connection is taken. After this point, setting the client identifier is a
* programming error that should throw an <CODE>IllegalStateException</CODE>.
* <P>
* The purpose of the client identifier is to associate a connection and its objects with a state maintained on
* behalf of the client by a provider. The only such state identified by the JMS API is that required to support
* durable subscriptions.
* <P>
* If another connection with the same <code>clientID</code> is already running when this method is called, the
* JMS provider should detect the duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
*
* @param newClientID the unique client identifier
* @throws JMSException if the JMS provider fails to set the client ID for this connection due to some internal
* error.
* @throws javax.jms.InvalidClientIDException
* if the JMS client specifies an invalid or duplicate client ID.
* @throws javax.jms.IllegalStateException
* if the JMS client attempts to set a connection's client ID at the wrong
* time or when it has been administratively configured.
*/
public void setClientID(String newClientID) throws JMSException {
if (this.clientIDSet) {
throw new IllegalStateException("The clientID has already been set");
}
if (this.isConnectionInfoSentToBroker) {
throw new IllegalStateException("Setting clientID on a used Connection is not allowed");
}
checkClosed();
this.clientID = newClientID;
this.userSpecifiedClientID = true;
ensureClientIDInitialised();
}
/**
* Gets the metadata for this connection.
*
* @return the connection metadata
* @throws JMSException if the JMS provider fails to get the connection metadata for this connection.
* @see javax.jms.ConnectionMetaData
*/
public ConnectionMetaData getMetaData() throws JMSException {
checkClosed();
return this.connectionMetaData;
}
/**
* Gets the <CODE>ExceptionListener</CODE> object for this connection. Not every <CODE>Connection</CODE> has an
* <CODE>ExceptionListener</CODE> associated with it.
*
* @return the <CODE>ExceptionListener</CODE> for this connection, or null. if no <CODE>ExceptionListener</CODE>
* is associated with this connection.
* @throws JMSException if the JMS provider fails to get the <CODE>ExceptionListener</CODE> for this connection.
* @see javax.jms.Connection#setExceptionListener(ExceptionListener)
*/
public ExceptionListener getExceptionListener() throws JMSException {
checkClosed();
return this.exceptionListener;
}
/**
* Sets an exception listener for this connection.
* <P>
* If a JMS provider detects a serious problem with a connection, it informs the connection's <CODE>
* ExceptionListener</CODE>, if one has been registered. It does this by calling the listener's <CODE>onException
* </CODE> method, passing it a <CODE>JMSException</CODE> object describing the problem.
* <P>
* An exception listener allows a client to be notified of a problem asynchronously. Some connections only consume
* messages, so they would have no other way to learn their connection has failed.
* <P>
* A connection serializes execution of its <CODE>ExceptionListener</CODE>.
* <P>
* A JMS provider should attempt to resolve connection problems itself before it notifies the client of them.
*
* @param listener the exception listener
* @throws JMSException if the JMS provider fails to set the exception listener for this connection.
*/
public void setExceptionListener(ExceptionListener listener) throws JMSException {
checkClosed();
this.exceptionListener = listener;
this.transportChannel.setExceptionListener(listener);
}
/**
* Starts (or restarts) a connection's delivery of incoming messages. A call to <CODE>start</CODE> on a connection
* that has already been started is ignored.
*
* @throws JMSException if the JMS provider fails to start message delivery due to some internal error.
* @see javax.jms.Connection#stop()
*/
public void start() throws JMSException {
checkClosed();
if (started.commit(false, true)) {
// We have a change in connection info to send.
//send the Connection info again
sendConnectionInfoToBroker(sendConnectionInfoTimeout, true);
for (Iterator i = sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
s.start();
}
}
}
/**
* @return true if this Connection is started
*/
protected boolean isStarted() {
return started.get();
}
/**
* Temporarily stops a connection's delivery of incoming messages. Delivery can be restarted using the connection's
* <CODE>start</CODE> method. When the connection is stopped, delivery to all the connection's message consumers
* is inhibited: synchronous receives block, and messages are not delivered to message listeners.
* <P>
* This call blocks until receives and/or message listeners in progress have completed.
* <P>
* Stopping a connection has no effect on its ability to send messages. A call to <CODE>stop</CODE> on a
* connection that has already been stopped is ignored.
* <P>
* A call to <CODE>stop</CODE> must not return until delivery of messages has paused. This means that a client can
* rely on the fact that none of its message listeners will be called and that all threads of control waiting for
* <CODE>receive</CODE> calls to return will not return with a message until the connection is restarted. The
* receive timers for a stopped connection continue to advance, so receives may time out while the connection is
* stopped.
* <P>
* If message listeners are running when <CODE>stop</CODE> is invoked, the <CODE>stop</CODE> call must wait
* until all of them have returned before it may return. While these message listeners are completing, they must
* have the full services of the connection available to them.
*
* @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
* @see javax.jms.Connection#start()
*/
public void stop() throws JMSException {
checkClosed();
if (started.commit(true, false)) {
for (Iterator i = sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
s.stop();
}
sendConnectionInfoToBroker(2000, true);
}
}
/**
* Closes the connection.
* <P>
* Since a provider typically allocates significant resources outside the JVM on behalf of a connection, clients
* should close these resources when they are not needed. Relying on garbage collection to eventually reclaim these
* resources may not be timely enough.
* <P>
* There is no need to close the sessions, producers, and consumers of a closed connection.
* <P>
* Closing a connection causes all temporary destinations to be deleted.
* <P>
* When this method is invoked, it should not return until message processing has been shut down in an orderly
* fashion. This means that all message listeners that may have been running have returned, and that all pending
* receives have returned. A close terminates all pending message receives on the connection's sessions' consumers.
* The receives may return with a message or with null, depending on whether there was a message available at the
* time of the close. If one or more of the connection's sessions' message listeners is processing a message at the
* time when connection <CODE>close</CODE> is invoked, all the facilities of the connection and its sessions must
* remain available to those listeners until they return control to the JMS provider.
* <P>
* Closing a connection causes any of its sessions' transactions in progress to be rolled back. In the case where a
* session's work is coordinated by an external transaction manager, a session's <CODE>commit</CODE> and <CODE>
* rollback</CODE> methods are not used and the result of a closed session's work is determined later by the
* transaction manager. Closing a connection does NOT force an acknowledgment of client-acknowledged sessions.
* <P>
* Invoking the <CODE>acknowledge</CODE> method of a received message from a closed connection's session must
* throw an <CODE>IllegalStateException</CODE>. Closing a closed connection must NOT throw an exception.
*
* @throws JMSException if the JMS provider fails to close the connection due to some internal error. For example, a
* failure to release resources or to close a socket connection can cause this exception to be thrown.
*/
public synchronized void close() throws JMSException {
this.transportChannel.setPendingStop(true);
if (!closed.get()) {
boundedQueueManager.removeCapacityEventListener(this);
try {
for (Iterator i = this.sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
s.close();
}
for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
c.close();
}
try {
sendConnectionInfoToBroker(sendCloseTimeout, true);
}
catch (TimeoutExpiredException e) {
log.warn("Failed to send close to broker, timeout expired of: " + sendCloseTimeout + " millis");
}
this.connectionConsumers.clear();
this.messageDispatchers.clear();
this.transportChannel.stop();
}
finally {
this.sessions.clear();
started.set(false);
factory.onConnectionClose(this);
}
closed.set(true);
}
}
/**
* simply throws an exception if the Connection is already closed
*
* @throws JMSException
*/
protected synchronized void checkClosed() throws JMSException {
if (!startedTransport) {
startedTransport = true;
this.transportChannel.start();
}
if (this.closed.get()) {
throw new IllegalStateException("The Connection is closed");
}
}
/**
* Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
* regular JMS clients.
*
* @param destination the destination to access
* @param messageSelector only messages with properties matching the message selector expression are delivered. A
* value of null or an empty string indicates that there is no message selector for the message consumer.
* @param sessionPool the server session pool to associate with this connection consumer
* @param maxMessages the maximum number of messages that can be assigned to a server session at one time
* @return the connection consumer
* @throws JMSException if the <CODE>Connection</CODE> object fails to create a connection consumer due to some
* internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException
* if an invalid destination is specified.
* @throws javax.jms.InvalidSelectorException
* if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosed();
ensureClientIDInitialised();
ConsumerInfo info = new ConsumerInfo();
info.setId(this.packetIdGenerator.generateId());
info.setConsumerId(consumerIdGenerator.generateId());
info.setDestination(ActiveMQMessageTransformation.transformDestination(destination));
info.setSelector(messageSelector);
return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
}
/**
* Create a durable connection consumer for this connection (optional operation). This is an expert facility not
* used by regular JMS clients.
*
* @param topic topic to access
* @param subscriptionName durable subscription name
* @param messageSelector only messages with properties matching the message selector expression are delivered. A
* value of null or an empty string indicates that there is no message selector for the message consumer.
* @param sessionPool the server session pool to associate with this durable connection consumer
* @param maxMessages the maximum number of messages that can be assigned to a server session at one time
* @return the durable connection consumer
* @throws JMSException if the <CODE>Connection</CODE> object fails to create a connection consumer due to some
* internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException
* if an invalid destination is specified.
* @throws javax.jms.InvalidSelectorException
* if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosed();
ensureClientIDInitialised();
ConsumerInfo info = new ConsumerInfo();
info.setId(this.packetIdGenerator.generateId());
info.setConsumerId(this.consumerIdGenerator.generateId());
info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
info.setSelector(messageSelector);
info.setConsumerName(subscriptionName);
return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
}
/**
* Implementation of the PacketListener interface - consume a packet
*
* @param packet - the Packet to consume
* @see org.codehaus.activemq.message.PacketListener#consume(org.codehaus.activemq.message.Packet)
*/
public void consume(Packet packet) {
if (!closed.get() && packet != null) {
if (packet.isJMSMessage()) {
ActiveMQMessage message = (ActiveMQMessage) packet;
message.setReadOnly(true);
message.setProducerID(clientID);
// lets check for expired messages which is only relevant for multicast based stuff
// as a pointcast based network should filter out this stuff
if (transportChannel.isMulticast()) {
long expiration = message.getJMSExpiration();
if (expiration > 0) {
long timeStamp = System.currentTimeMillis();
if (timeStamp > expiration) {
if (log.isDebugEnabled()) {
log.debug("Discarding expired message: " + message);
}
return;
}
}
}
try {
int count = 0;
for (Iterator i = this.messageDispatchers.iterator(); i.hasNext();) {
ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i.next();
if (dispatcher.isTarget(message)) {
if (count > 0) {
//separate message for each Session etc.
message = message.deepCopy();
}
dispatcher.dispatch(message);
count++;
}
}
}
catch (JMSException jmsEx) {
handleAsyncException(jmsEx);
}
}
else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
CapacityInfo info = (CapacityInfo) packet;
flowControlSleepTime = info.getFlowControlTimeout();
//System.out.println("SET FLOW TIMEOUT = " + flowControlSleepTime + " FOR " + info);
}
}
}
/**
* @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
*/
public void onException(JMSException jmsEx) {
//Got an exception propagated up from the transport channel
handleAsyncException(jmsEx);
isTransportOK = false;
try {
close();
}
catch (JMSException ex) {
log.warn("Got an exception closing the connection", ex);
}
}
/**
* Creates a <CODE>TopicSession</CODE> object.
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
* ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created topic session
* @throws JMSException if the <CODE>TopicConnection</CODE> object fails to create a session due to some internal
* error or lack of support for the specific transaction and acknowledgement mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
*/
public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
return new ActiveMQTopicSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
}
/**
* Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
* regular JMS clients.
*
* @param topic the topic to access
* @param messageSelector only messages with properties matching the message selector expression are delivered. A
* value of null or an empty string indicates that there is no message selector for the message consumer.
* @param sessionPool the server session pool to associate with this connection consumer
* @param maxMessages the maximum number of messages that can be assigned to a server session at one time
* @return the connection consumer
* @throws JMSException if the <CODE>TopicConnection</CODE> object fails to create a connection consumer due to
* some internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws InvalidDestinationException if an invalid topic is specified.
* @throws InvalidSelectorException if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
*/
public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosed();
ensureClientIDInitialised();
ConsumerInfo info = new ConsumerInfo();
info.setId(this.packetIdGenerator.generateId());
info.setConsumerId(this.consumerIdGenerator.generateId());
info.setDestination(ActiveMQMessageTransformation.transformDestination(topic));
info.setSelector(messageSelector);
return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
}
/**
* Creates a <CODE>QueueSession</CODE> object.
*
* @param transacted indicates whether the session is transacted
* @param acknowledgeMode indicates whether the consumer or the client will acknowledge any messages it receives;
* ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created queue session
* @throws JMSException if the <CODE>QueueConnection</CODE> object fails to create a session due to some internal
* error or lack of support for the specific transaction and acknowledgement mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
*/
public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
return new ActiveMQQueueSession((ActiveMQSession)createSession(transacted, acknowledgeMode));
}
/**
* Creates a connection consumer for this connection (optional operation). This is an expert facility not used by
* regular JMS clients.
*
* @param queue the queue to access
* @param messageSelector only messages with properties matching the message selector expression are delivered. A
* value of null or an empty string indicates that there is no message selector for the message consumer.
* @param sessionPool the server session pool to associate with this connection consumer
* @param maxMessages the maximum number of messages that can be assigned to a server session at one time
* @return the connection consumer
* @throws JMSException if the <CODE>QueueConnection</CODE> object fails to create a connection consumer due to
* some internal error or invalid arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws InvalidDestinationException if an invalid queue is specified.
* @throws InvalidSelectorException if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
*/
public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosed();
ensureClientIDInitialised();
ConsumerInfo info = new ConsumerInfo();
info.setId(this.packetIdGenerator.generateId());
info.setConsumerId(this.consumerIdGenerator.generateId());
info.setDestination(ActiveMQMessageTransformation.transformDestination(queue));
info.setSelector(messageSelector);
return new ActiveMQConnectionConsumer(this, sessionPool, info, maxMessages);
}
/**
* Ensures that the clientID was manually specified and not auto-generated. If the clientID was not specified this
* method will throw an exception. This method is used to ensure that the clientID + durableSubscriber name are used
* correctly.
*
* @throws JMSException
*/
public void checkClientIDWasManuallySpecified() throws JMSException {
if (!userSpecifiedClientID) {
throw new JMSException("You cannot create a durable subscriber without specifying a unique clientID on a Connection");
}
}
/**
* handle disconnect/reconnect events
*
* @param event
*/
public void statusChanged(TransportStatusEvent event) {
log.info("channel status changed: " + event);
if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
isTransportOK = true;
doReconnect();
}
else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
isTransportOK = false;
clearMessagesInProgress();
}
}
/**
* send a Packet through the Connection - for internal use only
*
* @param packet
* @throws JMSException
*/
public void asyncSendPacket(Packet packet) throws JMSException {
asyncSendPacket(packet, true);
}
/**
* send a Packet through the Connection - for internal use only
*
* @param packet
* @param doSendWhileReconnecting
* @throws JMSException
*/
public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting) throws JMSException {
if (isTransportOK && !closed.get() && (doSendWhileReconnecting || transportChannel.isTransportConnected())) {
packet.setReceiptRequired(false);
if (packet.isJMSMessage() && flowControlSleepTime > 0) {
try {
Thread.sleep(flowControlSleepTime);
}
catch (InterruptedException e) {
}
}
this.transportChannel.asyncSend(packet);
}
}
/**
* send a Packet through a Connection - for internal use only
*
* @param packet
* @throws JMSException
*/
public void syncSendPacket(Packet packet) throws JMSException {
syncSendPacket(packet, 0);
}
/**
* Send a packet through a Connection - for internal use only
*
* @param packet
* @param timeout
* @throws JMSException
*/
public void syncSendPacket(Packet packet, int timeout) throws JMSException {
if (isTransportOK && !closed.get()) {
Receipt receipt;
packet.setReceiptRequired(true);
receipt = this.transportChannel.send(packet, timeout);
if (receipt != null) {
if (receipt.isFailed()) {
Throwable e = receipt.getException();
if (e != null) {
throw JMSExceptionHelper.newJMSException(e);
}
throw new JMSException("syncSendPacket failed with unknown exception");
}
}
}
else {
throw new JMSException("syncSendTimedOut");
}
}
/**
* send a Packet and get a receipt
* @param packet
* @return
* @throws JMSException
*/
// Properties
//-------------------------------------------------------------------------
/**
* @return Returns the prefetchPolicy.
*/
public ActiveMQPrefetchPolicy getPrefetchPolicy() {
return prefetchPolicy;
}
/**
* @param prefetchPolicy The prefetchPolicy to set.
*/
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
this.prefetchPolicy = prefetchPolicy;
}
public int getSendCloseTimeout() {
return sendCloseTimeout;
}
public void setSendCloseTimeout(int sendCloseTimeout) {
this.sendCloseTimeout = sendCloseTimeout;
}
public int getSendConnectionInfoTimeout() {
return sendConnectionInfoTimeout;
}
public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) {
this.sendConnectionInfoTimeout = sendConnectionInfoTimeout;
}
public Receipt syncSendRequest(Packet packet) throws JMSException {
checkClosed();
if (isTransportOK && !closed.get()) {
Receipt receipt;
packet.setReceiptRequired(true);
if (packet.getId() == null || packet.getId().length() == 0) {
packet.setId(this.packetIdGenerator.generateId());
}
receipt = this.transportChannel.send(packet);
if (receipt != null && receipt.isFailed()) {
Throwable e = receipt.getException();
if (e != null) {
throw (JMSException) new JMSException(e.getMessage()).initCause(e);
}
throw new JMSException("syncSendPacket failed with unknown exception");
}
return receipt;
}
else {
throw new JMSException("Connection closed.");
}
}
public TransportChannel getTransportChannel() {
return transportChannel;
}
/**
* Returns the clientID of the connection, forcing one to be generated if one has not yet been configured
*/
public String getInitializedClientID() throws JMSException {
ensureClientIDInitialised();
return this.clientID;
}
// Implementation methods
//-------------------------------------------------------------------------
/**
* Used internally for adding Sessions to the Connection
*
* @param session
* @throws JMSException
*/
protected void addSession(ActiveMQSession session) throws JMSException {
this.sessions.add(session);
addMessageDispatcher(session);
if (started.get()) {
session.start();
}
SessionInfo info = createSessionInfo(session);
info.setStarted(true);
asyncSendPacket(info);
}
/**
* Used interanlly for removing Sessions from a Connection
*
* @param session
* @throws JMSException
*/
protected void removeSession(ActiveMQSession session) throws JMSException {
this.sessions.remove(session);
removeMessageDispatcher(session);
SessionInfo info = createSessionInfo(session);
info.setStarted(false);
asyncSendPacket(info, false);
}
private SessionInfo createSessionInfo(ActiveMQSession session) {
SessionInfo info = new SessionInfo();
info.setId(packetIdGenerator.generateId());
info.setClientId(clientID);
info.setSessionId(session.getSessionId());
info.setStartTime(session.getStartTime());
return info;
}
/**
* Add a ConnectionConsumer
*
* @param connectionConsumer
* @throws JMSException
*/
protected void addConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
this.connectionConsumers.add(connectionConsumer);
addMessageDispatcher(connectionConsumer);
}
/**
* Remove a ConnectionConsumer
*
* @param connectionConsumer
*/
protected void removeConnectionConsumer(ActiveMQConnectionConsumer connectionConsumer) {
this.connectionConsumers.add(connectionConsumer);
removeMessageDispatcher(connectionConsumer);
}
/**
* Add a Message dispatcher to receive messages from the Broker
*
* @param messageDispatch
* @throws JMSException if an internal error
*/
protected void addMessageDispatcher(ActiveMQMessageDispatcher messageDispatch) throws JMSException {
this.messageDispatchers.add(messageDispatch);
}
/**
* Remove a Message dispatcher
*
* @param messageDispatcher
*/
protected void removeMessageDispatcher(ActiveMQMessageDispatcher messageDispatcher) {
this.messageDispatchers.remove(messageDispatcher);
}
/**
* Used for handling async exceptions
*
* @param jmsEx
*/
protected void handleAsyncException(JMSException jmsEx) {
if (this.exceptionListener != null) {
this.exceptionListener.onException(jmsEx);
}
else {
log.warn("async exception with no exception listener", jmsEx);
}
}
protected void sendConnectionInfoToBroker() throws JMSException {
sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed.get());
}
/**
* Send the ConnectionInfo to the Broker
*
* @param timeout
* @param isClosed
* @throws JMSException
*/
protected void sendConnectionInfoToBroker(int timeout, boolean forceResend) throws JMSException {
// Can we skip sending the ConnectionInfo packet??
if (isConnectionInfoSentToBroker && !forceResend ) {
return;
}
this.isConnectionInfoSentToBroker = true;
ensureClientIDInitialised();
ConnectionInfo info = new ConnectionInfo();
info.setClientId(this.clientID);
info.setHostName(IdGenerator.getHostName());
info.setUserName(userName);
info.setPassword(password);
info.setId(packetIdGenerator.generateId());
info.setStartTime(startTime);
info.setStarted(started.get());
info.setClosed(closed.get());
info.setClientVersion(connectionMetaData.getProviderVersion());
info.setWireFormatVersion(transportChannel.getCurrentWireFormatVersion());
syncSendPacket(info, timeout);
}
/**
* Set the maximum amount of memory this Connection should use for buffered inbound messages
*
* @param newMemoryLimit the new memory limit in bytes
*/
public void setConnectionMemoryLimit(int newMemoryLimit) {
boundedQueueManager.setValueLimit(newMemoryLimit);
}
/**
* Get the current value for the maximum amount of memory this Connection should use for buffered inbound messages
*
* @return the current limit in bytes
*/
public int getConnectionMemoryLimit() {
return (int) boundedQueueManager.getValueLimit();
}
/**
* CapacityMonitorEventListener implementation called when the capacity of a CapacityService changes
*
* @param event
*/
public void capacityChanged(CapacityMonitorEvent event) {
//send the event to broker ...
CapacityInfo info = new CapacityInfo();
info.setId(packetIdGenerator.generateId());
info.setResourceName(event.getMonitorName());
info.setCapacity(event.getCapacity());
//System.out.println("Cap changed: " + event);
try {
asyncSendPacket(info, false);
}
catch (JMSException e) {
JMSException jmsEx = new JMSException("failed to send change in capacity");
jmsEx.setLinkedException(e);
handleAsyncException(jmsEx);
}
}
/**
* @return a number unique for this connection
*/
protected int getNextConsumerNumber() {
return this.consumerNumberGenerator.increment();
}
protected String generateSessionId() {
return this.sessionIdGenerator.generateId();
}
protected synchronized void ensureClientIDInitialised() {
if (this.clientID == null || this.clientID.trim().equals("")) {
this.clientID = this.clientIdGenerator.generateId();
}
transportChannel.setClientID(clientID);
this.clientIDSet = true;
}
protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
return boundedQueueManager.getMemoryBoundedQueue(name);
}
protected void doReconnect() {
try {
//send the Connection info again
sendConnectionInfoToBroker(sendConnectionInfoTimeout, true);
for (Iterator iter = sessions.iterator(); iter.hasNext();) {
ActiveMQSession session = (ActiveMQSession) iter.next();
SessionInfo sessionInfo = createSessionInfo(session);
sessionInfo.setStarted(true);
asyncSendPacket(sessionInfo, false);
//send consumers
for (Iterator consumersIterator = session.consumers.iterator(); consumersIterator.hasNext();) {
ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumersIterator.next();
ConsumerInfo consumerInfo = session.createConsumerInfo(consumer);
consumerInfo.setStarted(true);
asyncSendPacket(consumerInfo, false);
}
//send producers
for (Iterator producersIterator = session.producers.iterator(); producersIterator.hasNext();) {
ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producersIterator.next();
ProducerInfo producerInfo = session.createProducerInfo(producer);
producerInfo.setStarted(true);
asyncSendPacket(producerInfo, false);
}
//send the current capacity
CapacityMonitorEvent event = boundedQueueManager.generateCapacityMonitorEvent();
if (event != null){
capacityChanged(event);
}
}
}
catch (JMSException jmsEx) {
log.error("Failed to do reconnection");
handleAsyncException(jmsEx);
isTransportOK = false;
}
}
/**
* @return Returns the useAsyncSend.
*/
public boolean isUseAsyncSend() {
return useAsyncSend;
}
/**
* @param useAsyncSend The useAsyncSend to set.
*/
public void setUseAsyncSend(boolean useAsyncSend) {
this.useAsyncSend = useAsyncSend;
}
/**
* @return Returns the j2EEcompliant.
*/
public boolean isJ2EEcompliant() {
return J2EEcompliant;
}
/**
* @param ecompliant The j2EEcompliant to set.
*/
public void setJ2EEcompliant(boolean ecompliant) {
J2EEcompliant = ecompliant;
}
protected void clearMessagesInProgress() {
for (Iterator i = sessions.iterator(); i.hasNext();) {
ActiveMQSession session = (ActiveMQSession) i.next();
session.clearMessagesInProgress();
}
}
}