/*
* Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/
package org.hornetq.tests.integration.ra;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.core.transaction.impl.XidImpl;
import org.hornetq.ra.HornetQResourceAdapter;
import org.hornetq.ra.inflow.HornetQActivationSpec;
import org.hornetq.utils.UUIDGenerator;
import javax.resource.ResourceException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
* Created Jul 6, 2010
*/
public class HornetQMessageHandlerXATest extends HornetQRATestBase
{
@Override
public boolean isSecure()
{
return false;
}
public void testXACommit() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
spec.setResourceAdapter(qResourceAdapter);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(1);
XADummyEndpoint endpoint = new XADummyEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("teststring");
clientProducer.send(message);
session.close();
latch.await(5, TimeUnit.SECONDS);
assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
endpoint.prepare();
endpoint.commit();
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
qResourceAdapter.stop();
}
public void testXARollback() throws Exception
{
HornetQResourceAdapter qResourceAdapter = new HornetQResourceAdapter();
MyBootstrapContext ctx = new MyBootstrapContext();
qResourceAdapter.start(ctx);
HornetQActivationSpec spec = new HornetQActivationSpec();
spec.setResourceAdapter(qResourceAdapter);
spec.setMaxSession(1);
spec.setUseJNDI(false);
spec.setDestinationType("javax.jms.Queue");
spec.setDestination(MDBQUEUE);
qResourceAdapter.setConnectorClassName(INVM_CONNECTOR_FACTORY);
CountDownLatch latch = new CountDownLatch(1);
XADummyEndpoint endpoint = new XADummyEndpoint(latch);
DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
qResourceAdapter.endpointActivation(endpointFactory, spec);
ClientSession session = locator.createSessionFactory().createSession();
ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
ClientMessage message = session.createMessage(true);
message.getBodyBuffer().writeString("teststring");
clientProducer.send(message);
session.close();
latch.await(5, TimeUnit.SECONDS);
assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
latch = new CountDownLatch(1);
endpoint.reset(latch);
endpoint.rollback();
latch.await(5, TimeUnit.SECONDS);
assertNotNull(endpoint.lastMessage);
assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
qResourceAdapter.endpointDeactivation(endpointFactory, spec);
qResourceAdapter.stop();
}
class XADummyEndpoint extends DummyMessageEndpoint
{
private Xid xid;
public XADummyEndpoint(CountDownLatch latch)
{
super(latch);
xid = new XidImpl("xa1".getBytes(), 1, UUIDGenerator.getInstance().generateStringUUID().getBytes());
}
@Override
public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException
{
super.beforeDelivery(method);
try
{
xaResource.start(xid, XAResource.TMNOFLAGS);
}
catch (XAException e)
{
throw new ResourceException(e.getMessage(), e);
}
}
@Override
public void afterDelivery() throws ResourceException
{
try
{
xaResource.end(xid, XAResource.TMSUCCESS);
}
catch (XAException e)
{
throw new ResourceException(e.getMessage(), e);
}
super.afterDelivery();
}
public void rollback() throws XAException
{
xaResource.rollback(xid);
}
public void prepare() throws XAException
{
xaResource.prepare(xid);
}
public void commit() throws XAException
{
xaResource.commit(xid, false);
}
}
}