Package org.activemq.service.impl

Source Code of org.activemq.service.impl.TransactionManagerImpl

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.service.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import javax.jms.JMSException;
import javax.transaction.xa.XAException;

import org.activemq.broker.Broker;
import org.activemq.broker.BrokerClient;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.MessageAck;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.store.TransactionStore;
import org.activemq.store.TransactionStore.RecoveryListener;
import org.activemq.util.JMSExceptionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;

/**
* @version $Revision: 1.1.1.1 $
*/
public class TransactionManagerImpl extends TransactionManager {
    private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);

    // the broker on which transactions operate
    private Broker broker;
    // The prepared XA transactions.
    private TransactionStore transactionStore;
    // Maps clients to the txids that they created.
    private Map activeClients = new ConcurrentHashMap();
    // Maps txids to ActiveMQTransactions
    private Map localTxs = new ConcurrentHashMap();
    // Maps txids to ActiveMQTransactions
    private Map xaTxs = new ConcurrentHashMap();

    public TransactionManagerImpl(Broker broker, TransactionStore transactionStore) {
        this.transactionStore = transactionStore;
        this.broker = broker;
    }

    /**
     * @see org.activemq.service.TransactionManager#createLocalTransaction(org.activemq.broker.BrokerClient, String)
     */
    public Transaction createLocalTransaction(final BrokerClient client, final String txid) throws JMSException {
        AbstractTransaction t = new LocalTransactionCommand(localTxs, txid, transactionStore);
        localTxs.put(txid, t);
        return t;
    }

    /**
     * @see org.activemq.service.TransactionManager#createXATransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
     */
    public Transaction createXATransaction(final BrokerClient client, final ActiveMQXid xid) throws XAException {
     
      // The xa transaction may allready be running.
      Transaction tx = (Transaction) xaTxs.get(xid);
      if( tx == null ) {
          if(log.isDebugEnabled())
            log.debug("XA Transaction started: "+xid);
            tx = new XATransactionCommand(xid, xaTxs, transactionStore);
            xaTxs.put(xid, tx);
      }       
        return tx;
       
    }

    /**
     * @see org.activemq.service.TransactionManager#getLocalTransaction(String)
     */
    public Transaction getLocalTransaction(String txid) throws JMSException {
        Transaction tx = (Transaction) localTxs.get(txid);
        if (tx == null) {
            throw new JMSException("Transaction '" + txid
                    + "' has not been started.");
        }
        return tx;
    }

    /**
     * @see org.activemq.service.TransactionManager#getXATransaction(org.activemq.message.ActiveMQXid)
     */
    public Transaction getXATransaction(ActiveMQXid xid) throws XAException {
        Transaction tx = (Transaction) xaTxs.get(xid);
        if (tx == null) {
            XAException e = new XAException("Transaction '" + xid + "' has not been started.");
            e.errorCode = XAException.XAER_NOTA;
            throw e;
        }
        return tx;
    }

    /**
     * @see org.activemq.service.TransactionManager#getPreparedXATransactions()
     */
    public ActiveMQXid[] getPreparedXATransactions() throws XAException {
        ArrayList txs = new ArrayList(xaTxs.size());
        for (Iterator iter = xaTxs.keySet().iterator(); iter.hasNext();) {
            ActiveMQXid tx = (ActiveMQXid) iter.next();
            txs.add(tx);
        }
        ActiveMQXid rc[] = new ActiveMQXid[txs.size()];
        txs.toArray(rc);
        return rc;
    }


    /**
     * @see org.activemq.service.TransactionManager#cleanUpClient(org.activemq.broker.BrokerClient)
     */
    public void cleanUpClient(BrokerClient client) throws JMSException {
        // HRC: I don't think we need to keep track of the client's open transactions here...
        // It seems like BrokerClientImpl.close() allready cleans up open transactions.
        //
        List list = (List) activeClients.remove(client);
        if (list != null) {
            for (int i = 0; i < list.size(); i++) {
                try {
                    Object o = list.get(i);
                    if (o instanceof String) {
                        Transaction t = this.getLocalTransaction((String) o);
                        t.rollback();
                    }
                    else {
                        Transaction t = this.getXATransaction((ActiveMQXid) o);
                        t.rollback();
                    }
                }
                catch (Exception e) {
                    log.warn("ERROR Rolling back disconnected client's transactions: ", e);
                }
            }
            list.clear();
        }
    }

    /**
     * @see org.activemq.service.TransactionManager#recover(org.activemq.service.Transaction)
     */
    public void recover(Transaction transaction) {
        // first lets associate any transient data structurs with the
        // transaction which has recently been loaded from disk
        if (transaction instanceof XATransactionCommand) {
            XATransactionCommand xaTransaction = (XATransactionCommand) transaction;
            xaTransaction.initialise(xaTxs, transactionStore);
            xaTxs.put(transaction.getTransactionId(), transaction);
        }
    }

    public void start() throws JMSException {
        transactionStore.start();
        try {
            transactionStore.recover(new RecoveryListener(){
                public void recover(ActiveMQXid xid, ActiveMQMessage[] addedMessages, MessageAck[] aks) throws JMSException, XAException {
                    Transaction transaction = createXATransaction(null, xid);                   
                    for (int i = 0; i < addedMessages.length; i++) {
                        broker.sendMessage(null, addedMessages[i]);
                    }
                    for (int i = 0; i < aks.length; i++) {
                        broker.acknowledgeMessage(null, aks[i]);                   
                    }
                    transaction.prepare();
                }
            });
        } catch (XAException e) {
            throw JMSExceptionHelper.newJMSException("Recovery Failed: "+e.getMessage(), e);
        }
    }

    public void stop() throws JMSException {
        transactionStore.stop();
    }

}
TOP

Related Classes of org.activemq.service.impl.TransactionManagerImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.