Package org.codehaus.activemq.ra

Source Code of org.codehaus.activemq.ra.ActiveMQManagedConnection

/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.ra;

import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Iterator;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.XAConnection;
import javax.jms.XASession;
import javax.resource.ResourceException;
import javax.resource.spi.ConnectionEvent;
import javax.resource.spi.ConnectionEventListener;
import javax.resource.spi.ConnectionRequestInfo;
import javax.resource.spi.LocalTransaction;
import javax.resource.spi.ManagedConnection;
import javax.resource.spi.ManagedConnectionMetaData;
import javax.security.auth.Subject;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQSession;
import org.codehaus.activemq.LocalTransactionEventListener;

/**
* ActiveMQManagedConnection maps to a real physical connection to the
* server.  Since a ManagedConnection has to provide a transaction
* management interface to the physical connection, and sessions
* are the objects that implement the transaction management interfaces in
* the JMS API, this object also maps to a single physical JMS session.
* <p/>
* The side effect is that the JMS connection the application gets
* will always create the same session object.  This is good if
* running in an app server since the sessions are enlisted in the
* context transaction.  This is bad if used outside of an app
* server since the user may be trying to create 2 different
* sessions to coordinate 2 different units of work.
*
* @version $Revision: 1.7 $
*/
public class ActiveMQManagedConnection implements ManagedConnection {

    // Commons-logging logger shared by all instances of this class.
    private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);

    // Writer handed over via setLogWriter(); only stored and returned by getLogWriter().
    private PrintWriter logWriter;

    // Subject and request info given to the constructor; compared against in matches().
    private Subject subject;
    private ActiveMQConnectionRequestInfo info;
    // Registered ConnectionEventListener instances (raw list).
    private ArrayList listeners = new ArrayList();
    // Physical JMS connection created by the adapter; null once destroy() has closed it.
    private Connection physicalConnection;
    // JMSConnectionProxy handles handed out by getConnection() and not yet closed/cleaned up (raw list).
    private ArrayList proxyConnections = new ArrayList();

    // Lazily created by getPhysicalSession(); reset to null by cleanup().
    private Session physicalSession;
    // XAResource cached by getPhysicalXAResource(); reset to null by cleanup().
  private XAResource xaresource=null;
    // True when the physical connection is a javax.jms.XAConnection (set in the constructor).
    private boolean usingXAConnection;

    public ActiveMQManagedConnection(Subject subject, ActiveMQResourceAdapter adapter, ActiveMQConnectionRequestInfo info) throws ResourceException {
        this.subject = subject;
    this.info = info;
        try {
      physicalConnection = adapter.makeConnection();
             usingXAConnection = physicalConnection instanceof XAConnection;
    } catch (JMSException e) {
            throw new ResourceException("Could not create a new connection: "+e.getMessage(), e);
        }         
    }

    public Connection getPhysicalConnection() {
        return physicalConnection;
    }

    public boolean isManagedTx() {
        return physicalSession!=null &&
            (  ((ActiveMQRASession)physicalSession).isInLocalTransaction() ||
               ((ActiveMQRASession)physicalSession).isXaTransacted()
            );
    }
   
    public Session getPhysicalSession() throws JMSException {
        if(physicalSession==null) {
            physicalSession=createSession();
        }
        return physicalSession;
    }
   
    public XAResource getPhysicalXAResource() throws XAException {
        if( !usingXAConnection )
            throw new XAException("Not using a JMS XA connection.");

        if( xaresource==null ) {
            try {
                Session session = getPhysicalSession();
                xaresource = ((XASession)session).getXAResource();
            } catch (JMSException e) {           
                throw (XAException)new XAException("Session not available.").initCause(e);
            }
        }
       
        return xaresource;
    }

    private Session createSession() throws JMSException {
        Session s = physicalConnection.createSession(true, Session.SESSION_TRANSACTED);
        if (s instanceof ActiveMQSession) {
            ActiveMQSession session = (ActiveMQSession) s;
            LocalTransactionEventListener l = createLocalTransactionEventListener();
            session.setLocalTransactionEventListener(l);
        }
        else {
            log.trace("Cannot register LocalTransactionEventLister on non-ActiveMQ session");
        }
                   
        return s;           
    }
   
    /**
     * @return
     */
    private LocalTransactionEventListener createLocalTransactionEventListener() {
        return new LocalTransactionEventListener() {

            public void beginEvent() {
                ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_STARTED);
                Iterator iterator = listeners.iterator();
                while (iterator.hasNext()) {
                    ConnectionEventListener l = (ConnectionEventListener) iterator
                            .next();
                    l.localTransactionStarted(event);
                }
            }

            public void commitEvent() {
                ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
                Iterator iterator = listeners.iterator();
                while (iterator.hasNext()) {
                    ConnectionEventListener l = (ConnectionEventListener) iterator
                            .next();
                    l.localTransactionCommitted(event);
                }
            }

            public void rollbackEvent() {
                ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this, ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
                Iterator iterator = listeners.iterator();
                while (iterator.hasNext()) {
                    ConnectionEventListener l = (ConnectionEventListener) iterator
                            .next();
                    l.localTransactionRolledback(event);
                }
            }
        };
    }

    /**
     * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
            *      javax.resource.spi.ConnectionRequestInfo)
     */
    public Object getConnection(Subject subject, ConnectionRequestInfo info)
            throws ResourceException {
        JMSConnectionProxy proxy = new JMSConnectionProxy(this);
        proxyConnections.add(proxy);
        return proxy;
    }

    private boolean isDestroyed() {
      return physicalConnection == null;
    }
   
    /**
     * Close down the physical connection to the server.
     *
     * @see javax.resource.spi.ManagedConnection#destroy()
     */
    public void destroy() throws ResourceException {
        // Have we allready been destroyed??
        if (isDestroyed()) {
            return;
        }

        cleanup();

        try {
          physicalConnection.close();
            physicalConnection = null;
        }
        catch (JMSException e) {
            log.info("Error occured during close of a JMS connection.", e);
        }
    }

    /**
     * Cleans up all proxy handles attached to this physical connection so that
     * they cannot be used anymore.
     *
     * @see javax.resource.spi.ManagedConnection#cleanup()
     */
    public void cleanup() throws ResourceException {
     
        // Have we allready been destroyed??
        if (isDestroyed()) {
            return;
        }

        Iterator iterator = proxyConnections.iterator();
        while (iterator.hasNext()) {
            JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
            proxy.cleanup();
            iterator.remove();
        }

        if( physicalConnection instanceof ActiveMQConnection ) {
           
            try {
                physicalSession=null;
                xaresource=null;
                ((ActiveMQConnection)physicalConnection).cleanup();
            } catch (JMSException e) {
                throw new ResourceException("Could cleanup the ActiveMQ connection: "+e, e);
            }
           
        } else {
           
            // Otherwise...           
            // closing and creating the session is sure way to clean up all state 
            // the client may have left around when using the session.
            if( physicalSession!=null ) {
                try {         
                    physicalSession.close();
                } catch (JMSException e) {
                    throw new ResourceException("Could close the JMS session.", e);
                }
            }
            physicalSession=null;
            xaresource=null;
   
            try {         
                physicalConnection.setExceptionListener(null);
                physicalConnection.stop();
            } catch (JMSException e) {
                throw new ResourceException("Could stop the JMS connection.", e);
            }
           
        }
    }

    /**
     * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
     */
    public void associateConnection(Object connection) throws ResourceException {
        throw new ResourceException("Not supported.");
    }

    /**
     * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
     */
    public void addConnectionEventListener(ConnectionEventListener listener) {
        listeners.add(listener);
    }

    /**
     * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
     */
    public void removeConnectionEventListener(ConnectionEventListener listener) {
        listeners.remove(listener);
    }

    /**
     * @see javax.resource.spi.ManagedConnection#getXAResource()
     */
    public XAResource getXAResource() throws ResourceException {
          if( !usingXAConnection )
            throw new ResourceException("Not using a JMS XA connection.");
         
          // We proxy the XAResource because the XAResource object chanages
          // every time the managed connection is cleaned up.
          return new TxResourceProxy(){
                Session getSession() throws ResourceException {
                    try {
                        return getPhysicalSession();
                    } catch (JMSException e) {
                        throw new ResourceException("Could not create a new session: "+e.getMessage(), e);
                    }
                }
                XAResource getXAResource() throws XAException {
                    return getPhysicalXAResource();
                }
            };
    }

    /**
     * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
     */
    public LocalTransaction getLocalTransaction() throws ResourceException {
      return new TxResourceProxy(){
            Session getSession() throws ResourceException {
                try {
                    return getPhysicalSession();
                } catch (JMSException e) {
                    throw new ResourceException("Could not create a new session: "+e.getMessage(), e);
                }
            }
            XAResource getXAResource() throws XAException {
                return getPhysicalXAResource();
            }
        };
    }

    /**
     * @see javax.resource.spi.ManagedConnection#getMetaData()
     */
    public ManagedConnectionMetaData getMetaData() throws ResourceException {
        return new ManagedConnectionMetaData() {

            public String getEISProductName() throws ResourceException {
                if (physicalConnection == null) {
                    throw new ResourceException("Not connected.");
                }
                try {
                    return physicalConnection.getMetaData()
                            .getJMSProviderName();
                }
                catch (JMSException e) {
                    throw new ResourceException("Error accessing provider.", e);
                }
            }

            public String getEISProductVersion() throws ResourceException {
                if (physicalConnection == null) {
                    throw new ResourceException("Not connected.");
                }
                try {
                    return physicalConnection.getMetaData()
                            .getProviderVersion();
                }
                catch (JMSException e) {
                    throw new ResourceException("Error accessing provider.", e);
                }
            }

            public int getMaxConnections() throws ResourceException {
                if (physicalConnection == null) {
                    throw new ResourceException("Not connected.");
                }
                return Integer.MAX_VALUE;
            }

            public String getUserName() throws ResourceException {
                if (physicalConnection == null) {
                    throw new ResourceException("Not connected.");
                }
                try {
                    return physicalConnection.getClientID();
                }
                catch (JMSException e) {
                    throw new ResourceException("Error accessing provider.", e);
                }
            }
        };
    }

    /**
     * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
     */
    public void setLogWriter(PrintWriter logWriter) throws ResourceException {
        this.logWriter = logWriter;
    }

    /**
     * @see javax.resource.spi.ManagedConnection#getLogWriter()
     */
    public PrintWriter getLogWriter() throws ResourceException {
        return logWriter;
    }

    /**
     * @param subject
     * @param info
     * @return
     */
    public boolean matches(Subject subject, ConnectionRequestInfo info) {

        // Check to see if it is our info class
        if (info == null) {
            return false;
        }
        if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
            return false;
        }

        // Do the subjects match?
        if (subject == null ^ this.subject == null) {
            return false;
        }
        if (subject != null && !subject.equals(this.subject)) {
            return false;
        }

        // Does the info match?
        return info.equals(this.info);
    }

    /**
     * When a proxy is closed this cleans up the proxy and notifys the
     * ConnectionEventListeners that a connection closed.
     *
     * @param proxy
     */
    public void proxyClosedEvent(JMSConnectionProxy proxy) {
        proxyConnections.remove(proxy);
        proxy.cleanup();

        ConnectionEvent event = new ConnectionEvent(this,
                ConnectionEvent.CONNECTION_CLOSED);
        event.setConnectionHandle(proxy);
        Iterator iterator = listeners.iterator();
        while (iterator.hasNext()) {
            ConnectionEventListener l = (ConnectionEventListener) iterator
                    .next();
            l.connectionClosed(event);
        }
    }

}
TOP

Related Classes of org.codehaus.activemq.ra.ActiveMQManagedConnection

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.