/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq;
import java.util.Iterator;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.XAConnection;
import org.activemq.advisories.TempDestinationAdvisor;
import org.activemq.advisories.TempDestinationAdvisoryEvent;
import org.activemq.capacity.CapacityMonitorEvent;
import org.activemq.capacity.CapacityMonitorEventListener;
import org.activemq.filter.AndFilter;
import org.activemq.filter.Filter;
import org.activemq.filter.FilterFactory;
import org.activemq.filter.FilterFactoryImpl;
import org.activemq.filter.NoLocalFilter;
import org.activemq.io.util.ByteArrayCompression;
import org.activemq.io.util.ByteArrayFragmentation;
import org.activemq.io.util.MemoryBoundedObjectManager;
import org.activemq.io.util.MemoryBoundedQueue;
import org.activemq.io.util.MemoryBoundedQueueManager;
import org.activemq.management.JMSConnectionStatsImpl;
import org.activemq.management.JMSStatsImpl;
import org.activemq.management.StatsCapable;
import org.activemq.management.StatsImpl;
import org.activemq.message.ActiveMQDestination;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQObjectMessage;
import org.activemq.message.BrokerAdminCommand;
import org.activemq.message.CapacityInfo;
import org.activemq.message.CleanupConnectionInfo;
import org.activemq.message.ConnectionInfo;
import org.activemq.message.ConsumerInfo;
import org.activemq.message.Packet;
import org.activemq.message.PacketListener;
import org.activemq.message.ProducerInfo;
import org.activemq.message.Receipt;
import org.activemq.message.ResponseReceipt;
import org.activemq.message.SessionInfo;
import org.activemq.message.TransactionInfo;
import org.activemq.message.WireFormatInfo;
import org.activemq.message.XATransactionInfo;
import org.activemq.transport.TransportChannel;
import org.activemq.transport.TransportStatusEvent;
import org.activemq.transport.TransportStatusEventListener;
import org.activemq.util.IdGenerator;
import org.activemq.util.JMSExceptionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
/**
* A <CODE>Connection</CODE> object is a client's active connection to its JMS
* provider. It typically allocates provider resources outside the Java virtual
* machine (JVM).
* <P>
* Connections support concurrent use.
* <P>
* A connection serves several purposes:
* <UL>
* <LI>It encapsulates an open connection with a JMS provider. It typically
* represents an open TCP/IP socket between a client and the service provider
* software.
* <LI>Its creation is where client authentication takes place.
* <LI>It can specify a unique client identifier.
* <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
* <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
* </UL>
* <P>
* Because the creation of a connection involves setting up authentication and
* communication, a connection is a relatively heavyweight object. Most clients
* will do all their messaging with a single connection. Other more advanced
* applications may use several connections. The JMS API does not architect a
* reason for using multiple connections; however, there may be operational
* reasons for doing so.
* <P>
* A JMS client typically creates a connection, one or more sessions, and a
* number of message producers and consumers. When a connection is created, it
* is in stopped mode. That means that no messages are being delivered.
* <P>
* It is typical to leave the connection in stopped mode until setup is complete
* (that is, until all message consumers have been created). At that point, the
* client calls the connection's <CODE>start</CODE> method, and messages begin
* arriving at the connection's consumers. This setup convention minimizes any
* client confusion that may result from asynchronous message delivery while the
* client is still in the process of setting itself up.
* <P>
* A connection can be started immediately, and the setup can be done
* afterwards. Clients that do this must be prepared to handle asynchronous
* message delivery while they are still in the process of setting up.
* <P>
* A message producer can send messages while a connection is stopped. <p/>This
* class is also a <CODE>TopicConnection </CODE>. A <CODE>TopicConnection</CODE>
* object is an active connection to a publish/subscribe JMS provider. A client
* uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE>
* objects for producing and consuming messages.
* <P>
* A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>,
* from which specialized topic-related objects can be created. A more general,
* and recommended, approach is to use the <CODE>Connection</CODE> object.
* <P>
* <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A <CODE>QueueConnection</CODE>
* object is an active connection to a point-to-point JMS provider. A client
* uses a <CODE>QueueConnection</CODE> object to create one or more <CODE>QueueSession</CODE>
* objects for producing and consuming messages.
* <P>
* A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>,
* from which specialized queue-related objects can be created. A more general,
* and recommended, approach is to use the <CODE>Connection </CODE> object.
* <P>
* A <CODE>QueueConnection</CODE> cannot be used to create objects specific to
* the publish/subscribe domain. The <CODE>createDurableConnectionConsumer</CODE>
* method inherits from <CODE>Connection</CODE>, but must throw an <CODE>IllegalStateException</CODE>
* if used from <CODE>QueueConnection</CODE>.
*
* @version $Revision: 1.1.1.1 $
* @see javax.jms.Connection
* @see javax.jms.ConnectionFactory
* @see javax.jms.QueueConnection
* @see javax.jms.TopicConnection
* @see javax.jms.TopicConnectionFactory
* @see javax.jms.QueueConnection
* @see javax.jms.QueueConnectionFactory
*/
public class ActiveMQConnection implements Connection, PacketListener,
ExceptionListener, TopicConnection, QueueConnection, StatsCapable,
CapacityMonitorEventListener, TransportStatusEventListener, Closeable {
/**
* Default UserName for the Connection
*/
public static final String DEFAULT_USER = "defaultUser";
/**
* Default URL for the ActiveMQ Broker
*/
public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";
/**
* Default client URL. If using a message broker in a hub(s)/spoke
* architecture - use the DEFAULT_BROKER_URL
*
* @see ActiveMQConnection#DEFAULT_BROKER_URL
*/
public static final String DEFAULT_URL = "peer://development";
/**
* Default Password for the Connection
*/
public static final String DEFAULT_PASSWORD = "defaultPassword";
// Logger shared by all connections.
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);
// Limit handed to the MemoryBoundedObjectManager created in the constructor
// (10 * 1024 * 1024; presumably bytes, i.e. 10MB - TODO confirm the unit).
private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
// properties
private ActiveMQConnectionFactory factory;
private String userName;
private String password;
protected String clientID;
// timeout used by close() when sending the final connection info to the
// broker; close() reports it in millis when the send times out
private int sendCloseTimeout = 2000;
private TransportChannel transportChannel;
private ExceptionListener exceptionListener;
private ActiveMQPrefetchPolicy prefetchPolicy;
private JMSStatsImpl factoryStats;
private MemoryBoundedObjectManager memoryManager;
private MemoryBoundedQueueManager boundedQueueManager;
protected IdGenerator handleIdGenerator;
private IdGenerator clientIdGenerator;
protected IdGenerator packetIdGenerator;
private IdGenerator sessionIdGenerator;
private JMSConnectionStatsImpl stats;
// internal state
private CopyOnWriteArrayList sessions;
private CopyOnWriteArrayList messageDispatchers;
private CopyOnWriteArrayList connectionConsumers;
private SynchronizedInt consumerNumberGenerator;
private ActiveMQConnectionMetaData connectionMetaData;
// set to true at the end of close(); tested by checkClosed()
private boolean closed;
// flipped by start() and stop(), reset to false by close()
private SynchronizedBoolean started;
private boolean clientIDSet;
// when true, setClientID() refuses to change the client ID
private boolean isConnectionInfoSentToBroker;
private boolean isTransportOK;
// raised by checkClosed() the first time it starts the transport channel
private boolean startedTransport;
// System.currentTimeMillis() captured when the connection was constructed
private long startTime;
private long flowControlSleepTime = 0;
private boolean quickClose;
// used for notifying that the connection is used for networks etc.
private boolean internalConnection;
// set to true by setClientID()
private boolean userSpecifiedClientID;
/**
* Should we use an async send for persistent non transacted messages ?
*/
protected boolean useAsyncSend = true;
// timeout used by start() when sending the connection info to the broker
// (presumably millis, like sendCloseTimeout - TODO confirm)
private int sendConnectionInfoTimeout = 30000;
private boolean disableTimeStampsByDefault = false;
private boolean J2EEcompliant = true;
private boolean prepareMessageBodyOnSend = true;
private boolean copyMessageOnSend = true;
// compression and fragmentation variables
private boolean doMessageCompression = true;
// data size above which compression will be used
private int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;
private int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;
// default compression strategy
private int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;
private boolean doMessageFragmentation = true;
private int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;
// forwarded to the transport channel (via isCachingEnabled()) when the
// transport is first started
private boolean cachingEnabled = true;
private boolean optimizedMessageDispatch = false;
// cleared by close()
private CopyOnWriteArrayList transientConsumedRedeliverCache;
private FilterFactory filterFactory;
private Map tempDestinationMap;
// cleared by close()
private Map validDestinationsMap;
private String resourceManagerId;
/**
* A static helper method to create a new connection
*
* @return an ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection() throws JMSException {
    // A factory built with its no-arg constructor creates the connection;
    // the result is narrowed to the ActiveMQ implementation type.
    return (ActiveMQConnection) new ActiveMQConnectionFactory()
            .createConnection();
}
/**
* A static helper method to create a new connection
*
* @param uri
* @return an ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection(String uri)
        throws JMSException {
    // Throw-away factory configured with the supplied uri; the connection it
    // creates is narrowed to the ActiveMQ implementation type.
    return (ActiveMQConnection) new ActiveMQConnectionFactory(uri)
            .createConnection();
}
/**
* A static helper method to create a new connection
*
* @param user
* @param password
* @param uri
* @return an ActiveMQConnection
* @throws JMSException
*/
public static ActiveMQConnection makeConnection(String user,
        String password, String uri) throws JMSException {
    // Throw-away factory configured with the supplied credentials and uri;
    // the connection it creates is narrowed to the ActiveMQ implementation
    // type.
    return (ActiveMQConnection) new ActiveMQConnectionFactory(user, password,
            uri).createConnection();
}
/**
* Constructs a connection from an existing TransportChannel and
* user/password.
*
* @param factory
* @param theUserName
* the users name
* @param thePassword
* the password
* @param transportChannel
* the transport channel to communicate with the server
* @throws JMSException
*/
public ActiveMQConnection(ActiveMQConnectionFactory factory,
        String theUserName, String thePassword,
        TransportChannel transportChannel) throws JMSException {
    // Common initialisation (id generators, collections, stats, factory
    // callbacks) lives in the three-argument constructor.
    this(factory, theUserName, thePassword);
    // Adopt the supplied channel and register this connection for its
    // packet, exception and transport-status callbacks.
    this.transportChannel = transportChannel;
    transportChannel.setPacketListener(this);
    transportChannel.setExceptionListener(this);
    transportChannel.addTransportStatusEventListener(this);
    isTransportOK = true;
}
/**
* Initialises the state that does not depend on a transport channel: id
* generators, session/dispatcher/consumer lists, prefetch policy, memory
* managers, statistics and destination maps. The new connection is added to
* the factory's statistics and the factory is notified through
* <CODE>onConnectionCreate</CODE>. No transport channel is assigned here.
*
* @param factory
*            the factory creating this connection
* @param theUserName
*            the users name
* @param thePassword
*            the password
*/
protected ActiveMQConnection(ActiveMQConnectionFactory factory,
        String theUserName, String thePassword) {
    // owning factory and credentials
    this.factory = factory;
    userName = theUserName;
    password = thePassword;

    // id / sequence generators
    clientIdGenerator = new IdGenerator();
    packetIdGenerator = new IdGenerator();
    handleIdGenerator = new IdGenerator();
    sessionIdGenerator = new IdGenerator();
    consumerNumberGenerator = new SynchronizedInt(0);

    // child collections and lifecycle state; a new connection is stopped
    sessions = new CopyOnWriteArrayList();
    messageDispatchers = new CopyOnWriteArrayList();
    connectionConsumers = new CopyOnWriteArrayList();
    connectionMetaData = new ActiveMQConnectionMetaData();
    started = new SynchronizedBoolean(false);
    startTime = System.currentTimeMillis();
    prefetchPolicy = new ActiveMQPrefetchPolicy();

    // memory accounting; note that clientID has not been assigned yet at
    // this point, so the manager is created with a null name
    memoryManager = new MemoryBoundedObjectManager(clientID,
            DEFAULT_CONNECTION_MEMORY_LIMIT);
    boundedQueueManager = new MemoryBoundedQueueManager(memoryManager);
    memoryManager.addCapacityEventListener(this);

    // statistics: the connection is reported as transactional when the
    // concrete class is an XAConnection
    factoryStats = factory.getFactoryStats();
    factoryStats.addConnection(this);
    stats = new JMSConnectionStatsImpl(sessions,
            this instanceof XAConnection);

    transientConsumedRedeliverCache = new CopyOnWriteArrayList();
    tempDestinationMap = new ConcurrentHashMap();
    validDestinationsMap = new ConcurrentHashMap();

    // let the factory know a connection now exists
    factory.onConnectionCreate(this);
}
/**
* @return statistics for this Connection
*/
public StatsImpl getStats() {
    // StatsCapable view of the per-connection statistics object.
    return this.stats;
}
/**
* @return the JMS connection statistics for this Connection
*/
public JMSConnectionStatsImpl getConnectionStats() {
    // Same object as getStats(), exposed with its concrete type.
    return this.stats;
}
/**
* Creates a <CODE>Session</CODE> object.
*
* @param transacted
* indicates whether the session is transacted
* @param acknowledgeMode
* indicates whether the consumer or the client will acknowledge
* any messages it receives; ignored if the session is
* transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created session
* @throws JMSException
* if the <CODE>Connection</CODE> object fails to create a
* session due to some internal error or lack of support for the
* specific transaction and acknowledgement mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
* @since 1.1
*/
public Session createSession(boolean transacted, int acknowledgeMode)
        throws JMSException {
    checkClosed();
    sendConnectionInfoToBroker();
    // Work out the mode the session is really created with:
    //  - a transacted session always uses SESSION_TRANSACTED;
    //  - a non-transacted session that was handed SESSION_TRANSACTED falls
    //    back to AUTO_ACKNOWLEDGE;
    //  - otherwise the caller's mode is used unchanged.
    final int effectiveMode;
    if (transacted) {
        effectiveMode = Session.SESSION_TRANSACTED;
    } else if (acknowledgeMode == Session.SESSION_TRANSACTED) {
        effectiveMode = Session.AUTO_ACKNOWLEDGE;
    } else {
        effectiveMode = acknowledgeMode;
    }
    return new ActiveMQSession(this, effectiveMode);
}
/**
* Creates a <CODE>Session</CODE> object.
*
* @param transacted
* indicates whether the session is transacted
* @param acknowledgeMode
* indicates whether the consumer or the client will acknowledge
* any messages it receives; ignored if the session is
* transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @param optimizedDispatch
* @return a newly created session
* @throws JMSException
* if the <CODE>Connection</CODE> object fails to create a
* session due to some internal error or lack of support for the
* specific transaction and acknowledgement mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
* @since 1.1
*/
public Session createSession(boolean transacted, int acknowledgeMode,
        boolean optimizedDispatch) throws JMSException {
    checkClosed();
    sendConnectionInfoToBroker();
    // Resolve the effective acknowledge mode the same way the two-argument
    // createSession(boolean, int) does. Previously a caller passing
    // transacted == false together with Session.SESSION_TRANSACTED got a
    // session created in SESSION_TRANSACTED mode from this overload only;
    // such a request now falls back to AUTO_ACKNOWLEDGE in both overloads.
    final int effectiveMode;
    if (transacted) {
        effectiveMode = Session.SESSION_TRANSACTED;
    } else if (acknowledgeMode == Session.SESSION_TRANSACTED) {
        effectiveMode = Session.AUTO_ACKNOWLEDGE;
    } else {
        effectiveMode = acknowledgeMode;
    }
    return new ActiveMQSession(this, effectiveMode, optimizedDispatch);
}
/**
* Gets the client identifier for this connection.
* <P>
* This value is specific to the JMS provider. It is either preconfigured by
* an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
* dynamically by the application by calling the <code>setClientID</code>
* method.
*
* @return the unique client identifier
* @throws JMSException
* if the JMS provider fails to return the client ID for this
* connection due to some internal error.
*/
public String getClientID() throws JMSException {
    // checkClosed() throws when the connection has already been closed.
    checkClosed();
    return clientID;
}
/**
* Sets the client identifier for this connection.
* <P>
* The preferred way to assign a JMS client's client identifier is for it to
* be configured in a client-specific <CODE>ConnectionFactory</CODE>
* object and transparently assigned to the <CODE>Connection</CODE> object
* it creates.
* <P>
* Alternatively, a client can set a connection's client identifier using a
* provider-specific value. The facility to set a connection's client
* identifier explicitly is not a mechanism for overriding the identifier
* that has been administratively configured. It is provided for the case
* where no administratively specified identifier exists. If one does exist,
* an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>.
* If a client sets the client identifier explicitly, it must do so
* immediately after it creates the connection and before any other action
* on the connection is taken. After this point, setting the client
* identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>.
* <P>
* The purpose of the client identifier is to associate a connection and its
* objects with a state maintained on behalf of the client by a provider.
* The only such state identified by the JMS API is that required to support
* durable subscriptions.
* <P>
* If another connection with the same <code>clientID</code> is already
* running when this method is called, the JMS provider should detect the
* duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
*
* @param newClientID
* the unique client identifier
* @throws JMSException
* if the JMS provider fails to set the client ID for this
* connection due to some internal error.
* @throws javax.jms.InvalidClientIDException
* if the JMS client specifies an invalid or duplicate client
* ID.
* @throws javax.jms.IllegalStateException
* if the JMS client attempts to set a connection's client ID at
* the wrong time or when it has been administratively
* configured.
*/
public void setClientID(String newClientID) throws JMSException {
    // Decide first whether the request must be rejected; an id that is
    // already set takes precedence over "connection already used".
    String rejection = null;
    if (clientIDSet) {
        rejection = "The clientID has already been set";
    } else if (isConnectionInfoSentToBroker) {
        rejection = "Setting clientID on a used Connection is not allowed";
    }
    if (rejection != null) {
        throw new IllegalStateException(rejection);
    }
    // The closed check deliberately comes after the state checks above.
    checkClosed();
    clientID = newClientID;
    userSpecifiedClientID = true;
    ensureClientIDInitialised();
}
/**
* Gets the metadata for this connection.
*
* @return the connection metadata
* @throws JMSException
* if the JMS provider fails to get the connection metadata for
* this connection.
* @see javax.jms.ConnectionMetaData
*/
public ConnectionMetaData getMetaData() throws JMSException {
    // checkClosed() throws when the connection has already been closed.
    checkClosed();
    // The metadata object is created once in the constructor.
    return connectionMetaData;
}
/**
* Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
* every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
* associated with it.
*
* @return the <CODE>ExceptionListener</CODE> for this connection, or
* null if no <CODE>ExceptionListener</CODE> is associated with
* this connection.
* @throws JMSException
* if the JMS provider fails to get the <CODE>ExceptionListener</CODE>
* for this connection.
* @see javax.jms.Connection#setExceptionListener(ExceptionListener)
*/
public ExceptionListener getExceptionListener() throws JMSException {
    // checkClosed() throws when the connection has already been closed.
    checkClosed();
    // May be null when no listener has been registered.
    return exceptionListener;
}
/**
* Sets an exception listener for this connection.
* <P>
* If a JMS provider detects a serious problem with a connection, it informs
* the connection's <CODE> ExceptionListener</CODE>, if one has been
* registered. It does this by calling the listener's <CODE>onException
* </CODE> method, passing it a <CODE>JMSException</CODE> object
* describing the problem.
* <P>
* An exception listener allows a client to be notified of a problem
* asynchronously. Some connections only consume messages, so they would
* have no other way to learn their connection has failed.
* <P>
* A connection serializes execution of its <CODE>ExceptionListener</CODE>.
* <P>
* A JMS provider should attempt to resolve connection problems itself
* before it notifies the client of them.
*
* @param listener
* the exception listener
* @throws JMSException
* if the JMS provider fails to set the exception listener for
* this connection.
*/
public void setExceptionListener(ExceptionListener listener)
        throws JMSException {
    checkClosed();
    // Remember the listener locally ...
    exceptionListener = listener;
    // ... and hand it to the transport channel as well.
    // NOTE(review): the four-argument constructor registers this connection
    // itself as the channel's exception listener; this call appears to
    // replace that registration with the caller's listener (possibly null).
    // Confirm that this is intended.
    transportChannel.setExceptionListener(listener);
}
/**
* Starts (or restarts) a connection's delivery of incoming messages. A call
* to <CODE>start</CODE> on a connection that has already been started is
* ignored.
*
* @throws JMSException
* if the JMS provider fails to start message delivery due to
* some internal error.
* @see javax.jms.Connection#stop()
*/
public void start() throws JMSException {
    checkClosed();
    // Flip started from false to true; when the connection was already
    // started the commit fails and the call is ignored.
    if (!started.commit(false, true)) {
        return;
    }
    // We have a change in connection info to send.
    // send the Connection info again
    sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
    // Start every session currently attached to this connection.
    Iterator sessionIter = sessions.iterator();
    while (sessionIter.hasNext()) {
        ((ActiveMQSession) sessionIter.next()).start();
    }
}
/**
* @return true if this Connection is started
*/
protected boolean isStarted() {
    // Snapshot of the started flag toggled by start() and stop().
    final boolean running = started.get();
    return running;
}
/**
* Temporarily stops a connection's delivery of incoming messages. Delivery
* can be restarted using the connection's <CODE>start</CODE> method. When
* the connection is stopped, delivery to all the connection's message
* consumers is inhibited: synchronous receives block, and messages are not
* delivered to message listeners.
* <P>
* This call blocks until receives and/or message listeners in progress have
* completed.
* <P>
* Stopping a connection has no effect on its ability to send messages. A
* call to <CODE>stop</CODE> on a connection that has already been stopped
* is ignored.
* <P>
* A call to <CODE>stop</CODE> must not return until delivery of messages
* has paused. This means that a client can rely on the fact that none of
* its message listeners will be called and that all threads of control
* waiting for <CODE>receive</CODE> calls to return will not return with a
* message until the connection is restarted. The receive timers for a
* stopped connection continue to advance, so receives may time out while
* the connection is stopped.
* <P>
* If message listeners are running when <CODE>stop</CODE> is invoked, the
* <CODE>stop</CODE> call must wait until all of them have returned before
* it may return. While these message listeners are completing, they must
* have the full services of the connection available to them.
*
* @throws JMSException
* if the JMS provider fails to stop message delivery due to
* some internal error.
* @see javax.jms.Connection#start()
*/
public void stop() throws JMSException {
    checkClosed();
    // Flip started from true to false; when the connection was already
    // stopped the commit fails and the call is ignored.
    if (!started.commit(true, false)) {
        return;
    }
    // Stop every session first ...
    Iterator sessionIter = sessions.iterator();
    while (sessionIter.hasNext()) {
        ((ActiveMQSession) sessionIter.next()).stop();
    }
    // ... then send the connection info to the broker again, using a fixed
    // timeout of 2000.
    sendConnectionInfoToBroker(2000, true, false);
}
/**
* Closes the connection.
* <P>
* Since a provider typically allocates significant resources outside the
* JVM on behalf of a connection, clients should close these resources when
* they are not needed. Relying on garbage collection to eventually reclaim
* these resources may not be timely enough.
* <P>
* There is no need to close the sessions, producers, and consumers of a
* closed connection.
* <P>
* Closing a connection causes all temporary destinations to be deleted.
* <P>
* When this method is invoked, it should not return until message
* processing has been shut down in an orderly fashion. This means that all
* message listeners that may have been running have returned, and that all
* pending receives have returned. A close terminates all pending message
* receives on the connection's sessions' consumers. The receives may return
* with a message or with null, depending on whether there was a message
* available at the time of the close. If one or more of the connection's
* sessions' message listeners is processing a message at the time when
* connection <CODE>close</CODE> is invoked, all the facilities of the
* connection and its sessions must remain available to those listeners
* until they return control to the JMS provider.
* <P>
* Closing a connection causes any of its sessions' transactions in progress
* to be rolled back. In the case where a session's work is coordinated by
* an external transaction manager, a session's <CODE>commit</CODE> and
* <CODE> rollback</CODE> methods are not used and the result of a closed
* session's work is determined later by the transaction manager. Closing a
* connection does NOT force an acknowledgment of client-acknowledged
* sessions.
* <P>
* Invoking the <CODE>acknowledge</CODE> method of a received message from
* a closed connection's session must throw an <CODE>IllegalStateException</CODE>.
* Closing a closed connection must NOT throw an exception.
*
* @throws JMSException
* if the JMS provider fails to close the connection due to some
* internal error. For example, a failure to release resources
* or to close a socket connection can cause this exception to
* be thrown.
*/
public void close() throws JMSException {
    // Flag the pending stop on the transport before taking the connection
    // lock.
    this.transportChannel.setPendingStop(true);
    synchronized (this) {
        if (closed) {
            // Closing a closed connection must NOT throw an exception.
            return;
        }
        memoryManager.removeCapacityEventListener(this);
        try {
            // Orderly shutdown: temporary destinations, sessions and
            // connection consumers, then tell the broker.
            closeTemporaryDestinations();
            for (Iterator i = this.sessions.iterator(); i.hasNext();) {
                ActiveMQSession s = (ActiveMQSession) i.next();
                s.close();
            }
            for (Iterator i = this.connectionConsumers.iterator(); i
                    .hasNext();) {
                ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
                        .next();
                c.close();
            }
            try {
                sendConnectionInfoToBroker(sendCloseTimeout, true, true);
            } catch (TimeoutExpiredException e) {
                // A timeout on the final send is only logged.
                log
                        .warn("Failed to send close to broker, timeout expired of: "
                                + sendCloseTimeout + " millis");
            }
        } finally {
            // Everything below runs even if the orderly shutdown above threw.
            // Before this change such a failure skipped transportChannel.stop(),
            // left closed == false and kept the caches populated. The nested
            // finally blocks keep the success-path order of statements
            // unchanged and let each stage run even if the one before it
            // fails.
            try {
                this.connectionConsumers.clear();
                this.messageDispatchers.clear();
                this.transportChannel.stop();
            } finally {
                try {
                    this.sessions.clear();
                    started.set(false);
                    factory.onConnectionClose(this);
                } finally {
                    closed = true;
                    transientConsumedRedeliverCache.clear();
                    validDestinationsMap.clear();
                }
            }
        }
    }
}
/**
* Tells the broker to terminate its VM. This can be used to cleanly terminate a broker running in
* a standalone java process. Server must have property enable.vm.shutdown=true defined
* to allow this to work.
*/
public void terminateBrokerVM() throws JMSException {
    // Build the admin command carrying the SHUTDOWN_SERVER_VM instruction
    // and send it asynchronously.
    final BrokerAdminCommand shutdownRequest = new BrokerAdminCommand();
    shutdownRequest.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
    asyncSendPacket(shutdownRequest);
}
/**
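* Starts the transport channel the first time it is called (sending the
* wire format version when the channel supports versioning), then throws
* an exception if the Connection is already closed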
*
* @throws JMSException
*/
protected synchronized void checkClosed() throws JMSException {
    if (!startedTransport) {
        // First call: configure and start the transport channel. The flag is
        // raised before the channel is touched.
        // NOTE(review): presumably so that a nested checkClosed() call (e.g.
        // from asyncSendPacket below) does not start the transport again -
        // confirm.
        startedTransport = true;
        transportChannel.setCachingEnabled(isCachingEnabled());
        if (!useAsyncSend) {
            transportChannel.setNoDelay(true);
        }
        transportChannel.setUsedInternally(internalConnection);
        transportChannel.start();
        // Announce the wire format version when the channel supports it.
        if (transportChannel.doesSupportWireFormatVersioning()) {
            WireFormatInfo wireFormat = new WireFormatInfo();
            wireFormat.setVersion(transportChannel
                    .getCurrentWireFormatVersion());
            asyncSendPacket(wireFormat);
        }
    }
    // The closed test runs on every call, after any lazy start above.
    if (closed) {
        throw new ConnectionClosedException();
    }
}
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
* @param destination
* the destination to access
* @param messageSelector
* only messages with properties matching the message selector
* expression are delivered. A value of null or an empty string
* indicates that there is no message selector for the message
* consumer.
* @param sessionPool
* the server session pool to associate with this connection
* consumer
* @param maxMessages
* the maximum number of messages that can be assigned to a
* server session at one time
* @return the connection consumer
* @throws JMSException
* if the <CODE>Connection</CODE> object fails to create a
* connection consumer due to some internal error or invalid
* arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException
* if an invalid destination is specified.
* @throws javax.jms.InvalidSelectorException
* if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
public ConnectionConsumer createConnectionConsumer(Destination destination,
        String messageSelector, ServerSessionPool sessionPool,
        int maxMessages) throws JMSException {
    checkClosed();
    ensureClientIDInitialised();
    // Describe the consumer: fresh id, transformed destination, selector and
    // a consumer number, both id and number drawn from handleIdGenerator.
    final ConsumerInfo consumerInfo = new ConsumerInfo();
    consumerInfo.setConsumerId(handleIdGenerator.generateId());
    consumerInfo.setDestination(
            ActiveMQMessageTransformation.transformDestination(destination));
    consumerInfo.setSelector(messageSelector);
    consumerInfo.setConsumerNo(handleIdGenerator.getNextShortSequence());
    final ConnectionConsumer consumer = new ActiveMQConnectionConsumer(this,
            sessionPool, consumerInfo, maxMessages);
    return consumer;
}
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
* @param destination
* the destination to access
* @param messageSelector
* only messages with properties matching the message selector
* expression are delivered. A value of null or an empty string
* indicates that there is no message selector for the message
* consumer.
* @param sessionPool
* the server session pool to associate with this connection
* consumer
* @param maxMessages
* the maximum number of messages that can be assigned to a
* server session at one time
* @param noLocal
* set true if you want to filter out messages published locally
*
* @return the connection consumer
* @throws JMSException
* if the <CODE>Connection</CODE> object fails to create a
* connection consumer due to some internal error or invalid
* arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException
* if an invalid destination is specified.
* @throws javax.jms.InvalidSelectorException
* if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
public ConnectionConsumer createConnectionConsumer(Destination destination,
        String messageSelector, ServerSessionPool sessionPool,
        int maxMessages, boolean noLocal) throws JMSException {
    checkClosed();
    ensureClientIDInitialised();
    // Describe the consumer: fresh id, transformed destination, selector, a
    // consumer number drawn from handleIdGenerator, and the noLocal flag.
    final ConsumerInfo consumerInfo = new ConsumerInfo();
    consumerInfo.setConsumerId(handleIdGenerator.generateId());
    consumerInfo.setDestination(
            ActiveMQMessageTransformation.transformDestination(destination));
    consumerInfo.setSelector(messageSelector);
    consumerInfo.setConsumerNo(handleIdGenerator.getNextShortSequence());
    consumerInfo.setNoLocal(noLocal);
    final ConnectionConsumer consumer = new ActiveMQConnectionConsumer(this,
            sessionPool, consumerInfo, maxMessages);
    return consumer;
}
/**
* Create a durable connection consumer for this connection (optional
* operation). This is an expert facility not used by regular JMS clients.
*
* @param topic
* topic to access
* @param subscriptionName
* durable subscription name
* @param messageSelector
* only messages with properties matching the message selector
* expression are delivered. A value of null or an empty string
* indicates that there is no message selector for the message
* consumer.
* @param sessionPool
* the server session pool to associate with this durable
* connection consumer
* @param maxMessages
* the maximum number of messages that can be assigned to a
* server session at one time
* @return the durable connection consumer
* @throws JMSException
* if the <CODE>Connection</CODE> object fails to create a
* connection consumer due to some internal error or invalid
* arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException
* if an invalid destination is specified.
* @throws javax.jms.InvalidSelectorException
* if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
        String subscriptionName, String messageSelector,
        ServerSessionPool sessionPool, int maxMessages) throws JMSException {
    checkClosed();
    ensureClientIDInitialised();
    // The subscription name is carried as the consumer name of the info.
    final ConsumerInfo durableInfo = new ConsumerInfo();
    durableInfo.setConsumerId(handleIdGenerator.generateId());
    durableInfo.setDestination(
            ActiveMQMessageTransformation.transformDestination(topic));
    durableInfo.setSelector(messageSelector);
    durableInfo.setConsumerName(subscriptionName);
    durableInfo.setConsumerNo(handleIdGenerator.getNextShortSequence());
    return new ActiveMQConnectionConsumer(this, sessionPool, durableInfo,
            maxMessages);
}
/**
* Create a durable connection consumer for this connection (optional
* operation). This is an expert facility not used by regular JMS clients.
*
* @param topic
* topic to access
* @param subscriptionName
* durable subscription name
* @param messageSelector
* only messages with properties matching the message selector
* expression are delivered. A value of null or an empty string
* indicates that there is no message selector for the message
* consumer.
* @param sessionPool
* the server session pool to associate with this durable
* connection consumer
* @param maxMessages
* the maximum number of messages that can be assigned to a
* server session at one time
* @param noLocal
* set true if you want to filter out messages published locally
*
* @return the durable connection consumer
* @throws JMSException
* if the <CODE>Connection</CODE> object fails to create a
* connection consumer due to some internal error or invalid
* arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws javax.jms.InvalidDestinationException
* if an invalid destination is specified.
* @throws javax.jms.InvalidSelectorException
* if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
* @since 1.1
*/
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
        String subscriptionName, String messageSelector,
        ServerSessionPool sessionPool, int maxMessages, boolean noLocal)
        throws JMSException {
    checkClosed();
    ensureClientIDInitialised();
    // Same as the five-argument variant, plus the noLocal flag.
    final ConsumerInfo durableInfo = new ConsumerInfo();
    durableInfo.setConsumerId(handleIdGenerator.generateId());
    durableInfo.setDestination(
            ActiveMQMessageTransformation.transformDestination(topic));
    durableInfo.setSelector(messageSelector);
    durableInfo.setConsumerName(subscriptionName);
    durableInfo.setNoLocal(noLocal);
    durableInfo.setConsumerNo(handleIdGenerator.getNextShortSequence());
    return new ActiveMQConnectionConsumer(this, sessionPool, durableInfo,
            maxMessages);
}
/**
* Implementation of the PacketListener interface - consume a packet
*
* @param packet -
* the Packet to consume
* @see org.activemq.message.PacketListener#consume(org.activemq.message.Packet)
*/
public void consume(Packet packet) {
    // Nothing is processed once the connection is closed or without a packet.
    if (closed || packet == null) {
        return;
    }
    if (packet.isJMSMessage()) {
        ActiveMQMessage message = (ActiveMQMessage) packet;
        message.setReadOnly(true);
        message.setConsumerIdentifer(clientID);
        // Expiry is only checked here for multicast transports; a
        // point-cast network is expected to have filtered expired
        // messages already.
        if (transportChannel.isMulticast()) {
            long expiration = message.getJMSExpiration();
            if (expiration > 0 && System.currentTimeMillis() > expiration) {
                if (log.isDebugEnabled()) {
                    log.debug("Discarding expired message: " + message);
                }
                return;
            }
        }
        try {
            boolean alreadyDispatched = false;
            Iterator dispatchers = this.messageDispatchers.iterator();
            while (dispatchers.hasNext()) {
                ActiveMQMessageDispatcher dispatcher =
                        (ActiveMQMessageDispatcher) dispatchers.next();
                if (!dispatcher.isTarget(message)) {
                    continue;
                }
                if (alreadyDispatched) {
                    // every further target gets its own copy of the message
                    message = message.deepCopy();
                }
                dispatcher.dispatch(message);
                alreadyDispatched = true;
            }
        } catch (JMSException jmsEx) {
            handleAsyncException(jmsEx);
        }
        return;
    }
    if (packet.getPacketType() == Packet.CAPACITY_INFO) {
        // The broker tells us how long to pause before sending messages.
        flowControlSleepTime = ((CapacityInfo) packet).getFlowControlTimeout();
        return;
    }
    if (packet.getPacketType() == Packet.KEEP_ALIVE
            && packet.isReceiptRequired()) {
        // Answer a keep-alive that asked for a receipt.
        Receipt reply = new Receipt();
        reply.setCorrelationId(packet.getId());
        reply.setReceiptRequired(false);
        try {
            asyncSendPacket(reply);
        } catch (JMSException jmsEx) {
            handleAsyncException(jmsEx);
        }
    }
}
/**
* @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
*/
public void onException(JMSException jmsEx) {
    // The transport channel reported a failure: pass it on first (the
    // handler ignores exceptions once the connection is closed), then mark
    // the transport as broken and close the connection.
    handleAsyncException(jmsEx);
    isTransportOK = false;
    try {
        close();
    } catch (JMSException closeFailure) {
        log.debug("Exception closing the connection", closeFailure);
    }
}
/**
* Creates a <CODE>TopicSession</CODE> object.
*
* @param transacted
* indicates whether the session is transacted
* @param acknowledgeMode
* indicates whether the consumer or the client will acknowledge
* any messages it receives; ignored if the session is
* transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created topic session
* @throws JMSException
* if the <CODE>TopicConnection</CODE> object fails to create
* a session due to some internal error or lack of support for
* the specific transaction and acknowledgement mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
*/
public TopicSession createTopicSession(boolean transacted,
        int acknowledgeMode) throws JMSException {
    // Create a regular session and wrap it in the topic-specific facade.
    ActiveMQSession delegate =
            (ActiveMQSession) createSession(transacted, acknowledgeMode);
    return new ActiveMQTopicSession(delegate);
}
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
* @param topic
* the topic to access
* @param messageSelector
* only messages with properties matching the message selector
* expression are delivered. A value of null or an empty string
* indicates that there is no message selector for the message
* consumer.
* @param sessionPool
* the server session pool to associate with this connection
* consumer
* @param maxMessages
* the maximum number of messages that can be assigned to a
* server session at one time
* @return the connection consumer
* @throws JMSException
* if the <CODE>TopicConnection</CODE> object fails to create
* a connection consumer due to some internal error or invalid
* arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws InvalidDestinationException
* if an invalid topic is specified.
* @throws InvalidSelectorException
* if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
*/
public ConnectionConsumer createConnectionConsumer(Topic topic,
        String messageSelector, ServerSessionPool sessionPool,
        int maxMessages) throws JMSException {
    checkClosed();
    ensureClientIDInitialised();
    // Describe the topic consumer: id, transformed destination, selector
    // and consumer number.
    final ConsumerInfo topicInfo = new ConsumerInfo();
    topicInfo.setConsumerId(handleIdGenerator.generateId());
    topicInfo.setDestination(
            ActiveMQMessageTransformation.transformDestination(topic));
    topicInfo.setSelector(messageSelector);
    topicInfo.setConsumerNo(handleIdGenerator.getNextShortSequence());
    return new ActiveMQConnectionConsumer(this, sessionPool, topicInfo,
            maxMessages);
}
/**
* Creates a <CODE>QueueSession</CODE> object.
*
* @param transacted
* indicates whether the session is transacted
* @param acknowledgeMode
* indicates whether the consumer or the client will acknowledge
* any messages it receives; ignored if the session is
* transacted. Legal values are
* <code>Session.AUTO_ACKNOWLEDGE</code>,
* <code>Session.CLIENT_ACKNOWLEDGE</code>, and
* <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
* @return a newly created queue session
* @throws JMSException
* if the <CODE>QueueConnection</CODE> object fails to create
* a session due to some internal error or lack of support for
* the specific transaction and acknowledgement mode.
* @see Session#AUTO_ACKNOWLEDGE
* @see Session#CLIENT_ACKNOWLEDGE
* @see Session#DUPS_OK_ACKNOWLEDGE
*/
public QueueSession createQueueSession(boolean transacted,
        int acknowledgeMode) throws JMSException {
    // Create a regular session and wrap it in the queue-specific facade.
    ActiveMQSession delegate =
            (ActiveMQSession) createSession(transacted, acknowledgeMode);
    return new ActiveMQQueueSession(delegate);
}
/**
* Creates a connection consumer for this connection (optional operation).
* This is an expert facility not used by regular JMS clients.
*
* @param queue
* the queue to access
* @param messageSelector
* only messages with properties matching the message selector
* expression are delivered. A value of null or an empty string
* indicates that there is no message selector for the message
* consumer.
* @param sessionPool
* the server session pool to associate with this connection
* consumer
* @param maxMessages
* the maximum number of messages that can be assigned to a
* server session at one time
* @return the connection consumer
* @throws JMSException
* if the <CODE>QueueConnection</CODE> object fails to create
* a connection consumer due to some internal error or invalid
* arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
* @throws InvalidDestinationException
* if an invalid queue is specified.
* @throws InvalidSelectorException
* if the message selector is invalid.
* @see javax.jms.ConnectionConsumer
*/
public ConnectionConsumer createConnectionConsumer(Queue queue,
        String messageSelector, ServerSessionPool sessionPool,
        int maxMessages) throws JMSException {
    checkClosed();
    ensureClientIDInitialised();
    // Describe the queue consumer: id, transformed destination, selector
    // and consumer number.
    final ConsumerInfo queueInfo = new ConsumerInfo();
    queueInfo.setConsumerId(handleIdGenerator.generateId());
    queueInfo.setDestination(
            ActiveMQMessageTransformation.transformDestination(queue));
    queueInfo.setSelector(messageSelector);
    queueInfo.setConsumerNo(handleIdGenerator.getNextShortSequence());
    return new ActiveMQConnectionConsumer(this, sessionPool, queueInfo,
            maxMessages);
}
/**
* Ensures that the clientID was manually specified and not auto-generated.
* If the clientID was not specified this method will throw an exception.
* This method is used to ensure that the clientID + durableSubscriber name
* are used correctly.
*
* @throws JMSException
*/
public void checkClientIDWasManuallySpecified() throws JMSException {
    // A user-supplied clientID is fine; nothing more to check.
    if (userSpecifiedClientID) {
        return;
    }
    throw new JMSException(
            "You cannot create a durable subscriber without specifying a unique clientID on a Connection");
}
/**
* handle disconnect/reconnect events
*
* @param event
*/
public void statusChanged(TransportStatusEvent event) {
    log.info("channel status changed: " + event);
    if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
        // Transport is back: flag it usable and replay our state.
        isTransportOK = true;
        doReconnect();
        return;
    }
    if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
        // Transport is gone: flag it and let the sessions clear their
        // messages in progress.
        isTransportOK = false;
        clearMessagesInProgress();
    }
}
/**
* send a Packet through the Connection - for internal use only
*
* @param packet
* @throws JMSException
*/
public void asyncSendPacket(Packet packet) throws JMSException {
// Delegates to the two-argument overload with doSendWhileReconnecting = true.
asyncSendPacket(packet, true);
}
/**
* send a Packet through the Connection - for internal use only
*
* @param packet
* @param doSendWhileReconnecting
* @throws JMSException
*/
public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting)
        throws JMSException {
    // The packet is silently dropped when the transport is not OK or the
    // connection is closed.
    if (!isTransportOK || closed) {
        return;
    }
    // Unless the caller allows sending while reconnecting, the transport
    // must currently be connected.
    if (!doSendWhileReconnecting
            && !transportChannel.isTransportConnected()) {
        return;
    }
    packet.setId(packetIdGenerator.getNextShortSequence());
    packet.setReceiptRequired(false);
    boolean interrupted = false;
    if (packet.isJMSMessage() && flowControlSleepTime > 0) {
        // Flow control: pause before sending a JMS message.
        try {
            Thread.sleep(flowControlSleepTime);
        } catch (InterruptedException e) {
            // Cut the pause short but remember the interrupt so that it
            // is not lost to the caller.
            interrupted = true;
        }
    }
    try {
        this.transportChannel.asyncSend(packet);
    } finally {
        if (interrupted) {
            // Restore the interrupt status only after the send, so the
            // send itself behaves exactly as before.
            Thread.currentThread().interrupt();
        }
    }
}
/**
* send a Packet through a Connection - for internal use only
*
* @param packet
* @throws JMSException
*/
public void syncSendPacket(Packet packet) throws JMSException {
// Delegates to the two-argument overload with a timeout argument of 0.
syncSendPacket(packet, 0);
}
/**
* Send a packet through a Connection - for internal use only
*
* @param packet
* @param timeout
* @throws JMSException
*/
public void syncSendPacket(Packet packet, int timeout) throws JMSException {
    // Refuse to send on a closed connection or a broken transport.
    if (closed) {
        throw new ConnectionClosedException();
    }
    if (!isTransportOK) {
        throw new JMSException(
                "syncSendTimedOut: connection no longer OK");
    }
    packet.setId(packetIdGenerator.getNextShortSequence());
    packet.setReceiptRequired(true);
    Receipt receipt = this.transportChannel.send(packet, timeout);
    // A missing receipt or a non-failed receipt counts as success.
    if (receipt == null || !receipt.isFailed()) {
        return;
    }
    Throwable cause = receipt.getException();
    if (cause != null) {
        throw JMSExceptionHelper.newJMSException(cause);
    }
    throw new JMSException(
            "syncSendPacket failed with unknown exception");
}
/**
 * Sends a packet through the transport channel with a receipt required and
 * returns the receipt obtained from the channel.
 *
 * @param packet
 *            the packet to send
 * @return the receipt returned by the transport channel (may be null)
 * @throws JMSException
 *             if the connection is closed, the transport is not OK, or the
 *             receipt is marked as failed
 */
public Receipt syncSendRequest(Packet packet) throws JMSException {
    checkClosed();
    if (closed) {
        throw new ConnectionClosedException();
    }
    if (!isTransportOK) {
        throw new JMSException(
                "syncSendTimedOut: connection no longer OK");
    }
    packet.setReceiptRequired(true);
    packet.setId(this.packetIdGenerator.getNextShortSequence());
    Receipt receipt = this.transportChannel.send(packet);
    if (receipt == null || !receipt.isFailed()) {
        return receipt;
    }
    Throwable cause = receipt.getException();
    if (cause != null) {
        // Wrap the failure, keeping the original as the cause.
        throw (JMSException) new JMSException(cause.getMessage())
                .initCause(cause);
    }
    throw new JMSException(
            "syncSendPacket failed with unknown exception");
}
// Properties
// -------------------------------------------------------------------------
/**
* @return Returns the prefetchPolicy.
*/
public ActiveMQPrefetchPolicy getPrefetchPolicy() {
    // Plain accessor for the prefetchPolicy field.
    return this.prefetchPolicy;
}
/**
* @param prefetchPolicy
* The prefetchPolicy to set.
*/
public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
// Plain field assignment; the argument is stored as given, without validation.
this.prefetchPolicy = prefetchPolicy;
}
/**
 * @return the current value of the sendCloseTimeout field
 */
public int getSendCloseTimeout() {
    return this.sendCloseTimeout;
}
/**
 * @param sendCloseTimeout
 *            the value to store in the sendCloseTimeout field
 */
public void setSendCloseTimeout(int sendCloseTimeout) {
this.sendCloseTimeout = sendCloseTimeout;
}
/**
 * @return the timeout passed along when the ConnectionInfo is sent to the
 *         broker
 */
public int getSendConnectionInfoTimeout() {
    return this.sendConnectionInfoTimeout;
}
/**
 * @param sendConnectionInfoTimeout
 *            the value to store in the sendConnectionInfoTimeout field
 */
public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) {
this.sendConnectionInfoTimeout = sendConnectionInfoTimeout;
}
/**
 * @return the transport channel used by this connection
 */
public TransportChannel getTransportChannel() {
    return this.transportChannel;
}
/**
* Returns the clientID of the connection, forcing one to be generated if
* one has not yet been configured
*/
public String getInitializedClientID() throws JMSException {
    // Make sure a clientID exists (one is generated if necessary) before
    // handing it out.
    ensureClientIDInitialised();
    return clientID;
}
// Implementation methods
// -------------------------------------------------------------------------
/**
* Used internally for adding Sessions to the Connection
*
* @param session
* @throws JMSException
*/
protected void addSession(ActiveMQSession session) throws JMSException {
    // Register the session and make it a dispatch target.
    sessions.add(session);
    addMessageDispatcher(session);
    // A session joining an already started connection is started at once.
    if (started.get()) {
        session.start();
    }
    // Announce the session as started.
    SessionInfo sessionInfo = createSessionInfo(session);
    sessionInfo.setStarted(true);
    asyncSendPacket(sessionInfo);
}
/**
* Used interanlly for removing Sessions from a Connection
*
* @param session
* @throws JMSException
*/
protected void removeSession(ActiveMQSession session) throws JMSException {
    // Unregister the session and stop dispatching to it.
    sessions.remove(session);
    removeMessageDispatcher(session);
    // Announce the session as stopped; not sent while reconnecting.
    SessionInfo sessionInfo = createSessionInfo(session);
    sessionInfo.setStarted(false);
    asyncSendPacket(sessionInfo, false);
}
/**
 * Builds a SessionInfo carrying this connection's clientID and the given
 * session's id and start time.
 */
private SessionInfo createSessionInfo(ActiveMQSession session) {
    final SessionInfo sessionInfo = new SessionInfo();
    sessionInfo.setClientId(this.clientID);
    sessionInfo.setSessionId(session.getSessionId());
    sessionInfo.setStartTime(session.getStartTime());
    return sessionInfo;
}
/**
* Add a ConnectionConsumer
*
* @param connectionConsumer
* @throws JMSException
*/
protected void addConnectionConsumer(
ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
// Track the consumer and register it as a message dispatcher.
this.connectionConsumers.add(connectionConsumer);
addMessageDispatcher(connectionConsumer);
}
/**
* Remove a ConnectionConsumer
*
* @param connectionConsumer
*/
protected void removeConnectionConsumer(
        ActiveMQConnectionConsumer connectionConsumer) {
    // Mirror of addConnectionConsumer: drop the consumer from the tracked
    // collection (the previous code added it again instead) and stop
    // dispatching messages to it.
    this.connectionConsumers.remove(connectionConsumer);
    removeMessageDispatcher(connectionConsumer);
}
/**
* Add a Message dispatcher to receive messages from the Broker
*
* @param messageDispatch
* @throws JMSException
* if an internal error
*/
protected void addMessageDispatcher(
ActiveMQMessageDispatcher messageDispatch) throws JMSException {
// Dispatchers in this collection are offered every JMS message in consume().
this.messageDispatchers.add(messageDispatch);
}
/**
* Remove a Message dispatcher
*
* @param messageDispatcher
*/
protected void removeMessageDispatcher(
ActiveMQMessageDispatcher messageDispatcher) {
// After removal the dispatcher is no longer considered by consume().
this.messageDispatchers.remove(messageDispatcher);
}
/**
* Used for handling async exceptions
*
* @param jmsEx
*/
protected void handleAsyncException(JMSException jmsEx) {
    // Exceptions arriving after close are ignored.
    if (closed) {
        return;
    }
    if (this.exceptionListener == null) {
        // Nobody registered to be told: at least leave a trace in the log.
        log.warn("Async exception with no exception listener: " + jmsEx,
                jmsEx);
        return;
    }
    this.exceptionListener.onException(jmsEx);
}
/**
 * Sends the ConnectionInfo to the Broker using the configured
 * sendConnectionInfoTimeout.
 *
 * @throws JMSException
 */
protected void sendConnectionInfoToBroker() throws JMSException {
// The current 'closed' flag is passed as forceResend, and 'closing' is false.
sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed, false);
}
/**
* Send the ConnectionInfo to the Broker
*
* @param timeout
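* @param forceResend
* if true, the ConnectionInfo is sent even if it was already sent
* @param closing
* if true, the ConnectionInfo is flagged as closed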
* @throws JMSException
*/
protected void sendConnectionInfoToBroker(int timeout, boolean forceResend,
        boolean closing) throws JMSException {
    // Already sent and no resend requested: nothing to do.
    if (isConnectionInfoSentToBroker && !forceResend) {
        return;
    }
    // The flag is raised before the packet is actually sent.
    this.isConnectionInfoSentToBroker = true;
    ensureClientIDInitialised();

    final ConnectionInfo connectionInfo = new ConnectionInfo();
    connectionInfo.setClientId(this.clientID);
    connectionInfo.setHostName(IdGenerator.getHostName());
    connectionInfo.setUserName(userName);
    connectionInfo.setPassword(password);
    connectionInfo.setStartTime(startTime);
    connectionInfo.setStarted(started.get());
    connectionInfo.setClosed(closed || closing);
    connectionInfo.setClientVersion(connectionMetaData.getProviderVersion());
    connectionInfo.setWireFormatVersion(
            transportChannel.getCurrentWireFormatVersion());
    if (connectionInfo.getProperties() != null) {
        // The no-delay property is the inverse of useAsyncSend.
        connectionInfo.getProperties().setProperty(
                ConnectionInfo.NO_DELAY_PROPERTY,
                String.valueOf(!useAsyncSend));
    }
    // A closing info with quickClose enabled is sent without waiting for a
    // receipt; everything else is sent synchronously with the timeout.
    if (!quickClose || !connectionInfo.isClosed()) {
        syncSendPacket(connectionInfo, timeout);
    } else {
        asyncSendPacket(connectionInfo);
    }
}
/**
* Set the maximum amount of memory this Connection should use for buffered
* inbound messages
*
* @param newMemoryLimit
* the new memory limit in bytes
*/
public void setConnectionMemoryLimit(int newMemoryLimit) {
// The limit is not stored here; it is handed straight to the memory manager.
memoryManager.setValueLimit(newMemoryLimit);
}
/**
* Get the current value for the maximum amount of memory this Connection
* should use for buffered inbound messages
*
* @return the current limit in bytes
*/
public int getConnectionMemoryLimit() {
// NOTE(review): the manager's limit is narrowed with an (int) cast; a value
// outside the int range would not survive the cast - confirm the declared
// type of getValueLimit().
return (int) memoryManager.getValueLimit();
}
/**
* CapacityMonitorEventListener implementation called when the capacity of a
* CapacityService changes
*
* @param event
*/
public void capacityChanged(CapacityMonitorEvent event) {
    // Translate the event into a CapacityInfo packet for the broker.
    final CapacityInfo capacityInfo = new CapacityInfo();
    capacityInfo.setResourceName(event.getMonitorName());
    capacityInfo.setCapacity(event.getCapacity());
    try {
        // Not sent while the transport is reconnecting.
        asyncSendPacket(capacityInfo, false);
    } catch (JMSException sendFailure) {
        JMSException wrapped = new JMSException(
                "failed to send change in capacity");
        wrapped.setLinkedException(sendFailure);
        handleAsyncException(wrapped);
    }
}
/**
* @return a number unique for this connection
*/
protected int getNextConsumerNumber() {
    // Each call advances the connection's consumer number generator.
    return consumerNumberGenerator.increment();
}
/**
 * @return the next short sequence value from the session id generator
 */
protected short generateSessionId() {
    return sessionIdGenerator.getNextShortSequence();
}
/**
 * Generates a clientID when none (or only a blank one) is set, pushes the
 * clientID to the transport channel and marks it as set.
 */
private synchronized void ensureClientIDInitialised() {
    boolean missing = this.clientID == null
            || this.clientID.trim().length() == 0;
    if (missing) {
        this.clientID = this.clientIdGenerator.generateId();
    }
    transportChannel.setClientID(this.clientID);
    this.clientIDSet = true;
}
/**
 * Looks up a MemoryBoundedQueue by name through the bounded queue manager.
 *
 * @param name
 *            the queue name passed to the manager
 * @return whatever the manager returns for that name
 */
protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
return boundedQueueManager.getMemoryBoundedQueue(name);
}
/**
 * Replays this connection's state after the transport has reconnected: the
 * ConnectionInfo is re-sent (forced), followed by a started SessionInfo,
 * ConsumerInfo and ProducerInfo for every session, consumer and producer.
 * On failure the exception is logged with its cause, handed to the async
 * exception handler and the transport is flagged as not OK.
 */
protected void doReconnect() {
    try {
        // send the Connection info again
        sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
        for (Iterator iter = sessions.iterator(); iter.hasNext();) {
            ActiveMQSession session = (ActiveMQSession) iter.next();
            SessionInfo sessionInfo = createSessionInfo(session);
            sessionInfo.setStarted(true);
            asyncSendPacket(sessionInfo, false);
            // send consumers
            for (Iterator consumersIterator = session.consumers.iterator();
                    consumersIterator.hasNext();) {
                ActiveMQMessageConsumer consumer =
                        (ActiveMQMessageConsumer) consumersIterator.next();
                ConsumerInfo consumerInfo = session
                        .createConsumerInfo(consumer);
                consumerInfo.setStarted(true);
                asyncSendPacket(consumerInfo, false);
            }
            // send producers
            for (Iterator producersIterator = session.producers.iterator();
                    producersIterator.hasNext();) {
                ActiveMQMessageProducer producer =
                        (ActiveMQMessageProducer) producersIterator.next();
                ProducerInfo producerInfo = session
                        .createProducerInfo(producer);
                producerInfo.setStarted(true);
                asyncSendPacket(producerInfo, false);
            }
            // send the current capacity
            // NOTE(review): this sits inside the session loop, so the
            // capacity is re-sent once per session and not at all when
            // there are no sessions - confirm that this is intended.
            CapacityMonitorEvent event = memoryManager
                    .generateCapacityMonitorEvent();
            if (event != null) {
                capacityChanged(event);
            }
        }
    } catch (JMSException jmsEx) {
        // Log the cause as well; previously only the bare message was logged.
        log.error("Failed to do reconnection", jmsEx);
        handleAsyncException(jmsEx);
        isTransportOK = false;
    }
}
/**
* @return Returns the useAsyncSend.
*/
public boolean isUseAsyncSend() {
    // Plain accessor for the useAsyncSend flag.
    return this.useAsyncSend;
}
/**
* @param useAsyncSend
* The useAsyncSend to set.
*/
public void setUseAsyncSend(boolean useAsyncSend) {
// Plain field assignment; no other state is touched here.
this.useAsyncSend = useAsyncSend;
}
/**
* @return Returns the cachingEnabled.
*/
public boolean isCachingEnabled() {
    // Plain accessor for the cachingEnabled flag.
    return this.cachingEnabled;
}
/**
* @param cachingEnabled
* The cachingEnabled to set.
*/
public void setCachingEnabled(boolean cachingEnabled) {
// Plain field assignment; no other state is touched here.
this.cachingEnabled = cachingEnabled;
}
/**
* @return Returns the j2EEcompliant.
*/
public boolean isJ2EEcompliant() {
    // Plain accessor for the J2EEcompliant flag.
    return this.J2EEcompliant;
}
/**
* @param ecompliant
* The j2EEcompliant to set.
*/
public void setJ2EEcompliant(boolean ecompliant) {
// Plain assignment to the J2EEcompliant field.
J2EEcompliant = ecompliant;
}
/**
* @return Returns the internalConnection.
*/
public boolean isInternalConnection() {
    // Plain accessor for the internalConnection flag.
    return this.internalConnection;
}
/**
* @param internalConnection
* The internalConnection to set.
*/
public void setInternalConnection(boolean internalConnection) {
// Plain field assignment; no other state is touched here.
this.internalConnection = internalConnection;
}
/**
* @return Returns the doMessageCompression.
*/
public boolean isDoMessageCompression() {
    // Compression is only reported as on when it is both requested and
    // supported by the transport channel; the channel is not consulted
    // when the flag is off.
    if (!doMessageCompression) {
        return false;
    }
    return transportChannel.doesSupportMessageCompression();
}
/**
* @param doMessageCompression
* The doMessageCompression to set.
*/
public void setDoMessageCompression(boolean doMessageCompression) {
    // A request to enable compression only sticks when the transport
    // channel supports it; the channel is not consulted when disabling.
    if (doMessageCompression) {
        this.doMessageCompression =
                transportChannel.doesSupportMessageCompression();
    } else {
        this.doMessageCompression = false;
    }
}
/**
* @return Returns the doMessageFragmentation.
*/
public boolean isDoMessageFragmentation() {
    // Fragmentation is only reported as on when it is both requested and
    // supported by the transport channel; the channel is not consulted
    // when the flag is off.
    if (!doMessageFragmentation) {
        return false;
    }
    return transportChannel.doesSupportMessageFragmentation();
}
/**
* @param doMessageFragmentation
* The doMessageFragmentation to set.
*/
public void setDoMessageFragmentation(boolean doMessageFragmentation) {
    // A request to enable fragmentation only sticks when the transport
    // channel supports it; the channel is not consulted when disabling.
    if (doMessageFragmentation) {
        this.doMessageFragmentation =
                transportChannel.doesSupportMessageFragmentation();
    } else {
        this.doMessageFragmentation = false;
    }
}
/**
* @return Returns the messageCompressionLevel.
*/
public int getMessageCompressionLevel() {
    // Plain accessor for the messageCompressionLevel field.
    return this.messageCompressionLevel;
}
/**
* @param messageCompressionLevel
* The messageCompressionLevel to set.
*/
public void setMessageCompressionLevel(int messageCompressionLevel) {
// Stored as given; no range check is performed here.
this.messageCompressionLevel = messageCompressionLevel;
}
/**
* @return Returns the messageCompressionLimit.
*/
public int getMessageCompressionLimit() {
    // Plain accessor for the messageCompressionLimit field.
    return this.messageCompressionLimit;
}
/**
* @param messageCompressionLimit
* The messageCompressionLimit to set.
*/
public void setMessageCompressionLimit(int messageCompressionLimit) {
// Stored as given; no range check is performed here.
this.messageCompressionLimit = messageCompressionLimit;
}
/**
* @return Returns the messageCompressionStrategy.
*/
public int getMessageCompressionStrategy() {
    // Plain accessor for the messageCompressionStrategy field.
    return this.messageCompressionStrategy;
}
/**
* @param messageCompressionStrategy
* The messageCompressionStrategy to set.
*/
public void setMessageCompressionStrategy(int messageCompressionStrategy) {
// Stored as given; the value is not validated here.
this.messageCompressionStrategy = messageCompressionStrategy;
}
/**
* @return Returns the messageFragmentationLimit.
*/
public int getMessageFragmentationLimit() {
    // Plain accessor for the messageFragmentationLimit field.
    return this.messageFragmentationLimit;
}
/**
* @param messageFragmentationLimit
* The messageFragmentationLimit to set.
*/
public void setMessageFragmentationLimit(int messageFragmentationLimit) {
// Stored as given; no range check is performed here.
this.messageFragmentationLimit = messageFragmentationLimit;
}
/**
* @return Returns the disableTimeStampsByDefault.
*/
public boolean isDisableTimeStampsByDefault() {
    // Plain accessor for the disableTimeStampsByDefault flag.
    return this.disableTimeStampsByDefault;
}
/**
* @param disableTimeStampsByDefault
* The disableTimeStampsByDefault to set.
*/
public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
// Plain field assignment; no other state is touched here.
this.disableTimeStampsByDefault = disableTimeStampsByDefault;
}
/**
* Causes pre-serialization of messages before send. By default this is on.
*
* @return Returns the prePrepareMessageOnSend.
*/
public boolean isPrepareMessageBodyOnSend() {
    // Plain accessor for the prepareMessageBodyOnSend flag.
    return this.prepareMessageBodyOnSend;
}
/**
* Causes pre-serialization of messages before send. By default this is on.
*
* @param prePrepareMessageOnSend
* The prePrepareMessageOnSend to set.
*/
public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) {
// The parameter name differs from the field it is stored in.
this.prepareMessageBodyOnSend = prePrepareMessageOnSend;
}
/**
* @return Returns the copyMessageOnSend.
*/
public boolean isCopyMessageOnSend() {
    // Plain accessor for the copyMessageOnSend flag.
    return this.copyMessageOnSend;
}
/**
* @param copyMessageOnSend
* The copyMessageOnSend to set.
*/
public void setCopyMessageOnSend(boolean copyMessageOnSend) {
// Plain field assignment; no other state is touched here.
this.copyMessageOnSend = copyMessageOnSend;
}
/**
* @return Returns the quickClose.
*/
public boolean isQuickClose() {
    // Plain accessor for the quickClose flag.
    return this.quickClose;
}
/**
* @param quickClose
* The quickClose to set.
*/
public void setQuickClose(boolean quickClose) {
// Plain field assignment; no other state is touched here.
this.quickClose = quickClose;
}
/**
* @return Returns the optimizedMessageDispatch.
*/
public boolean isOptimizedMessageDispatch() {
    // Plain accessor for the optimizedMessageDispatch flag.
    return this.optimizedMessageDispatch;
}
/**
* @param optimizedMessageDispatch
* The optimizedMessageDispatch to set.
*/
public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
// Plain field assignment; no other state is touched here.
this.optimizedMessageDispatch = optimizedMessageDispatch;
}
/**
 * Asks every session of this connection to clear its messages in progress.
 */
protected void clearMessagesInProgress() {
    Iterator sessionIter = sessions.iterator();
    while (sessionIter.hasNext()) {
        ((ActiveMQSession) sessionIter.next()).clearMessagesInProgress();
    }
}
/**
* Tells the broker to destroy a destination.
*
* @param destination
*/
public void destroyDestination(ActiveMQDestination destination)
        throws JMSException {
    // Build a DESTROY_DESTINATION admin command and send it synchronously.
    final BrokerAdminCommand destroyRequest = new BrokerAdminCommand();
    destroyRequest.setCommand(BrokerAdminCommand.DESTROY_DESTINATION);
    destroyRequest.setDestination(destination);
    syncSendPacket(destroyRequest);
}
/**
* Cleans up this connection so that its state is as if the connection was
* just created. This allows the Resource Adapter to clean up a connection
* so that it can be reused without having to close and recreate the
* connection.
*
* @throws JMSException
* if the cleanup fails
*/
public void cleanup() throws JMSException {
    try {
        // Close all sessions, then all connection consumers.
        Iterator sessionIter = this.sessions.iterator();
        while (sessionIter.hasNext()) {
            ((ActiveMQSession) sessionIter.next()).close();
        }
        Iterator consumerIter = this.connectionConsumers.iterator();
        while (consumerIter.hasNext()) {
            ((ActiveMQConnectionConsumer) consumerIter.next()).close();
        }
        this.connectionConsumers.clear();
        this.messageDispatchers.clear();
    } finally {
        // Even if a close failed, forget the sessions and mark the
        // connection as not started.
        this.sessions.clear();
        started.set(false);
    }
    // Reset the per-use state.
    setExceptionListener(null);
    clientIDSet = false;
    isConnectionInfoSentToBroker = false;
    // Send a CleanupConnectionInfo carrying the client id.
    CleanupConnectionInfo cleanupNotice = new CleanupConnectionInfo();
    cleanupNotice.setClientId(getClientID());
    asyncSendPacket(cleanupNotice);
}
/**
* Changes the associated username/password that is associated with this
* connection. If the connection has been used, you must call cleanup()
* before calling this method.
*
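* @param theUserName
* the new user name
* @param thePassword
* the new password
* @throws IllegalStateException
* if the connection is in use, i.e. its ConnectionInfo has already
* been sent to the broker.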
*/
public void changeUserInfo(String theUserName, String thePassword)
        throws JMSException {
    // Credentials may only change while no ConnectionInfo has been sent.
    if (isConnectionInfoSentToBroker) {
        throw new IllegalStateException(
                "changeUserInfo used Connection is not allowed");
    }
    userName = theUserName;
    password = thePassword;
}
/**
 * Adds a message to the transient consumed redeliver cache, from which
 * replayTransientConsumedRedeliveredMessages later replays matching
 * messages to a consumer.
 *
 * @param message
 *            the message to cache
 */
protected void addToTransientConsumedRedeliverCache(ActiveMQMessage message) {
transientConsumedRedeliverCache.add(message);
}
/**
 * Replays cached messages that match the given topic consumer. Each
 * matching message is removed from the cache, flagged as redelivered and
 * handed to the consumer.
 *
 * @param session
 *            set as the message acknowledge of each replayed message
 * @param consumer
 *            the consumer whose destination, selector and noLocal setting
 *            define the filter
 * @throws JMSException
 */
protected void replayTransientConsumedRedeliveredMessages(
        ActiveMQSession session, ActiveMQMessageConsumer consumer)
        throws JMSException {
    // Only topic consumers are served, and only when something is cached.
    if (!consumer.getDestination().isTopic()
            || transientConsumedRedeliverCache.isEmpty()) {
        return;
    }
    Filter filter = getFilterFactory().createFilter(
            consumer.getDestination(), consumer.getMessageSelector());
    if (consumer.isNoLocal()) {
        filter = new AndFilter(filter, new NoLocalFilter(clientID));
    }
    // NOTE(review): entries are removed from the cache while it is being
    // iterated; this relies on the cache's iterator tolerating that -
    // confirm against the cache's declared type.
    Iterator cached = transientConsumedRedeliverCache.iterator();
    while (cached.hasNext()) {
        ActiveMQMessage message = (ActiveMQMessage) cached.next();
        if (!filter.matches(message)) {
            continue;
        }
        transientConsumedRedeliverCache.remove(message);
        message.setMessageAcknowledge(session);
        message.setJMSRedelivered(true);
        message.setConsumerNos(new int[] { consumer.getConsumerNumber() });
        consumer.processMessage(message);
    }
}
/**
 * @return the filter factory, creating a FilterFactoryImpl on first use
 */
private FilterFactory getFilterFactory() {
    if (filterFactory != null) {
        return filterFactory;
    }
    filterFactory = new FilterFactoryImpl();
    return filterFactory;
}
/**
 * Registers a temporary destination and publishes a "started" advisory for
 * it, unless it is already registered. Non-temporary and null destinations
 * are ignored.
 */
protected void startTemporaryDestination(ActiveMQDestination dest)
        throws JMSException {
    if (dest == null || !dest.isTemporary()) {
        return;
    }
    TempDestinationAdvisoryEvent event =
            (TempDestinationAdvisoryEvent) tempDestinationMap.get(dest);
    if (event != null) {
        // already announced
        return;
    }
    event = new TempDestinationAdvisoryEvent(dest, true);
    tempDestinationMap.put(dest, event);
    // Publish the event as a non-persistent object message on the
    // destination's advisory topic.
    ActiveMQObjectMessage advisory = new ActiveMQObjectMessage();
    advisory.setObject(event);
    advisory.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
    advisory.setJMSDestination(dest.getTopicForTempAdvisory());
    advisory.setJMSMessageID("ID:" + dest.getPhysicalName() + " .started");
    this.syncSendPacket(advisory);
}
/**
 * Unregisters a temporary destination and publishes a "stopped" advisory
 * for it, if it was registered. Non-temporary and null destinations are
 * ignored.
 */
protected void stopTemporaryDestination(ActiveMQDestination dest)
        throws JMSException {
    if (dest == null || !dest.isTemporary()) {
        return;
    }
    TempDestinationAdvisoryEvent event =
            (TempDestinationAdvisoryEvent) tempDestinationMap.remove(dest);
    if (event == null) {
        // was never announced
        return;
    }
    event.setStarted(false);
    // Publish the event as a non-persistent object message on the
    // destination's advisory topic.
    ActiveMQObjectMessage advisory = new ActiveMQObjectMessage();
    advisory.setObject(event);
    advisory.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
    advisory.setJMSDestination(dest.getTopicForTempAdvisory());
    advisory.setJMSMessageID("ID:" + dest.getPhysicalName() + " .stopped");
    this.syncSendPacket(advisory);
}
/**
 * Stops every temporary destination currently registered with this
 * connection.
 *
 * @throws JMSException
 *             if stopping a destination fails
 */
protected void closeTemporaryDestinations() throws JMSException {
    // stopTemporaryDestination removes the entry from tempDestinationMap,
    // so iterate over a snapshot of the keys rather than the live key set.
    // With a non-concurrent Map the live iteration would fail with a
    // ConcurrentModificationException; the snapshot works for any Map.
    Object[] destinations = tempDestinationMap.keySet().toArray();
    for (int i = 0; i < destinations.length; i++) {
        stopTemporaryDestination((ActiveMQDestination) destinations[i]);
    }
}
/**
 * Starts a TempDestinationAdvisor for a temporary destination unless one
 * is already registered for it. Null and non-temporary destinations are
 * ignored.
 */
protected void startAdvisoryForTempDestination(Destination d)
        throws JMSException {
    if (d == null) {
        return;
    }
    ActiveMQDestination dest = (ActiveMQDestination)
            ActiveMQMessageTransformation.transformDestination(d);
    if (!dest.isTemporary()) {
        return;
    }
    TempDestinationAdvisor advisor =
            (TempDestinationAdvisor) validDestinationsMap.get(dest);
    if (advisor != null) {
        // an advisor is already registered
        return;
    }
    // The advisor is started before it is registered.
    advisor = new TempDestinationAdvisor(this, dest);
    advisor.start();
    validDestinationsMap.put(dest, advisor);
}
/**
 * Unregisters and stops the TempDestinationAdvisor of a temporary
 * destination, if there is one. Null and non-temporary destinations are
 * ignored.
 */
protected void stopAdvisoryForTempDestination(ActiveMQDestination d)
        throws JMSException {
    if (d == null) {
        return;
    }
    ActiveMQDestination dest = (ActiveMQDestination)
            ActiveMQMessageTransformation.transformDestination(d);
    if (!dest.isTemporary()) {
        return;
    }
    TempDestinationAdvisor advisor =
            (TempDestinationAdvisor) validDestinationsMap.remove(dest);
    if (advisor != null) {
        advisor.stop();
    }
}
/**
 * Rejects temporary destinations that are deleted, have no registered
 * advisor, or whose advisor does not report them as active. Null and
 * non-temporary destinations always pass.
 *
 * @throws JMSException
 *             if the temporary destination is not usable
 */
protected final void validateDestination(ActiveMQDestination dest)
        throws JMSException {
    if (dest == null || !dest.isTemporary()) {
        return;
    }
    TempDestinationAdvisor advisor =
            (TempDestinationAdvisor) validDestinationsMap.get(dest);
    boolean usable = !dest.isDeleted() && advisor != null
            && advisor.isActive(dest);
    if (!usable) {
        throw new JMSException(
                "Cannot publish to a deleted Destination: " + dest);
    }
}
/**
* @return Returns the resourceManagerId.
* @throws JMSException
*/
public synchronized String getResourceManagerId() throws JMSException {
    // Determined on first use and cached afterwards.
    if (resourceManagerId != null) {
        return resourceManagerId;
    }
    resourceManagerId = determineResourceManagerId();
    return resourceManagerId;
}
/**
* Gets the resource manager id.
*/
private String determineResourceManagerId() throws JMSException {
    // Send a GET_RM_ID transaction request and read the id from the
    // response receipt.
    final XATransactionInfo request = new XATransactionInfo();
    request.setType(TransactionInfo.GET_RM_ID);
    final ResponseReceipt response = (ResponseReceipt) syncSendRequest(request);
    final String rmId = (String) response.getResult();
    // Only checked when assertions are enabled.
    assert rmId != null;
    return rmId;
}
}