/*
* Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.hornetq.tests.integration.xa;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import junit.framework.Assert;
import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.UUIDGenerator;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*/
public class BasicXaTest extends ServiceTestBase
{
private static Logger log = Logger.getLogger(BasicXaTest.class);
private final Map<String, AddressSettings> addressSettings = new HashMap<String, AddressSettings>();
private HornetQServer messagingService;
private ClientSession clientSession;
private ClientSessionFactory sessionFactory;
private Configuration configuration;
private final SimpleString atestq = new SimpleString("BasicXaTestq");
@Override
protected void setUp() throws Exception
{
super.setUp();
clearData();
addressSettings.clear();
configuration = createDefaultConfig(true);
configuration.setSecurityEnabled(false);
configuration.setJournalMinFiles(2);
configuration.setPagingDirectory(getPageDir());
messagingService = createServer(false, configuration, -1, -1, addressSettings);
// start the server
messagingService.start();
sessionFactory = createInVMFactory();
clientSession = sessionFactory.createSession(true, false, false);
clientSession.createQueue(atestq, atestq, null, true);
}
@Override
protected void tearDown() throws Exception
{
if (clientSession != null)
{
try
{
clientSession.close();
}
catch (HornetQException e1)
{
//
}
}
if (messagingService != null && messagingService.isStarted())
{
try
{
messagingService.stop();
}
catch (Exception e1)
{
//
}
}
messagingService = null;
clientSession = null;
super.tearDown();
}
public void testSendWithoutXID() throws Exception
{
// Since both resources have same RM, TM will probably use 1PC optimization
ClientSessionFactory factory = createInVMFactory();
ClientSession session = null;
try
{
session = factory.createSession(true, false, false);
session.createQueue("Test", "Test");
ClientProducer prod = session.createProducer("Test");
prod.send(session.createMessage(true));
session.start();
ClientConsumer cons = session.createConsumer("Test");
assertNull("Send went through an invalid XA Session", cons.receiveImmediate());
}
finally
{
factory.close();
session.close();
}
}
public void testACKWithoutXID() throws Exception
{
// Since both resources have same RM, TM will probably use 1PC optimization
ClientSessionFactory factory = createInVMFactory();
ClientSession session = null;
try
{
session = factory.createSession(false, true, true);
session.createQueue("Test", "Test");
ClientProducer prod = session.createProducer("Test");
prod.send(session.createMessage(true));
session.close();
session = factory.createSession(true, false, false);
session.start();
ClientConsumer cons = session.createConsumer("Test");
ClientMessage msg = cons.receive(5000);
assertNotNull(msg);
msg.acknowledge();
session.close();
session = factory.createSession(false, false, false);
session.start();
cons = session.createConsumer("Test");
msg = cons.receiveImmediate();
assertNotNull("Acknowledge went through invalid XA Session", msg);
assertNull(cons.receiveImmediate());
}
finally
{
factory.close();
session.close();
}
}
public void testIsSameRM() throws Exception
{
ClientSessionFactory nettyFactory = createNettyFactory();
validateRM(nettyFactory, nettyFactory);
validateRM(sessionFactory, sessionFactory);
validateRM(nettyFactory, sessionFactory);
}
private void validateRM(final ClientSessionFactory factory1, final ClientSessionFactory factory2) throws Exception
{
ClientSession session1 = factory1.createSession(true, false, false);
ClientSession session2 = factory2.createSession(true, false, false);
if (factory1 == factory2)
{
Assert.assertTrue(session1.isSameRM(session2));
}
else
{
Assert.assertFalse(session1.isSameRM(session2));
}
session1.close();
session2.close();
}
public void testXAInterleaveResourceSuspendWorkCommit() throws Exception
{
Xid xid = newXID();
Xid xid2 = newXID();
ClientProducer clientProducer = clientSession.createProducer(atestq);
ClientSession recSession = sessionFactory.createSession();
recSession.start();
ClientConsumer clientConsumer = recSession.createConsumer(atestq);
ClientMessage m1 = createTextMessage(clientSession, "m1");
ClientMessage m2 = createTextMessage(clientSession, "m2");
clientSession.start(xid, XAResource.TMNOFLAGS);
clientProducer.send(m1);
clientSession.end(xid, XAResource.TMSUSPEND);
clientSession.start(xid2, XAResource.TMNOFLAGS);
clientProducer.send(m2);
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.commit(xid, true);
ClientMessage message = clientConsumer.receiveImmediate();
assertNotNull(message);
message = clientConsumer.receiveImmediate();
assertNull(message);
clientSession.end(xid2, XAResource.TMSUCCESS);
clientSession.commit(xid2, true);
message = clientConsumer.receiveImmediate();
assertNotNull(message);
}
public void testXAInterleaveResourceRollbackAfterPrepare() throws Exception
{
Xid xid = newXID();
Xid xid2 = newXID();
Xid xid3 = newXID();
ClientProducer clientProducer = clientSession.createProducer(atestq);
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage m1 = createTextMessage(clientSession, "m1");
clientSession.start(xid, XAResource.TMNOFLAGS);
clientProducer.send(m1);
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
clientSession.commit(xid, false);
clientSession.start();
clientSession.start(xid2, XAResource.TMNOFLAGS);
ClientMessage m2 = clientConsumer.receiveImmediate();
assertNotNull(m2);
clientSession.end(xid2, XAResource.TMSUCCESS);
clientSession.prepare(xid2);
clientSession.rollback(xid2);
clientSession.start(xid3, XAResource.TMNOFLAGS);
m2 = clientConsumer.receiveImmediate();
assertNotNull(m2);
clientSession.end(xid3, XAResource.TMSUCCESS);
clientSession.prepare(xid3);
clientSession.commit(xid3, false);
}
public void testSendPrepareDoesntRollbackOnClose() throws Exception
{
Xid xid = newXID();
ClientMessage m1 = createTextMessage(clientSession, "m1");
ClientMessage m2 = createTextMessage(clientSession, "m2");
ClientMessage m3 = createTextMessage(clientSession, "m3");
ClientMessage m4 = createTextMessage(clientSession, "m4");
ClientProducer clientProducer = clientSession.createProducer(atestq);
clientSession.start(xid, XAResource.TMNOFLAGS);
clientProducer.send(m1);
clientProducer.send(m2);
clientProducer.send(m3);
clientProducer.send(m4);
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
clientSession.close();
clientSession = sessionFactory.createSession(true, false, false);
log.info("committing");
clientSession.commit(xid, false);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
}
public void testReceivePrepareDoesntRollbackOnClose() throws Exception
{
Xid xid = newXID();
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientProducer clientProducer = clientSession2.createProducer(atestq);
ClientMessage m1 = createTextMessage(clientSession2, "m1");
ClientMessage m2 = createTextMessage(clientSession2, "m2");
ClientMessage m3 = createTextMessage(clientSession2, "m3");
ClientMessage m4 = createTextMessage(clientSession2, "m4");
clientProducer.send(m1);
clientProducer.send(m2);
clientProducer.send(m3);
clientProducer.send(m4);
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.start();
ClientConsumer clientConsumer = clientSession.createConsumer(atestq);
ClientMessage m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m1");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m2");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m3");
m = clientConsumer.receive(1000);
Assert.assertNotNull(m);
m.acknowledge();
Assert.assertEquals(m.getBodyBuffer().readString(), "m4");
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
clientSession.close();
clientSession = sessionFactory.createSession(true, false, false);
clientSession.commit(xid, false);
clientSession.start();
clientConsumer = clientSession.createConsumer(atestq);
m = clientConsumer.receiveImmediate();
Assert.assertNull(m);
clientSession2.close();
}
public void testReceiveRollback() throws Exception
{
int numSessions = 100;
ClientSession clientSession2 = sessionFactory.createSession(false, true, true);
ClientProducer clientProducer = clientSession2.createProducer(atestq);
for (int i = 0; i < 10 * numSessions; i++)
{
clientProducer.send(createTextMessage(clientSession2, "m" + i));
}
ClientSession[] clientSessions = new ClientSession[numSessions];
ClientConsumer[] clientConsumers = new ClientConsumer[numSessions];
TxMessageHandler[] handlers = new TxMessageHandler[numSessions];
CountDownLatch latch = new CountDownLatch(numSessions * AddressSettings.DEFAULT_MAX_DELIVERY_ATTEMPTS);
for (int i = 0; i < clientSessions.length; i++)
{
clientSessions[i] = sessionFactory.createSession(true, false, false);
clientConsumers[i] = clientSessions[i].createConsumer(atestq);
handlers[i] = new TxMessageHandler(clientSessions[i], latch);
clientConsumers[i].setMessageHandler(handlers[i]);
}
for (ClientSession session : clientSessions)
{
session.start();
}
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
for (TxMessageHandler messageHandler : handlers)
{
Assert.assertFalse(messageHandler.failedToAck);
}
clientSession2.close();
for (ClientSession session : clientSessions)
{
session.close();
}
}
public void testSendMultipleQueues() throws Exception
{
multipleQueuesInternalTest(true, false, false, false, false);
}
public void testSendMultipleQueuesOnePhase() throws Exception
{
multipleQueuesInternalTest(true, false, false, false, true);
multipleQueuesInternalTest(false, false, true, false, true);
}
public void testSendMultipleQueuesOnePhaseJoin() throws Exception
{
multipleQueuesInternalTest(true, false, false, true, true);
multipleQueuesInternalTest(false, false, true, true, true);
}
public void testSendMultipleQueuesTwoPhaseJoin() throws Exception
{
multipleQueuesInternalTest(true, false, false, true, false);
multipleQueuesInternalTest(false, false, true, true, false);
}
public void testSendMultipleQueuesRecreate() throws Exception
{
multipleQueuesInternalTest(true, false, true, false, false);
}
public void testSendMultipleSuspend() throws Exception
{
multipleQueuesInternalTest(true, true, false, false, false);
}
public void testSendMultipleSuspendRecreate() throws Exception
{
multipleQueuesInternalTest(true, true, true, false, false);
}
public void testSendMultipleSuspendErrorCheck() throws Exception
{
ClientSession session = null;
session = sessionFactory.createSession(true, false, false);
Xid xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
try
{
session.start(xid, XAResource.TMRESUME);
Assert.fail("XAException expected");
}
catch (XAException e)
{
Assert.assertEquals(XAException.XAER_PROTO, e.errorCode);
}
session.close();
}
public void testEmptyXID() throws Exception
{
Xid xid = newXID();
ClientSession session = sessionFactory.createSession(true, false, false);
session.start(xid, XAResource.TMNOFLAGS);
session.end(xid, XAResource.TMSUCCESS);
session.rollback(xid);
session.close();
messagingService.stop();
// do the same test with a file persistence now
messagingService = createServer(true, configuration, -1, -1, addressSettings);
messagingService.start();
sessionFactory = createInVMFactory();
xid = newXID();
session = sessionFactory.createSession(true, false, false);
session.start(xid, XAResource.TMNOFLAGS);
session.end(xid, XAResource.TMSUCCESS);
session.rollback(xid);
xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
session.commit(xid, false);
session.close();
xid = newXID();
session = sessionFactory.createSession(true, false, false);
session.start(xid, XAResource.TMNOFLAGS);
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
session.rollback(xid);
session.close();
messagingService.start();
sessionFactory = createInVMFactory();
xid = newXID();
session = sessionFactory.createSession(true, false, false);
session.start(xid, XAResource.TMNOFLAGS);
session.end(xid, XAResource.TMSUCCESS);
session.rollback(xid);
session.close();
messagingService.stop();
messagingService.start();
// This is not really necessary... But since the server has stopped, I would prefer to keep recreating the factory
sessionFactory = createInVMFactory();
session = sessionFactory.createSession(true, false, false);
Xid[] xids = session.recover(XAResource.TMSTARTRSCAN);
Assert.assertEquals(0, xids.length);
session.close();
}
public void testFailXID() throws Exception
{
Xid xid = newXID();
ClientSession session = sessionFactory.createSession(true, false, false);
session.start(xid, XAResource.TMNOFLAGS);
session.end(xid, XAResource.TMFAIL);
session.rollback(xid);
session.close();
}
public void testForgetUnknownXID() throws Exception
{
try
{
clientSession.forget(newXID());
Assert.fail("should throw a XAERR_NOTA XAException");
}
catch (XAException e)
{
Assert.assertEquals(XAException.XAER_NOTA, e.errorCode);
}
}
public void testForgetHeuristicallyCommittedXID() throws Exception
{
Xid xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
Assert.assertEquals(1, preparedTransactions.length);
System.out.println(preparedTransactions[0]);
Assert.assertTrue(messagingService.getHornetQServerControl()
.commitPreparedTransaction(XidImpl.toBase64String(xid)));
Assert.assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
clientSession.forget(xid);
Assert.assertEquals(0, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
}
public void testForgetHeuristicallyRolledBackXID() throws Exception
{
Xid xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
Assert.assertEquals(1, preparedTransactions.length);
System.out.println(preparedTransactions[0]);
Assert.assertTrue(messagingService.getHornetQServerControl()
.rollbackPreparedTransaction(XidImpl.toBase64String(xid)));
Assert.assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
clientSession.forget(xid);
Assert.assertEquals(0, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
}
public void testCommitHeuristicallyCommittedXID() throws Exception
{
doCompleteHeuristicallyCompletedXID(true, true);
}
public void testCommitHeuristicallyRolledBackXID() throws Exception
{
doCompleteHeuristicallyCompletedXID(true, false);
}
public void testRollbacktHeuristicallyCommittedXID() throws Exception
{
doCompleteHeuristicallyCompletedXID(false, true);
}
public void testRollbackHeuristicallyRolledBackXID() throws Exception
{
doCompleteHeuristicallyCompletedXID(false, false);
}
public void testSimpleJoin() throws Exception
{
SimpleString ADDRESS1 = new SimpleString("Address-1");
SimpleString ADDRESS2 = new SimpleString("Address-2");
clientSession.createQueue(ADDRESS1, ADDRESS1, true);
clientSession.createQueue(ADDRESS2, ADDRESS2, true);
Xid xid = newXID();
ClientSession sessionA = sessionFactory.createSession(true, false, false);
sessionA.start(xid, XAResource.TMNOFLAGS);
ClientSession sessionB = sessionFactory.createSession(true, false, false);
sessionB.start(xid, XAResource.TMJOIN);
ClientProducer prodA = sessionA.createProducer(ADDRESS1);
ClientProducer prodB = sessionB.createProducer(ADDRESS2);
for (int i = 0; i < 100; i++)
{
prodA.send(createTextMessage(sessionA, "A" + i));
prodB.send(createTextMessage(sessionB, "B" + i));
}
sessionA.end(xid, XAResource.TMSUCCESS);
sessionB.end(xid, XAResource.TMSUCCESS);
sessionB.close();
sessionA.commit(xid, true);
sessionA.close();
xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
ClientConsumer cons1 = clientSession.createConsumer(ADDRESS1);
ClientConsumer cons2 = clientSession.createConsumer(ADDRESS2);
clientSession.start();
for (int i = 0; i < 100; i++)
{
ClientMessage msg = cons1.receive(1000);
Assert.assertNotNull(msg);
Assert.assertEquals("A" + i, getTextMessage(msg));
msg.acknowledge();
msg = cons2.receive(1000);
Assert.assertNotNull(msg);
Assert.assertEquals("B" + i, getTextMessage(msg));
msg.acknowledge();
}
Assert.assertNull(cons1.receiveImmediate());
Assert.assertNull(cons2.receiveImmediate());
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.commit(xid, true);
clientSession.close();
}
/**
* @throws HornetQException
* @throws XAException
*/
protected void multipleQueuesInternalTest(final boolean createQueues,
final boolean suspend,
final boolean recreateSession,
final boolean isJoinSession,
final boolean onePhase) throws Exception
{
int NUMBER_OF_MSGS = 100;
int NUMBER_OF_QUEUES = 10;
ClientSession session = null;
SimpleString ADDRESS = new SimpleString("Address");
ClientSession newJoinSession = null;
try
{
session = sessionFactory.createSession(true, false, false);
if (createQueues)
{
for (int i = 0; i < NUMBER_OF_QUEUES; i++)
{
session.createQueue(ADDRESS, ADDRESS.concat(Integer.toString(i)), true);
if (isJoinSession)
{
clientSession.createQueue(ADDRESS.concat("-join"), ADDRESS.concat("-join." + i), true);
}
}
}
for (int tr = 0; tr < 2; tr++)
{
Xid xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
ClientProducer prod = session.createProducer(ADDRESS);
for (int nmsg = 0; nmsg < NUMBER_OF_MSGS; nmsg++)
{
ClientMessage msg = createTextMessage(session, "SimpleMessage" + nmsg);
prod.send(msg);
}
if (suspend)
{
session.end(xid, XAResource.TMSUSPEND);
session.start(xid, XAResource.TMRESUME);
}
prod.send(createTextMessage(session, "one more"));
prod.close();
if (isJoinSession)
{
newJoinSession = sessionFactory.createSession(true, false, false);
// This is a basic condition, or a real TM wouldn't be able to join both sessions in a single
// transactions
Assert.assertTrue(session.isSameRM(newJoinSession));
newJoinSession.start(xid, XAResource.TMJOIN);
// The Join Session will have its own queue, as it's not possible to guarantee ordering since this
// producer will be using a different session
ClientProducer newProd = newJoinSession.createProducer(ADDRESS.concat("-join"));
newProd.send(createTextMessage(newJoinSession, "After Join"));
}
session.end(xid, XAResource.TMSUCCESS);
if (isJoinSession)
{
newJoinSession.end(xid, XAResource.TMSUCCESS);
newJoinSession.close();
}
if (!onePhase)
{
session.prepare(xid);
}
if (recreateSession)
{
session.close();
session = sessionFactory.createSession(true, false, false);
}
if (tr == 0)
{
session.rollback(xid);
}
else
{
session.commit(xid, onePhase);
}
}
for (int i = 0; i < 2; i++)
{
Xid xid = newXID();
session.start(xid, XAResource.TMNOFLAGS);
for (int nqueues = 0; nqueues < NUMBER_OF_QUEUES; nqueues++)
{
ClientConsumer consumer = session.createConsumer(ADDRESS.concat(Integer.toString(nqueues)));
session.start();
for (int nmsg = 0; nmsg < NUMBER_OF_MSGS; nmsg++)
{
ClientMessage msg = consumer.receive(1000);
Assert.assertNotNull(msg);
Assert.assertEquals("SimpleMessage" + nmsg, getTextMessage(msg));
msg.acknowledge();
}
ClientMessage msg = consumer.receive(1000);
Assert.assertNotNull(msg);
Assert.assertEquals("one more", getTextMessage(msg));
msg.acknowledge();
if (suspend)
{
session.end(xid, XAResource.TMSUSPEND);
session.start(xid, XAResource.TMRESUME);
}
Assert.assertEquals("one more", getTextMessage(msg));
if (isJoinSession)
{
ClientSession newSession = sessionFactory.createSession(true, false, false);
newSession.start(xid, XAResource.TMJOIN);
newSession.start();
ClientConsumer newConsumer = newSession.createConsumer(ADDRESS.concat("-join." + nqueues));
msg = newConsumer.receive(1000);
Assert.assertNotNull(msg);
Assert.assertEquals("After Join", getTextMessage(msg));
msg.acknowledge();
newSession.end(xid, XAResource.TMSUCCESS);
newSession.close();
}
Assert.assertNull(consumer.receiveImmediate());
consumer.close();
}
session.end(xid, XAResource.TMSUCCESS);
session.prepare(xid);
if (recreateSession)
{
session.close();
session = sessionFactory.createSession(true, false, false);
}
if (i == 0)
{
session.rollback(xid);
}
else
{
session.commit(xid, false);
}
}
}
finally
{
if (session != null)
{
session.close();
}
}
}
private void doCompleteHeuristicallyCompletedXID(final boolean isCommit, final boolean heuristicCommit) throws Exception
{
Xid xid = newXID();
clientSession.start(xid, XAResource.TMNOFLAGS);
clientSession.end(xid, XAResource.TMSUCCESS);
clientSession.prepare(xid);
String[] preparedTransactions = messagingService.getHornetQServerControl().listPreparedTransactions();
Assert.assertEquals(1, preparedTransactions.length);
if (heuristicCommit)
{
Assert.assertTrue(messagingService.getHornetQServerControl()
.commitPreparedTransaction(XidImpl.toBase64String(xid)));
Assert.assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
}
else
{
Assert.assertTrue(messagingService.getHornetQServerControl()
.rollbackPreparedTransaction(XidImpl.toBase64String(xid)));
Assert.assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
}
Assert.assertEquals(0, messagingService.getHornetQServerControl().listPreparedTransactions().length);
try
{
if (isCommit)
{
clientSession.commit(xid, false);
}
else
{
clientSession.rollback(xid);
}
Assert.fail("neither commit not rollback must succeed on a heuristically completed tx");
}
catch (XAException e)
{
if (heuristicCommit)
{
Assert.assertEquals(XAException.XA_HEURCOM, e.errorCode);
}
else
{
Assert.assertEquals(XAException.XA_HEURRB, e.errorCode);
}
}
if (heuristicCommit)
{
Assert.assertEquals(1, messagingService.getHornetQServerControl().listHeuristicCommittedTransactions().length);
}
else
{
Assert.assertEquals(1, messagingService.getHornetQServerControl().listHeuristicRolledBackTransactions().length);
}
}
class TxMessageHandler implements MessageHandler
{
boolean failedToAck = false;
final ClientSession session;
private final CountDownLatch latch;
public TxMessageHandler(final ClientSession session, final CountDownLatch latch)
{
this.latch = latch;
this.session = session;
}
public void onMessage(final ClientMessage message)
{
Xid xid = new XidImpl(UUIDGenerator.getInstance().generateStringUUID().getBytes(),
1,
UUIDGenerator.getInstance().generateStringUUID().getBytes());
try
{
session.start(xid, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
e.printStackTrace();
}
try
{
message.acknowledge();
}
catch (HornetQException e)
{
BasicXaTest.log.error("Failed to process message", e);
}
try
{
session.end(xid, XAResource.TMSUCCESS);
session.rollback(xid);
}
catch (Exception e)
{
e.printStackTrace();
failedToAck = true;
try
{
session.close();
}
catch (HornetQException e1)
{
//
}
}
latch.countDown();
}
}
}