/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.ra;
import javax.jms.Connection;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueSession;
import javax.jms.ServerSessionPool;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import org.codehaus.activemq.ActiveMQQueueSession;
import org.codehaus.activemq.ActiveMQTopicSession;
import java.util.ArrayList;
import java.util.Iterator;
/**
* Acts as a pass through proxy for a JMS Connection object.
* It intercepts events that are of interest of the ActiveMQManagedConnection.
*
* @version $Revision: 1.6 $
*/
public class JMSConnectionProxy implements Connection, QueueConnection, TopicConnection {
private ActiveMQManagedConnection managedConnection;
private ArrayList sessions = new ArrayList();
public JMSConnectionProxy(ActiveMQManagedConnection managedConnection) {
this.managedConnection = managedConnection;
}
/**
* Used to let the ActiveMQManagedConnection that this connection
* handel is not needed by the app.
*
* @throws JMSException
*/
public void close() throws JMSException {
if( managedConnection!=null )
managedConnection.proxyClosedEvent(this);
}
/**
* Called by the ActiveMQManagedConnection to invalidate this proxy.
*/
public void cleanup() {
managedConnection = null;
for (Iterator iter = sessions.iterator(); iter.hasNext();) {
JMSSessionProxy p = (JMSSessionProxy) iter.next();
try {
p.cleanup();
} catch (JMSException ignore) {
}
iter.remove();
}
}
/**
*
*/
private Connection getConnection() throws JMSException {
if (managedConnection == null) {
throw new IllegalStateException("The Connection is closed");
}
return managedConnection.getPhysicalConnection();
}
/**
* @param transacted
* @param acknowledgeMode
* @return
* @throws JMSException
*/
public Session createSession(boolean transacted, int acknowledgeMode)
throws JMSException {
return createSessionProxy(transacted, acknowledgeMode);
}
/**
* @param acknowledgeMode
* @param transacted
* @return
* @throws JMSException
*/
private JMSSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {
getConnection();
JMSSessionProxy p;
if( managedConnection.isManagedTx() ) {
ActiveMQRASession session = (ActiveMQRASession)managedConnection.getPhysicalSession();
session.setAutoCommit(!transacted);
session.setAcknowlegeMode((transacted ? Session.SESSION_TRANSACTED : acknowledgeMode));
p = new JMSSessionProxy(session, false);
} else {
Session session = getConnection().createSession(transacted, acknowledgeMode);
p = new JMSSessionProxy(session, true);
}
sessions.add(p);
return p;
}
/**
* @param transacted
* @param acknowledgeMode
* @return
* @throws JMSException
*/
public QueueSession createQueueSession(boolean transacted,
int acknowledgeMode) throws JMSException {
return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode));
}
/**
* @param transacted
* @param acknowledgeMode
* @return
* @throws JMSException
*/
public TopicSession createTopicSession(boolean transacted,
int acknowledgeMode) throws JMSException {
return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode));
}
/**
* @return
* @throws JMSException
*/
public String getClientID() throws JMSException {
return getConnection().getClientID();
}
/**
* @return
* @throws JMSException
*/
public ExceptionListener getExceptionListener() throws JMSException {
return getConnection().getExceptionListener();
}
/**
* @return
* @throws JMSException
*/
public ConnectionMetaData getMetaData() throws JMSException {
return getConnection().getMetaData();
}
/**
* @param clientID
* @throws JMSException
*/
public void setClientID(String clientID) throws JMSException {
getConnection().setClientID(clientID);
}
/**
* @param listener
* @throws JMSException
*/
public void setExceptionListener(ExceptionListener listener)
throws JMSException {
getConnection().setExceptionListener(listener);
}
/**
* @throws JMSException
*/
public void start() throws JMSException {
getConnection().start();
}
/**
* @throws JMSException
*/
public void stop() throws JMSException {
getConnection().stop();
}
/**
* @param queue
* @param messageSelector
* @param sessionPool
* @param maxMessages
* @return
* @throws JMSException
*/
public ConnectionConsumer createConnectionConsumer(Queue queue,
String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
throw new JMSException("Not Supported.");
}
/**
* @param topic
* @param messageSelector
* @param sessionPool
* @param maxMessages
* @return
* @throws JMSException
*/
public ConnectionConsumer createConnectionConsumer(Topic topic,
String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
throw new JMSException("Not Supported.");
}
/**
* @param destination
* @param messageSelector
* @param sessionPool
* @param maxMessages
* @return
* @throws JMSException
*/
public ConnectionConsumer createConnectionConsumer(Destination destination,
String messageSelector, ServerSessionPool sessionPool,
int maxMessages) throws JMSException {
throw new JMSException("Not Supported.");
}
/**
* @param topic
* @param subscriptionName
* @param messageSelector
* @param sessionPool
* @param maxMessages
* @return
* @throws JMSException
*/
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
String subscriptionName, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException {
throw new JMSException("Not Supported.");
}
/**
* @return Returns the managedConnection.
*/
public ActiveMQManagedConnection getManagedConnection() {
return managedConnection;
}
}