/*
* JBoss, Home of Professional Open Source
* Copyright 2006, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.internal.soa.esb.rosetta.pooling;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import javax.jms.XAConnection;
import javax.jms.XAConnectionFactory;
import javax.jms.XASession;
import javax.naming.Context;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.xa.XAResource;
import junit.framework.Assert;
import junit.framework.JUnit4TestAdapter;
import org.jboss.internal.soa.esb.rosetta.pooling.handlers.WMQConnectionExceptionHandler;
import org.jboss.soa.esb.addressing.eprs.JMSEpr;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.TransactionStrategy;
import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.common.TransactionStrategy.NullTransactionStrategy;
import org.jboss.soa.esb.helpers.NamingContextPool;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockejb.jms.MockQueue;
import org.mockejb.jms.MockTopic;
import org.mockejb.jndi.MockContextFactory;
/**
* Unit tests for handling specific error conditions in JmsConnectionPool
* and JBMQ acknowledge behaviour.
*
* @author <a href='mailto:kevin.conner@jboss.com'>Kevin Conner</a>
*/
public class JmsConnectionPoolUnitTest
{
private static final String CONNECTION_FACTORY = "ConnectionFactory" ;
private static final String QUEUE_NAME = "testQueue" ;
private static final String TOPIC_NAME = "testTopic" ;
/**
 * Prepares the JNDI environment used by every test.
 * <p>
 * Installs the mock context factory as the initial context, binds a
 * {@link MockXAConnectionFactory} under {@link #CONNECTION_FACTORY}, and
 * copies the current value of the {@link Context#INITIAL_CONTEXT_FACTORY}
 * system property into the {@link Environment#JNDI_SERVER_CONTEXT_FACTORY}
 * system property.
 *
 * @throws Exception if the naming context cannot be obtained or updated.
 */
@Before
public void setUp() throws Exception {
    MockContextFactory.setAsInitial();

    final Context namingContext = NamingContextPool.getNamingContext(null);
    try {
        namingContext.rebind(CONNECTION_FACTORY, new MockXAConnectionFactory());
    } finally {
        // always hand the context back to the pool, even if the bind fails
        NamingContextPool.releaseNamingContext(namingContext);
    }

    final String initialContextFactory = System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
    System.setProperty(Environment.JNDI_SERVER_CONTEXT_FACTORY, initialContextFactory);
}
/**
 * Undoes the initial-context installation performed by {@link #setUp()}.
 *
 * @throws Exception if the mock context factory cannot be reverted.
 */
@After
public void tearDown() throws Exception {
    MockContextFactory.revertSetAsInitial();
}
/**
 * Checks the pool's free/in-use accounting for a
 * {@link Session#CLIENT_ACKNOWLEDGE} session while the mock session reports
 * an acknowledge mode that differs from the one it was created with.
 * <p>
 * The counts are queried using the requested mode: 0 free / 0 in use before
 * the session is obtained, 0 / 1 while it is held and 1 / 0 after it has
 * been closed.
 *
 * @throws Exception if the pool or the session fails unexpectedly.
 */
@Test
public void testSessionAcknowledgeBehaviour() throws Exception {
    MockSessionInvocationHandler.changeAcknowledgeMode = true;
    try {
        final int requestedMode = Session.CLIENT_ACKNOWLEDGE;
        final JmsConnectionPool connectionPool = new JmsConnectionPool(getPoolEnv());

        Assert.assertEquals("current pool free count", 0, connectionPool.getFreeSessionsInPool(requestedMode));
        Assert.assertEquals("current pool in use count", 0, connectionPool.getInUseSessionsInPool(requestedMode));

        final JmsSession pooledSession = connectionPool.getSession(requestedMode);
        Assert.assertEquals("Session class", JmsSession.class, pooledSession.getClass());
        Assert.assertEquals("current pool free count", 0, connectionPool.getFreeSessionsInPool(requestedMode));
        Assert.assertEquals("current pool in use count", 1, connectionPool.getInUseSessionsInPool(requestedMode));

        // the mock deliberately reports a different mode from the requested one
        Assert.assertTrue("Different acknowledge mode", requestedMode != pooledSession.getAcknowledgeMode());

        connectionPool.closeSession(pooledSession);
        Assert.assertEquals("current pool free count", 1, connectionPool.getFreeSessionsInPool(requestedMode));
        Assert.assertEquals("current pool in use count", 0, connectionPool.getInUseSessionsInPool(requestedMode));
    } finally {
        // the flag is static, so reset it for the tests that follow
        MockSessionInvocationHandler.changeAcknowledgeMode = false;
    }
}
/**
 * Checks that a session returned to the pool is handed out again: the
 * underlying session of the second acquisition must be the same object as
 * the underlying session of the first.
 *
 * @throws Exception if the pool or the session fails unexpectedly.
 */
@Test
public void testSessionRepeatableAcquire() throws Exception {
    final JmsConnectionPool connectionPool = new JmsConnectionPool(getPoolEnv());

    final JmsSession firstAcquired = connectionPool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, firstAcquired.getClass());
    connectionPool.closeSession(firstAcquired);

    final JmsSession secondAcquired = connectionPool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, secondAcquired.getClass());
    Assert.assertSame("Same session returned", firstAcquired.getSession(), secondAcquired.getSession());
}
/**
 * Checks that {@code getSession()} still returns a session when the mock
 * connection throws once while creating it.
 * <p>
 * The mock clears its fault flag when it throws, so the flag being
 * {@code false} after the call shows that the fault was actually raised.
 *
 * @throws Exception if the pool fails unexpectedly.
 */
@Test
public void testSessionRetry() throws Exception {
    final JmsConnectionPool connectionPool = new JmsConnectionPool(getPoolEnv());
    final JmsSession pooledSession;

    MockConnectionInvocationHandler.throwFault = true;
    try {
        pooledSession = connectionPool.getSession();
        Assert.assertFalse("fault should have been thrown", MockConnectionInvocationHandler.throwFault);
    } finally {
        // static flag: make sure it never leaks into other tests
        MockConnectionInvocationHandler.throwFault = false;
    }

    Assert.assertEquals("Session class", JmsSession.class, pooledSession.getClass());
    connectionPool.closeSession(pooledSession);
}
/**
 * Runs the queue browser failure scenario using the generic mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testSessionQueueBrowserRetry() throws Exception {
    executeSessionQueueBrowserRetry(false);
}
/**
 * Runs the queue browser failure scenario using the WSMQ mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQSessionQueueBrowserRetry() throws Exception {
    executeSessionQueueBrowserRetry(true);
}
/**
 * Exercises the pool's handling of a session that fails while creating a
 * queue browser.
 * <p>
 * A first browser is created and closed with no fault configured. The mock
 * session is then told to fail, the next {@code createBrowser} call must
 * throw a {@link JmsConnectionFailureException}, and once the failed session
 * has been closed the pool must hand out a different session object.
 *
 * @param wsmq {@code true} to trigger the mock's WSMQ fault, {@code false}
 *        to trigger its generic fault (see {@code setFault}).
 * @throws Exception if the scenario fails unexpectedly.
 */
public void executeSessionQueueBrowserRetry(final boolean wsmq) throws Exception {
    final Queue queue = new MockQueue(QUEUE_NAME);
    final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv());

    final JmsSession session = pool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, session.getClass());

    // sanity check: creation works while no fault is configured
    final QueueBrowser queueBrowser = session.createBrowser(queue);
    queueBrowser.close();

    setFault(wsmq, true);
    try {
        session.createBrowser(queue);
        Assert.fail("Expected JmsConnectionFailureException");
    } catch (final JmsConnectionFailureException expected) {
        // expected: the fault must surface as a connection failure
    } finally {
        setFault(wsmq, false);
    }
    pool.closeSession(session);

    final JmsSession session2 = pool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, session2.getClass());
    // message now describes the identity check instead of repeating "Session class"
    Assert.assertNotSame("Different session returned", session, session2);
}
/**
 * Runs the message consumer failure scenario using the generic mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testSessionMessageConsumerRetry() throws Exception {
    executeSessionMessageConsumerRetry(false);
}
/**
 * Runs the message consumer failure scenario using the WSMQ mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQSessionMessageConsumerRetry() throws Exception {
    executeSessionMessageConsumerRetry(true);
}
/**
 * Exercises the pool's handling of a session that fails while creating a
 * message consumer.
 * <p>
 * A first consumer is created and closed with no fault configured. The mock
 * session is then told to fail, the next {@code createConsumer} call must
 * throw a {@link JmsConnectionFailureException}, and once the failed session
 * has been closed the pool must hand out a different session object.
 *
 * @param wsmq {@code true} to trigger the mock's WSMQ fault, {@code false}
 *        to trigger its generic fault (see {@code setFault}).
 * @throws Exception if the scenario fails unexpectedly.
 */
public void executeSessionMessageConsumerRetry(final boolean wsmq) throws Exception {
    final Queue queue = new MockQueue(QUEUE_NAME);
    final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv());

    final JmsSession session = pool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, session.getClass());

    // sanity check: creation works while no fault is configured
    final MessageConsumer messageConsumer = session.createConsumer(queue);
    messageConsumer.close();

    setFault(wsmq, true);
    try {
        session.createConsumer(queue);
        Assert.fail("Expected JmsConnectionFailureException");
    } catch (final JmsConnectionFailureException expected) {
        // expected: the fault must surface as a connection failure
    } finally {
        setFault(wsmq, false);
    }
    pool.closeSession(session);

    final JmsSession session2 = pool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, session2.getClass());
    // message now describes the identity check instead of repeating "Session class"
    Assert.assertNotSame("Different session returned", session, session2);
}
/**
 * Runs the durable topic subscriber failure scenario using the generic mock
 * session fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testSessionTopicSubscriberRetry() throws Exception {
    executeSessionTopicSubscriberRetry(false);
}
/**
 * Runs the durable topic subscriber failure scenario using the WSMQ mock
 * session fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQSessionTopicSubscriberRetry() throws Exception {
    executeSessionTopicSubscriberRetry(true);
}
/**
 * Exercises the pool's handling of a session that fails while creating a
 * durable topic subscriber.
 * <p>
 * A first subscriber is created and closed with no fault configured. The
 * mock session is then told to fail, the next
 * {@code createDurableSubscriber} call must throw a
 * {@link JmsConnectionFailureException}, and once the failed session has
 * been closed the pool must hand out a different session object.
 *
 * @param wsmq {@code true} to trigger the mock's WSMQ fault, {@code false}
 *        to trigger its generic fault (see {@code setFault}).
 * @throws Exception if the scenario fails unexpectedly.
 */
public void executeSessionTopicSubscriberRetry(final boolean wsmq) throws Exception {
    final Topic topic = new MockTopic(TOPIC_NAME);
    final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv());

    final JmsSession session = pool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, session.getClass());

    // sanity check: creation works while no fault is configured
    final TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry");
    topicSubscriber.close();

    setFault(wsmq, true);
    try {
        session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry");
        Assert.fail("Expected JmsConnectionFailureException");
    } catch (final JmsConnectionFailureException expected) {
        // expected: the fault must surface as a connection failure
    } finally {
        setFault(wsmq, false);
    }
    pool.closeSession(session);

    final JmsSession session2 = pool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, session2.getClass());
    // message now describes the identity check instead of repeating "Session class"
    Assert.assertNotSame("Different session returned", session, session2);
}
/**
 * Runs the message producer failure scenario using the generic mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testSessionMessageProducerRetry() throws Exception {
    executeSessionMessageProducerRetry(false);
}
/**
 * Runs the message producer failure scenario using the WSMQ mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQSessionMessageProducerRetry() throws Exception {
    executeSessionMessageProducerRetry(true);
}
/**
 * Exercises the pool's handling of a session that fails while creating a
 * message producer.
 * <p>
 * A first producer is created and closed with no fault configured. The mock
 * session is then told to fail, the next {@code createProducer} call must
 * throw a {@link JmsConnectionFailureException}, and once the failed session
 * has been closed the pool must hand out a different session object.
 *
 * @param wsmq {@code true} to trigger the mock's WSMQ fault, {@code false}
 *        to trigger its generic fault (see {@code setFault}).
 * @throws Exception if the scenario fails unexpectedly.
 */
public void executeSessionMessageProducerRetry(final boolean wsmq) throws Exception {
    final Queue queue = new MockQueue(QUEUE_NAME);
    final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv());

    final JmsSession session = pool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, session.getClass());

    // sanity check: creation works while no fault is configured
    final MessageProducer messageProducer = session.createProducer(queue);
    messageProducer.close();

    setFault(wsmq, true);
    try {
        session.createProducer(queue);
        Assert.fail("Expected JmsConnectionFailureException");
    } catch (final JmsConnectionFailureException expected) {
        // expected: the fault must surface as a connection failure
    } finally {
        setFault(wsmq, false);
    }
    pool.closeSession(session);

    final JmsSession session2 = pool.getSession();
    Assert.assertEquals("Session class", JmsSession.class, session2.getClass());
    // message now describes the identity check instead of repeating "Session class"
    Assert.assertNotSame("Different session returned", session, session2);
}
/**
 * Runs the XA acknowledge-mode accounting scenario, passing {@code false}
 * as the WSMQ flag.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testXASessionAcknowledgeBehaviour() throws Exception {
    executeXASessionAcknowledgeBehaviour(false);
}
/**
 * Runs the XA acknowledge-mode accounting scenario, passing {@code true}
 * as the WSMQ flag.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQXASessionAcknowledgeBehaviour() throws Exception {
    executeXASessionAcknowledgeBehaviour(true);
}
/**
 * Checks the pool's free/in-use accounting for an XA session while a mock
 * active transaction is installed and the mock session reports an
 * acknowledge mode that differs from the one it was created with.
 * <p>
 * The counts are queried using {@code JmsConnectionPool.XA_TRANSACTED}:
 * 0 free / 0 in use before the session is obtained, 0 / 1 while it is held
 * and 1 / 0 after it has been closed.
 *
 * @param wsmq not read by this method; both values run the same scenario.
 *        NOTE(review): confirm whether a WSMQ-specific variation was intended.
 * @throws Exception if the pool or the session fails unexpectedly.
 */
public void executeXASessionAcknowledgeBehaviour(final boolean wsmq) throws Exception {
    final TransactionStrategy previousStrategy = TransactionStrategy.getTransactionStrategy(true);
    TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy());
    MockSessionInvocationHandler.changeAcknowledgeMode = true;
    try {
        // transactional sessions are requested with transacted acknowledge mode
        final int xaMode = JmsConnectionPool.XA_TRANSACTED;
        final JmsConnectionPool connectionPool = new JmsConnectionPool(getPoolEnv());

        Assert.assertEquals("current pool free count", 0, connectionPool.getFreeSessionsInPool(xaMode));
        Assert.assertEquals("current pool in use count", 0, connectionPool.getInUseSessionsInPool(xaMode));

        final JmsSession pooledSession = connectionPool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, pooledSession.getClass());
        Assert.assertEquals("current pool free count", 0, connectionPool.getFreeSessionsInPool(xaMode));
        Assert.assertEquals("current pool in use count", 1, connectionPool.getInUseSessionsInPool(xaMode));

        // the mock deliberately reports a different mode from the requested one
        Assert.assertTrue("Different acknowledge mode", xaMode != pooledSession.getAcknowledgeMode());

        connectionPool.closeSession(pooledSession);
        Assert.assertEquals("current pool free count", 1, connectionPool.getFreeSessionsInPool(xaMode));
        Assert.assertEquals("current pool in use count", 0, connectionPool.getInUseSessionsInPool(xaMode));
    } finally {
        // both pieces of global state are restored regardless of the outcome
        MockSessionInvocationHandler.changeAcknowledgeMode = false;
        TransactionStrategy.setTransactionStrategy(previousStrategy);
    }
}
/**
 * Checks that, with a mock active transaction installed,
 * {@code getSession()} still returns an XA session when the mock connection
 * throws once while creating it.
 * <p>
 * The mock clears its fault flag when it throws, so the flag being
 * {@code false} after the call shows that the fault was actually raised.
 *
 * @throws Exception if the pool fails unexpectedly.
 */
@Test
public void testXASessionRetry() throws Exception {
    final TransactionStrategy previousStrategy = TransactionStrategy.getTransactionStrategy(true);
    TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy());
    try {
        final JmsConnectionPool connectionPool = new JmsConnectionPool(getPoolEnv());
        final JmsSession pooledSession;

        MockConnectionInvocationHandler.throwFault = true;
        try {
            pooledSession = connectionPool.getSession();
            Assert.assertFalse("fault should have been thrown", MockConnectionInvocationHandler.throwFault);
        } finally {
            // static flag: make sure it never leaks into other tests
            MockConnectionInvocationHandler.throwFault = false;
        }

        Assert.assertEquals("Session class", JmsXASession.class, pooledSession.getClass());
        connectionPool.closeSession(pooledSession);
    } finally {
        TransactionStrategy.setTransactionStrategy(previousStrategy);
    }
}
/**
 * Runs the XA queue browser failure scenario using the generic mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testXASessionQueueBrowserRetry() throws Exception {
    executeXASessionQueueBrowserRetry(false);
}
/**
 * Runs the XA queue browser failure scenario using the WSMQ mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQXASessionQueueBrowserRetry() throws Exception {
    executeXASessionQueueBrowserRetry(true);
}
/**
 * Exercises the pool's handling of an XA session that fails while creating
 * a queue browser, with a mock active transaction installed.
 * <p>
 * A first browser is created and closed with no fault configured. The mock
 * session is then told to fail, the next {@code createBrowser} call must
 * throw a {@link JmsConnectionFailureException}, and once the failed session
 * has been closed the pool must hand out a different session object. The
 * previously installed transaction strategy is restored afterwards.
 *
 * @param wsmq {@code true} to trigger the mock's WSMQ fault, {@code false}
 *        to trigger its generic fault (see {@code setFault}).
 * @throws Exception if the scenario fails unexpectedly.
 */
public void executeXASessionQueueBrowserRetry(final boolean wsmq) throws Exception {
    final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true);
    TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy());
    try {
        final Queue queue = new MockQueue(QUEUE_NAME);
        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv());

        final JmsSession session = pool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, session.getClass());

        // sanity check: creation works while no fault is configured
        final QueueBrowser queueBrowser = session.createBrowser(queue);
        queueBrowser.close();

        setFault(wsmq, true);
        try {
            session.createBrowser(queue);
            Assert.fail("Expected JmsConnectionFailureException");
        } catch (final JmsConnectionFailureException expected) {
            // expected: the fault must surface as a connection failure
        } finally {
            setFault(wsmq, false);
        }
        pool.closeSession(session);

        final JmsSession session2 = pool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, session2.getClass());
        // message now describes the identity check instead of repeating "Session class"
        Assert.assertNotSame("Different session returned", session, session2);
    } finally {
        TransactionStrategy.setTransactionStrategy(transactionStrategy);
    }
}
/**
 * Runs the XA message consumer failure scenario using the generic mock
 * session fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testXASessionMessageConsumerRetry() throws Exception {
    executeXASessionMessageConsumerRetry(false);
}
/**
 * Runs the XA message consumer failure scenario using the WSMQ mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQXASessionMessageConsumerRetry() throws Exception {
    executeXASessionMessageConsumerRetry(true);
}
/**
 * Exercises the pool's handling of an XA session that fails while creating
 * a message consumer, with a mock active transaction installed.
 * <p>
 * A first consumer is created and closed with no fault configured. The mock
 * session is then told to fail, the next {@code createConsumer} call must
 * throw a {@link JmsConnectionFailureException}, and once the failed session
 * has been closed the pool must hand out a different session object. The
 * previously installed transaction strategy is restored afterwards.
 *
 * @param wsmq {@code true} to trigger the mock's WSMQ fault, {@code false}
 *        to trigger its generic fault (see {@code setFault}).
 * @throws Exception if the scenario fails unexpectedly.
 */
public void executeXASessionMessageConsumerRetry(final boolean wsmq) throws Exception {
    final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true);
    TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy());
    try {
        final Queue queue = new MockQueue(QUEUE_NAME);
        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv());

        final JmsSession session = pool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, session.getClass());

        // sanity check: creation works while no fault is configured
        final MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.close();

        setFault(wsmq, true);
        try {
            session.createConsumer(queue);
            Assert.fail("Expected JmsConnectionFailureException");
        } catch (final JmsConnectionFailureException expected) {
            // expected: the fault must surface as a connection failure
        } finally {
            setFault(wsmq, false);
        }
        pool.closeSession(session);

        final JmsSession session2 = pool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, session2.getClass());
        // message now describes the identity check instead of repeating "Session class"
        Assert.assertNotSame("Different session returned", session, session2);
    } finally {
        TransactionStrategy.setTransactionStrategy(transactionStrategy);
    }
}
/**
 * Runs the XA durable topic subscriber failure scenario using the generic
 * mock session fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testXASessionTopicSubscriberRetry() throws Exception {
    executeXASessionTopicSubscriberRetry(false);
}
/**
 * Runs the XA durable topic subscriber failure scenario using the WSMQ mock
 * session fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQXASessionTopicSubscriberRetry() throws Exception {
    executeXASessionTopicSubscriberRetry(true);
}
/**
 * Exercises the pool's handling of an XA session that fails while creating
 * a durable topic subscriber, with a mock active transaction installed.
 * <p>
 * A first subscriber is created and closed with no fault configured. The
 * mock session is then told to fail, the next
 * {@code createDurableSubscriber} call must throw a
 * {@link JmsConnectionFailureException}, and once the failed session has
 * been closed the pool must hand out a different session object. The
 * previously installed transaction strategy is restored afterwards.
 *
 * @param wsmq {@code true} to trigger the mock's WSMQ fault, {@code false}
 *        to trigger its generic fault (see {@code setFault}).
 * @throws Exception if the scenario fails unexpectedly.
 */
public void executeXASessionTopicSubscriberRetry(final boolean wsmq) throws Exception {
    final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true);
    TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy());
    try {
        final Topic topic = new MockTopic(TOPIC_NAME);
        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv());

        final JmsSession session = pool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, session.getClass());

        // sanity check: creation works while no fault is configured
        final TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry");
        topicSubscriber.close();

        setFault(wsmq, true);
        try {
            session.createDurableSubscriber(topic, "testSessionTopicSubscriberRetry");
            Assert.fail("Expected JmsConnectionFailureException");
        } catch (final JmsConnectionFailureException expected) {
            // expected: the fault must surface as a connection failure
        } finally {
            setFault(wsmq, false);
        }
        pool.closeSession(session);

        final JmsSession session2 = pool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, session2.getClass());
        // message now describes the identity check instead of repeating "Session class"
        Assert.assertNotSame("Different session returned", session, session2);
    } finally {
        TransactionStrategy.setTransactionStrategy(transactionStrategy);
    }
}
/**
 * Runs the XA message producer failure scenario using the generic mock
 * session fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testXASessionMessageProducerRetry() throws Exception {
    executeXASessionMessageProducerRetry(false);
}
/**
 * Runs the XA message producer failure scenario using the WSMQ mock session
 * fault.
 *
 * @throws Exception if the scenario fails unexpectedly.
 */
@Test
public void testWSMQXASessionMessageProducerRetry() throws Exception {
    executeXASessionMessageProducerRetry(true);
}
/**
 * Exercises the pool's handling of an XA session that fails while creating
 * a message producer, with a mock active transaction installed.
 * <p>
 * A first producer is created and closed with no fault configured. The mock
 * session is then told to fail, the next {@code createProducer} call must
 * throw a {@link JmsConnectionFailureException}, and once the failed session
 * has been closed the pool must hand out a different session object. The
 * previously installed transaction strategy is restored afterwards.
 *
 * @param wsmq {@code true} to trigger the mock's WSMQ fault, {@code false}
 *        to trigger its generic fault (see {@code setFault}).
 * @throws Exception if the scenario fails unexpectedly.
 */
public void executeXASessionMessageProducerRetry(final boolean wsmq) throws Exception {
    final TransactionStrategy transactionStrategy = TransactionStrategy.getTransactionStrategy(true);
    TransactionStrategy.setTransactionStrategy(new MockActiveTransactionStrategy());
    try {
        final Queue queue = new MockQueue(QUEUE_NAME);
        final JmsConnectionPool pool = new JmsConnectionPool(getPoolEnv());

        final JmsSession session = pool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, session.getClass());

        // sanity check: creation works while no fault is configured
        final MessageProducer messageProducer = session.createProducer(queue);
        messageProducer.close();

        setFault(wsmq, true);
        try {
            session.createProducer(queue);
            Assert.fail("Expected JmsConnectionFailureException");
        } catch (final JmsConnectionFailureException expected) {
            // expected: the fault must surface as a connection failure
        } finally {
            setFault(wsmq, false);
        }
        pool.closeSession(session);

        final JmsSession session2 = pool.getSession();
        Assert.assertEquals("Session class", JmsXASession.class, session2.getClass());
        // message now describes the identity check instead of repeating "Session class"
        Assert.assertNotSame("Different session returned", session, session2);
    } finally {
        TransactionStrategy.setTransactionStrategy(transactionStrategy);
    }
}
/**
 * Walks an XA session through two mock JTA transactions and checks, at each
 * step, the pool's free/in-use counts and the number of synchronizations
 * and resources registered with the mock strategy.
 * <p>
 * The steps are: acquire a session inside a transaction and terminate the
 * transaction (session stays in use); use the session with no transaction
 * (a {@link JMSException} is expected and nothing is enlisted); begin a new
 * transaction and create a producer (one synchronization, one resource);
 * acquire and close a second session in the same transaction (counts are
 * unchanged); terminate the transaction (session still in use, the first
 * producer now rejects {@code getDestination()}); finally close the
 * producer and the first session, after which the session is free.
 *
 * @throws Exception if the pool or the session fails unexpectedly.
 */
@Test
public void testXaSessionReuse() throws Exception {
    final TransactionStrategy previousStrategy = TransactionStrategy.getTransactionStrategy(true);
    final MockJTATransactionStrategy jta = new MockJTATransactionStrategy();
    TransactionStrategy.setTransactionStrategy(jta);
    try {
        final int xaMode = JmsConnectionPool.XA_TRANSACTED;
        final JmsConnectionPool connectionPool = new JmsConnectionPool(getPoolEnv());
        assertXaPoolCounts(connectionPool, xaMode, 0, 0);

        jta.begin();
        final JmsSession sessionOne = connectionPool.getSession();
        assertXaPoolCounts(connectionPool, xaMode, 0, 1);

        jta.terminateTransaction(true);
        // make sure it is still in use
        assertXaPoolCounts(connectionPool, xaMode, 0, 1);

        // should throw exception, no transaction
        try {
            sessionOne.createProducer(null);
            Assert.fail("exception expected, no active transaction");
        } catch (final JMSException expected) {
            // expected
        }
        assertXaEnlistmentCounts(jta, 0);

        jta.begin();
        // no exception as it should enlist
        final MessageProducer producerOne = sessionOne.createProducer(null);
        assertXaEnlistmentCounts(jta, 1);

        final JmsSession sessionTwo = connectionPool.getSession();
        assertXaPoolCounts(connectionPool, xaMode, 0, 1);

        final MessageProducer producerTwo = sessionTwo.createProducer(null);
        assertXaEnlistmentCounts(jta, 1);

        connectionPool.closeSession(sessionTwo);
        assertXaPoolCounts(connectionPool, xaMode, 0, 1);

        producerTwo.close();
        jta.terminateTransaction(true);
        // Shouldn't be back in the pool as there is still one reference open
        assertXaPoolCounts(connectionPool, xaMode, 0, 1);

        // should throw exception, no transaction
        try {
            producerOne.getDestination();
            Assert.fail("exception expected, no active transaction");
        } catch (final JMSException expected) {
            // expected
        }

        // close should be okay
        producerOne.close();
        connectionPool.closeSession(sessionOne);
        assertXaPoolCounts(connectionPool, xaMode, 1, 0);
    } finally {
        TransactionStrategy.setTransactionStrategy(previousStrategy);
    }
}

/**
 * Asserts the pool's free and in-use session counts for the given mode.
 */
private static void assertXaPoolCounts(final JmsConnectionPool connectionPool, final int mode,
        final int expectedFree, final int expectedInUse) throws Exception {
    Assert.assertEquals("current pool free count", expectedFree, connectionPool.getFreeSessionsInPool(mode));
    Assert.assertEquals("current pool in use count", expectedInUse, connectionPool.getInUseSessionsInPool(mode));
}

/**
 * Asserts that the mock strategy holds the same expected number of
 * synchronizations and of enlisted resources.
 */
private static void assertXaEnlistmentCounts(final MockJTATransactionStrategy jta, final int expected) {
    Assert.assertEquals("number of synchronizations", expected, jta.numSynchronizations());
    Assert.assertEquals("number of resources", expected, jta.numResources());
}
/**
 * Builds the environment map passed to each {@code JmsConnectionPool}.
 *
 * @return a new mutable map containing a single entry that maps
 *         {@code JMSEpr.CONNECTION_FACTORY_TAG} to the JNDI name of the
 *         mock connection factory.
 */
private Map<String, String> getPoolEnv() {
    final Map<String, String> poolEnvironment = new HashMap<String, String>();
    poolEnvironment.put(JMSEpr.CONNECTION_FACTORY_TAG, CONNECTION_FACTORY);
    return poolEnvironment;
}
/**
 * Switches one of the mock session's static fault flags on or off.
 *
 * @param wsmq {@code true} to set {@code throwWSMQFault}, {@code false} to
 *        set {@code throwFault}.
 * @param value the new value of the selected flag.
 */
private void setFault(final boolean wsmq, final boolean value) {
    if (!wsmq) {
        MockSessionInvocationHandler.throwFault = value;
        return;
    }
    MockSessionInvocationHandler.throwWSMQFault = value;
}
/**
 * Connection factory whose connections are dynamic proxies backed by a
 * fresh {@link MockConnectionInvocationHandler}. The user/password variants
 * ignore their arguments and delegate to the no-argument variants.
 */
static class MockXAConnectionFactory implements XAConnectionFactory, ConnectionFactory {

    public XAConnection createXAConnection() throws JMSException {
        return newConnectionProxy(XAConnection.class);
    }

    public XAConnection createXAConnection(final String user, final String password) throws JMSException {
        return createXAConnection();
    }

    public Connection createConnection() throws JMSException {
        return newConnectionProxy(Connection.class);
    }

    public Connection createConnection(final String user, final String password) throws JMSException {
        return createConnection();
    }

    /**
     * Creates a proxy implementing only the given connection interface,
     * each with its own invocation handler.
     */
    private static <T> T newConnectionProxy(final Class<T> connectionType) {
        final Object proxy = Proxy.newProxyInstance(
            MockXAConnectionFactory.class.getClassLoader(),
            new Class[] {connectionType},
            new MockConnectionInvocationHandler());
        return connectionType.cast(proxy);
    }
}
/**
 * Invocation handler backing the mock {@code Connection} and
 * {@code XAConnection} proxies.
 * <p>
 * It stores the exception listener, creates session proxies backed by
 * {@link MockSessionInvocationHandler}, and can be told (through the static
 * {@link #throwFault} flag) to fail the next session creation once.
 * Any other connection method is logged to standard output and returns
 * {@code null}.
 */
static final class MockConnectionInvocationHandler implements InvocationHandler
{
    /** Listener registered through {@code setExceptionListener}, if any. */
    private ExceptionListener exceptionListener ;
    /** When set, the next session creation throws and the flag is cleared. */
    static boolean throwFault ;

    MockConnectionInvocationHandler()
    {
    }

    /**
     * Dispatches a call made on the connection proxy.
     *
     * @param proxy the connection proxy the method was invoked on.
     * @param method the invoked interface method; matched by name only.
     * @param args the call arguments, may be {@code null} for no-arg methods.
     * @return the method specific result, or {@code null} for unhandled methods.
     * @throws Throwable a {@link JMSException} when a fault is configured
     *         for session creation.
     */
    public Object invoke(final Object proxy, final Method method, final Object[] args)
        throws Throwable
    {
        final String methodName = method.getName() ;
        if ("setExceptionListener".equals(methodName))
        {
            exceptionListener = (ExceptionListener)args[0] ;
            return null ;
        }
        else if ("getExceptionListener".equals(methodName))
        {
            return exceptionListener ;
        }
        else if ("createSession".equals(methodName))
        {
            checkFault() ;
            // createSession(transacted, acknowledgeMode): the mode is the second argument
            final Integer acknowledgeMode = (Integer)args[1] ;
            return Proxy.newProxyInstance(MockConnectionInvocationHandler.class.getClassLoader(), new Class[] {Session.class},
                new MockSessionInvocationHandler(acknowledgeMode)) ;
        }
        else if ("createXASession".equals(methodName))
        {
            checkFault() ;
            return Proxy.newProxyInstance(MockConnectionInvocationHandler.class.getClassLoader(), new Class[] {XASession.class},
                new MockSessionInvocationHandler(Session.SESSION_TRANSACTED)) ;
        }
        else if ("toString".equals(methodName))
        {
            return toString() ;
        }
        else if ("hashCode".equals(methodName))
        {
            // one handler per proxy, so the handler's hash is stable for the proxy
            return hashCode() ;
        }
        else if ("equals".equals(methodName))
        {
            // Compare proxy identity. Delegating to the handler's own equals
            // compared the handler with a proxy and was therefore always
            // false, even for proxy.equals(proxy).
            return proxy == args[0] ;
        }
        else
        {
            System.out.println("Connection method " + method.getName() + " called") ;
            return null ;
        }
    }

    /**
     * Passes the exception to the registered exception listener, if one
     * has been set.
     *
     * @param exception the exception to deliver.
     */
    void fireExceptionListener(final JMSException exception)
    {
        if (exceptionListener != null)
        {
            exceptionListener.onException(exception) ;
        }
    }

    /**
     * Throws a {@link JMSException} caused by an
     * {@link IllegalStateException} when {@link #throwFault} is set.
     *
     * @throws JMSException if a fault has been configured.
     */
    private void checkFault()
        throws JMSException
    {
        if (throwFault)
        {
            final JMSException exception = new JMSException("Test exception") ;
            exception.initCause(new IllegalStateException("JMS IllegalStateException")) ;
            // clear it down to allow retry
            throwFault = false ;
            throw exception ;
        }
    }
}
/**
 * Invocation handler backing the mock {@code Session} and {@code XASession}
 * proxies.
 * <p>
 * It reports an acknowledge mode, creates browser/consumer/subscriber/
 * producer proxies backed by {@link MockNullInvocationHandler}, and can be
 * told through static flags to fail those creations. Any other session
 * method is logged to standard output and returns {@code null}.
 */
static final class MockSessionInvocationHandler implements InvocationHandler
{
    /** Acknowledge mode the session was created with. */
    private final Integer acknowledgeMode ;
    /** When set, creation methods throw a JMSException caused by an IllegalStateException. */
    static boolean throwFault ;
    /** When set (and throwFault is not), creation methods throw the WMQ connection failure exception. */
    static boolean throwWSMQFault ;
    /** When set, getAcknowledgeMode reports a mode different from the one stored. */
    static boolean changeAcknowledgeMode ;

    MockSessionInvocationHandler(final Integer acknowledgeMode)
    {
        this.acknowledgeMode = acknowledgeMode ;
    }

    /**
     * Dispatches a call made on the session proxy.
     *
     * @param proxy the session proxy the method was invoked on.
     * @param method the invoked interface method; matched by name only.
     * @param args the call arguments, may be {@code null} for no-arg methods.
     * @return the method specific result, or {@code null} for unhandled methods.
     * @throws Throwable a {@link JMSException} when a fault is configured
     *         for one of the creation methods.
     */
    public Object invoke(final Object proxy, final Method method, final Object[] args)
        throws Throwable
    {
        final String methodName = method.getName() ;
        if ("getAcknowledgeMode".equals(methodName))
        {
            if (changeAcknowledgeMode)
            {
                // next mode, wrapped into the range 0..3
                return (acknowledgeMode + 1) & 3 ;
            }
            else
            {
                return acknowledgeMode ;
            }
        }
        else if ("createBrowser".equals(methodName))
        {
            checkFault() ;
            return newNullProxy(QueueBrowser.class) ;
        }
        else if ("createConsumer".equals(methodName))
        {
            checkFault() ;
            return newNullProxy(MessageConsumer.class) ;
        }
        else if ("createDurableSubscriber".equals(methodName))
        {
            checkFault() ;
            return newNullProxy(TopicSubscriber.class) ;
        }
        else if ("createProducer".equals(methodName))
        {
            checkFault() ;
            return newNullProxy(MessageProducer.class) ;
        }
        else if ("toString".equals(methodName))
        {
            return toString() ;
        }
        else if ("hashCode".equals(methodName))
        {
            // one handler per proxy, so the handler's hash is stable for the proxy
            return hashCode() ;
        }
        else if ("equals".equals(methodName))
        {
            // Compare proxy identity. Delegating to the handler's own equals
            // compared the handler with a proxy and was therefore always
            // false, even for proxy.equals(proxy).
            return proxy == args[0] ;
        }
        else
        {
            System.out.println("Session method " + method.getName() + " called") ;
            return null ;
        }
    }

    /**
     * Creates a proxy of the given interface backed by a new
     * {@link MockNullInvocationHandler}.
     */
    private static Object newNullProxy(final Class<?> type)
    {
        return Proxy.newProxyInstance(MockSessionInvocationHandler.class.getClassLoader(), new Class[] {type},
            new MockNullInvocationHandler()) ;
    }

    /**
     * Throws the configured fault, if any. The generic fault takes
     * precedence over the WSMQ fault; neither flag is cleared here.
     *
     * @throws JMSException if a fault has been configured.
     */
    private void checkFault()
        throws JMSException
    {
        if (throwFault)
        {
            final JMSException exception = new JMSException("Test exception") ;
            exception.initCause(new IllegalStateException("JMS IllegalStateException")) ;
            throw exception ;
        }
        else if (throwWSMQFault)
        {
            final JMSException exception = new JMSException("Test exception", WMQConnectionExceptionHandler.WMQ_ERROR_CODE_CONNECTION_FAILURE) ;
            exception.setLinkedException(new Exception(WMQConnectionExceptionHandler.WMQ_LINKED_EXCEPTION_CONNECTION_FAILURE)) ;
            throw exception ;
        }
    }
}
/**
 * Invocation handler for proxies that need no behaviour: {@code hashCode}
 * and {@code equals} use the identity of the proxy, every other method
 * returns {@code null}.
 */
static final class MockNullInvocationHandler implements InvocationHandler {

    /**
     * Dispatches a call made on the proxy.
     *
     * @param proxy the proxy the method was invoked on.
     * @param method the invoked method; matched by name only.
     * @param args the call arguments, may be {@code null} for no-arg methods.
     * @return the identity hash of the proxy for {@code hashCode}, the
     *         identity comparison result for {@code equals}, otherwise
     *         {@code null}.
     */
    public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable {
        final String invoked = method.getName();
        if ("hashCode".equals(invoked)) {
            return System.identityHashCode(proxy);
        }
        if ("equals".equals(invoked)) {
            return proxy == args[0];
        }
        return null;
    }
}
/**
 * Transaction strategy stub that always reports an active transaction,
 * silently accepts synchronizations and resource enlistments, and returns
 * itself as the transaction object.
 */
private static final class MockActiveTransactionStrategy extends NullTransactionStrategy {

    /** @return always {@code true}. */
    @Override
    public boolean isActive() throws TransactionStrategyException {
        return true;
    }

    /** Ignores the synchronization. */
    @Override
    public void registerSynchronization(final Synchronization sync) throws TransactionStrategyException {
        // intentionally empty
    }

    /** Ignores the resource. */
    @Override
    public void enlistResource(final XAResource resource) throws TransactionStrategyException {
        // intentionally empty
    }

    /** @return this strategy instance, standing in for the transaction. */
    @Override
    public Object getTransaction() throws TransactionStrategyException {
        return this;
    }
}
/**
 * Transaction strategy stub that models a single transaction as a plain
 * object and records every synchronization and resource registered with
 * it, so that tests can count them and drive completion callbacks.
 */
private static final class MockJTATransactionStrategy extends NullTransactionStrategy {

    /** Resources enlisted since the last termination. */
    private final ArrayList<XAResource> resources = new ArrayList<XAResource>();
    /** Synchronizations registered since the last termination. */
    private final ArrayList<Synchronization> synchronizations = new ArrayList<Synchronization>();
    /** Current transaction token, or {@code null} when none is active. */
    private Object tx;

    /** Starts a transaction by creating a new transaction token. */
    public void begin() throws TransactionStrategyException {
        tx = new Object();
    }

    /** @return {@code true} while a transaction token exists. */
    @Override
    public boolean isActive() throws TransactionStrategyException {
        return tx != null;
    }

    /** Records the synchronization. */
    @Override
    public void registerSynchronization(final Synchronization sync) throws TransactionStrategyException {
        synchronizations.add(sync);
    }

    /** Records the resource. */
    @Override
    public void enlistResource(final XAResource resource) throws TransactionStrategyException {
        resources.add(resource);
    }

    /** @return the current transaction token, or {@code null}. */
    @Override
    public Object getTransaction() throws TransactionStrategyException {
        return tx;
    }

    /**
     * Ends the current transaction. Every recorded synchronization receives
     * {@code beforeCompletion}; the token and the resources are then
     * dropped; every synchronization receives {@code afterCompletion} with
     * the outcome status; finally the synchronizations are dropped too.
     *
     * @param commit {@code true} to report {@code STATUS_COMMITTED},
     *        {@code false} to report {@code STATUS_ROLLEDBACK}.
     */
    public void terminateTransaction(final boolean commit) {
        final int completionStatus;
        if (commit) {
            completionStatus = Status.STATUS_COMMITTED;
        } else {
            completionStatus = Status.STATUS_ROLLEDBACK;
        }

        for (final Synchronization callback : synchronizations) {
            callback.beforeCompletion();
        }

        // the transaction is gone before the after-completion callbacks run
        tx = null;
        resources.clear();

        for (final Synchronization callback : synchronizations) {
            callback.afterCompletion(completionStatus);
        }
        synchronizations.clear();
    }

    /** @return the number of currently recorded synchronizations. */
    public int numSynchronizations() {
        return synchronizations.size();
    }

    /** @return the number of currently recorded resources. */
    public int numResources() {
        return resources.size();
    }
}
/**
 * Exposes this JUnit 4 test class through a {@link JUnit4TestAdapter}.
 *
 * @return an adapter wrapping {@code JmsConnectionPoolUnitTest}.
 */
public static junit.framework.Test suite() {
    return new JUnit4TestAdapter(JmsConnectionPoolUnitTest.class);
}
}