/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.ra;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Iterator;
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.LocalTransaction;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionMetaData;
import javax.security.auth.Subject;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQSession;
/**
* ActiveMQManagedConnection maps to real physical connection to the
* server. Since a ManagedConnection has to provide a transaction
* managment interface to the physical connection, and sessions
* are the objects implement transaction managment interfaces in
* the JMS API, this object also maps to a singe physical JMS session.
* <p/>
* The side-effect is that JMS connection the application gets
* will allways create the same session object. This is good if
* running in an app server since the sessions are elisted in the
* context transaction. This is bad if used outside of an app
* server since the user may be trying to create 2 different
* sessions to coordinate 2 different uow.
*
* @version $Revision: 1.14 $
*/
public class ActiveMQManagedConnection implements ManagedConnection {
private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);
private PrintWriter logWriter;
private Subject subject;
private ActiveMQConnectionRequestInfo info;
private ArrayList listeners = new ArrayList();
private Connection physicalConnection;
private Session physicalSession;
private ArrayList proxyConnections = new ArrayList();
private XAResource xaresource=null;
public Connection getPhysicalConnection() {
return physicalConnection;
}
public Session getPhysicalSession() {
return physicalSession;
}
public ActiveMQManagedConnection(Subject subject, ActiveMQResourceAdapter adapter, ActiveMQConnectionRequestInfo info) throws ResourceException {
this.subject = subject;
this.info = info;
physicalConnection = adapter.getPhysicalConnection();
createSession();
}
private void createSession() throws ResourceException {
try {
physicalSession = physicalConnection
.createSession(true, Session.SESSION_TRANSACTED);
if (physicalSession instanceof ActiveMQSession) {
ActiveMQSession session = (ActiveMQSession) physicalSession;
LocalTransactionEventListener l = createLocalTransactionEventListener();
session.setLocalTransactionEventListener(l);
}
else {
log.trace("Cannot register LocalTransactionEventLister on non-ActiveMQ session");
}
if (physicalSession instanceof XASession) {
xaresource = ((XASession)physicalSession).getXAResource();
} else {
xaresource=null;
}
}
catch (JMSException e) {
throw new ResourceException("Could not create a new session.", e);
}
}
/**
* @return
*/
private LocalTransactionEventListener createLocalTransactionEventListener() {
return new LocalTransactionEventListener() {
public void beginEvent() {
ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
Iterator iterator = listeners.iterator();
while (iterator.hasNext()) {
ConnectionEventListener l = (ConnectionEventListener) iterator
.next();
l.localTransactionStarted(event);
}
}
public void commitEvent() {
ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
Iterator iterator = listeners.iterator();
while (iterator.hasNext()) {
ConnectionEventListener l = (ConnectionEventListener) iterator
.next();
l.localTransactionCommitted(event);
}
}
public void rollbackEvent() {
ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
Iterator iterator = listeners.iterator();
while (iterator.hasNext()) {
ConnectionEventListener l = (ConnectionEventListener) iterator
.next();
l.localTransactionRolledback(event);
}
}
};
}
/**
* @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
* javax.resource.spi.ConnectionRequestInfo)
*/
public Object getConnection(Subject subject, ConnectionRequestInfo info)
throws ResourceException {
JMSConnectionProxy proxy = new JMSConnectionProxy(this);
proxyConnections.add(proxy);
return proxy;
}
private boolean isDestroyed() {
return physicalConnection == null;
}
/**
* Close down the physical connection to the server.
*
* @see javax.resource.spi.ManagedConnection#destroy()
*/
public void destroy() throws ResourceException {
// Have we allready been destroyed??
if (isDestroyed()) {
return;
}
cleanup();
try {
physicalSession.close();
physicalConnection = null;
}
catch (JMSException e) {
log.info("Error occured during close of a JMS connection.", e);
}
}
/**
* Cleans up all proxy handles attached to this physical connection so that
* they cannot be used anymore.
*
* @see javax.resource.spi.ManagedConnection#cleanup()
*/
public void cleanup() throws ResourceException {
// Have we allready been destroyed??
if (isDestroyed()) {
return;
}
Iterator iterator = proxyConnections.iterator();
while (iterator.hasNext()) {
JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
proxy.cleanup();
iterator.remove();
}
// closing and creating the session is sure way to clean up all state
// the client may have left around when using the session.
try {
physicalSession.close();
physicalSession=null;
} catch (JMSException e) {
throw new ResourceException("Could close the JMS session.", e);
}
// Create the new session.
createSession();
}
/**
* @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
*/
public void associateConnection(Object connection) throws ResourceException {
throw new ResourceException("Not supported.");
}
/**
* @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
*/
public void addConnectionEventListener(ConnectionEventListener listener) {
listeners.add(listener);
}
/**
* @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
*/
public void removeConnectionEventListener(ConnectionEventListener listener) {
listeners.remove(listener);
}
/**
* @see javax.resource.spi.ManagedConnection#getXAResource()
*/
public XAResource getXAResource() throws ResourceException {
if( xaresource == null )
throw new ResourceException("This is not an XA connection.");
// We proxy the XAResource because the XAResource object chanages
// every time the managed connection is cleaned up.
return new XAResource() {
public void commit(Xid arg0, boolean arg1) throws XAException {
xaresource.commit(arg0, arg1);
}
public void end(Xid arg0, int arg1) throws XAException {
xaresource.end(arg0, arg1);
}
public void forget(Xid arg0) throws XAException {
xaresource.forget(arg0);
}
public int getTransactionTimeout() throws XAException {
return xaresource.getTransactionTimeout();
}
public boolean isSameRM(XAResource arg0) throws XAException {
return xaresource.isSameRM(arg0);
}
public int prepare(Xid arg0) throws XAException {
return xaresource.prepare(arg0);
}
public Xid[] recover(int arg0) throws XAException {
return xaresource.recover(arg0);
}
public void rollback(Xid arg0) throws XAException {
xaresource.rollback(arg0);
}
public boolean setTransactionTimeout(int arg0) throws XAException {
return xaresource.setTransactionTimeout(arg0);
}
public void start(Xid arg0, int arg1) throws XAException {
xaresource.start(arg0, arg1);
}
};
}
/**
* @see javax.resource.spi.ManagedConnection#getLocalTransaction()
*/
public LocalTransaction getLocalTransaction() throws ResourceException {
return new LocalTransaction() {
public void begin() {
// TODO: jms api does not have a begin...
// add a method to ActiveMQSession to allow for this.
}
public void commit() throws ResourceException {
try {
physicalSession.commit();
}
catch (JMSException e) {
throw new ResourceException("commit failed.", e);
}
}
public void rollback() throws ResourceException {
try {
physicalSession.rollback();
}
catch (JMSException e) {
throw new ResourceException("rollback failed.", e);
}
}
};
}
/**
* @see javax.resource.spi.ManagedConnection#getMetaData()
*/
public ManagedConnectionMetaData getMetaData() throws ResourceException {
return new ManagedConnectionMetaData() {
public String getEISProductName() throws ResourceException {
if (physicalConnection == null) {
throw new ResourceException("Not connected.");
}
try {
return physicalConnection.getMetaData()
.getJMSProviderName();
}
catch (JMSException e) {
throw new ResourceException("Error accessing provider.", e);
}
}
public String getEISProductVersion() throws ResourceException {
if (physicalConnection == null) {
throw new ResourceException("Not connected.");
}
try {
return physicalConnection.getMetaData()
.getProviderVersion();
}
catch (JMSException e) {
throw new ResourceException("Error accessing provider.", e);
}
}
public int getMaxConnections() throws ResourceException {
if (physicalConnection == null) {
throw new ResourceException("Not connected.");
}
return Integer.MAX_VALUE;
}
public String getUserName() throws ResourceException {
if (physicalConnection == null) {
throw new ResourceException("Not connected.");
}
try {
return physicalConnection.getClientID();
}
catch (JMSException e) {
throw new ResourceException("Error accessing provider.", e);
}
}
};
}
/**
* @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
*/
public void setLogWriter(PrintWriter logWriter) throws ResourceException {
this.logWriter = logWriter;
}
/**
* @see javax.resource.spi.ManagedConnection#getLogWriter()
*/
public PrintWriter getLogWriter() throws ResourceException {
return logWriter;
}
/**
* @param subject
* @param info
* @return
*/
public boolean matches(Subject subject, ConnectionRequestInfo info) {
// Check to see if it is our info class
if (info == null) {
return false;
}
if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
return false;
}
// Do the subjects match?
if (subject == null ^ this.subject == null) {
return false;
}
if (subject != null && !subject.equals(this.subject)) {
return false;
}
// Does the info match?
return info.equals(this.info);
}
/**
* When a proxy is closed this cleans up the proxy and notifys the
* ConnectionEventListeners that a connection closed.
*
* @param proxy
*/
public void proxyClosedEvent(JMSConnectionProxy proxy) {
proxyConnections.remove(proxy);
proxy.cleanup();
ConnectionEvent event = new ConnectionEvent(this,
ConnectionEvent.CONNECTION_CLOSED);
event.setConnectionHandle(proxy);
Iterator iterator = listeners.iterator();
while (iterator.hasNext()) {
ConnectionEventListener l = (ConnectionEventListener) iterator
.next();
l.connectionClosed(event);
}
}
}