/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.service.impl;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import org.activemq.broker.Broker;
import org.activemq.broker.BrokerClient;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.ActiveMQXid;
import org.activemq.message.MessageAck;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.store.TransactionStore;
import org.activemq.store.TransactionStore.RecoveryListener;
import org.activemq.util.JMSExceptionHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
/**
* @version $Revision: 1.1.1.1 $
*/
public class TransactionManagerImpl extends TransactionManager {
private static final Log log = LogFactory.getLog(TransactionManagerImpl.class);
// the broker on which transactions operate
private Broker broker;
// The prepared XA transactions.
private TransactionStore transactionStore;
// Maps clients to the txids that they created.
private Map activeClients = new ConcurrentHashMap();
// Maps txids to ActiveMQTransactions
private Map localTxs = new ConcurrentHashMap();
// Maps txids to ActiveMQTransactions
private Map xaTxs = new ConcurrentHashMap();
public TransactionManagerImpl(Broker broker, TransactionStore transactionStore) {
this.transactionStore = transactionStore;
this.broker = broker;
}
/**
* @see org.activemq.service.TransactionManager#createLocalTransaction(org.activemq.broker.BrokerClient, String)
*/
public Transaction createLocalTransaction(final BrokerClient client, final String txid) throws JMSException {
AbstractTransaction t = new LocalTransactionCommand(localTxs, txid, transactionStore);
localTxs.put(txid, t);
return t;
}
/**
* @see org.activemq.service.TransactionManager#createXATransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
*/
public Transaction createXATransaction(final BrokerClient client, final ActiveMQXid xid) throws XAException {
// The xa transaction may allready be running.
Transaction tx = (Transaction) xaTxs.get(xid);
if( tx == null ) {
if(log.isDebugEnabled())
log.debug("XA Transaction started: "+xid);
tx = new XATransactionCommand(xid, xaTxs, transactionStore);
xaTxs.put(xid, tx);
}
return tx;
}
/**
* @see org.activemq.service.TransactionManager#getLocalTransaction(String)
*/
public Transaction getLocalTransaction(String txid) throws JMSException {
Transaction tx = (Transaction) localTxs.get(txid);
if (tx == null) {
throw new JMSException("Transaction '" + txid
+ "' has not been started.");
}
return tx;
}
/**
* @see org.activemq.service.TransactionManager#getXATransaction(org.activemq.message.ActiveMQXid)
*/
public Transaction getXATransaction(ActiveMQXid xid) throws XAException {
Transaction tx = (Transaction) xaTxs.get(xid);
if (tx == null) {
XAException e = new XAException("Transaction '" + xid + "' has not been started.");
e.errorCode = XAException.XAER_NOTA;
throw e;
}
return tx;
}
/**
* @see org.activemq.service.TransactionManager#getPreparedXATransactions()
*/
public ActiveMQXid[] getPreparedXATransactions() throws XAException {
ArrayList txs = new ArrayList(xaTxs.size());
for (Iterator iter = xaTxs.keySet().iterator(); iter.hasNext();) {
ActiveMQXid tx = (ActiveMQXid) iter.next();
txs.add(tx);
}
ActiveMQXid rc[] = new ActiveMQXid[txs.size()];
txs.toArray(rc);
return rc;
}
/**
* @see org.activemq.service.TransactionManager#cleanUpClient(org.activemq.broker.BrokerClient)
*/
public void cleanUpClient(BrokerClient client) throws JMSException {
// HRC: I don't think we need to keep track of the client's open transactions here...
// It seems like BrokerClientImpl.close() allready cleans up open transactions.
//
List list = (List) activeClients.remove(client);
if (list != null) {
for (int i = 0; i < list.size(); i++) {
try {
Object o = list.get(i);
if (o instanceof String) {
Transaction t = this.getLocalTransaction((String) o);
t.rollback();
}
else {
Transaction t = this.getXATransaction((ActiveMQXid) o);
t.rollback();
}
}
catch (Exception e) {
log.warn("ERROR Rolling back disconnected client's transactions: ", e);
}
}
list.clear();
}
}
/**
* @see org.activemq.service.TransactionManager#recover(org.activemq.service.Transaction)
*/
public void recover(Transaction transaction) {
// first lets associate any transient data structurs with the
// transaction which has recently been loaded from disk
if (transaction instanceof XATransactionCommand) {
XATransactionCommand xaTransaction = (XATransactionCommand) transaction;
xaTransaction.initialise(xaTxs, transactionStore);
xaTxs.put(transaction.getTransactionId(), transaction);
}
}
public void start() throws JMSException {
transactionStore.start();
try {
transactionStore.recover(new RecoveryListener(){
public void recover(ActiveMQXid xid, ActiveMQMessage[] addedMessages, MessageAck[] aks) throws JMSException, XAException {
Transaction transaction = createXATransaction(null, xid);
for (int i = 0; i < addedMessages.length; i++) {
broker.sendMessage(null, addedMessages[i]);
}
for (int i = 0; i < aks.length; i++) {
broker.acknowledgeMessage(null, aks[i]);
}
transaction.prepare();
}
});
} catch (XAException e) {
throw JMSExceptionHelper.newJMSException("Recovery Failed: "+e.getMessage(), e);
}
}
public void stop() throws JMSException {
transactionStore.stop();
}
}