package org.activemq.usecases;
import javax.jms.*;
import junit.framework.TestCase;
import org.activemq.ActiveMQConnectionFactory;
import org.activemq.ActiveMQConnection;
import org.activemq.util.IdGenerator;
import org.activemq.store.jdbc.JDBCPersistenceAdapter;
import org.activemq.store.PersistenceAdapter;
import org.activemq.io.impl.DefaultWireFormat;
import org.activemq.broker.impl.BrokerContainerImpl;
import org.activemq.broker.impl.BrokerConnectorImpl;
import org.activemq.message.ActiveMQQueue;
import org.apache.derby.jdbc.EmbeddedDataSource;
import java.util.HashMap;
import java.util.Collections;
import java.util.Map;
/**
* This unit test was created to check for a memory leak that occurs when a new producer is created and closed for each message sent.
* The error occurred after sending and receiving messages 65536 times.
* This test validates that the messages sent by the producer are received by the consumer.
* A new producer is created and closed for each message that is sent.
* This procedure is repeated more than 65536 times.
*/
public final class NoDestinationErrorTest extends TestCase {
    public static final String ACTIVEMQ_SERVER = "ActiveMQ Server";
    public static final boolean TRANSACTED_FALSE = false;
    public static final String TOOL_DEFAULT = "TOOL.DEFAULT";
    public static final String LAST_MESSAGE = "LAST";

    /** Shared, synchronized map kept for backward compatibility; not used by this test itself. */
    public static Map ProducerMap = Collections.synchronizedMap(new HashMap());

    /** Number of send/receive round trips; deliberately larger than 65536. */
    private static final int MESSAGE_COUNT = 70000;

    /**
     * Upper bound, in milliseconds, to wait for each message.
     * Must be non-zero: a JMS receive timeout of 0 blocks indefinitely.
     */
    private static final long RECEIVE_TIMEOUT_MS = 5000L;

    private String brokerUrl = "tcp://localhost:61616";
    private boolean isEmbeddedBroker = true;
    private boolean isTopic = false;

    /** True once {@link #createPersistenceAdapter()} has been called on this instance. */
    private boolean init = false;
    private BrokerContainerImpl broker = null;

    public NoDestinationErrorTest() {
        super();
    }

    /**
     * Sets up the resources of the unit test by creating and starting the broker.
     *
     * @throws Exception if the broker cannot be created or started.
     */
    protected void setUp() throws Exception {
        createBroker();
    }

    /**
     * Clears up the resources used in the unit test by stopping the broker.
     *
     * @throws Exception if the broker fails to stop.
     */
    protected void tearDown() throws Exception {
        destroyBroker();
    }

    /**
     * Sends {@link #MESSAGE_COUNT} text messages, creating and closing a fresh
     * producer for every message, and verifies that each one is received by a
     * single long-lived consumer on the same session.
     *
     * @throws Exception if any JMS operation fails.
     */
    public void doTest() throws Exception {
        Connection connection = createConnectionFactory(brokerUrl, isEmbeddedBroker);
        try {
            connection.start();
            Session session = createSession(connection, TRANSACTED_FALSE);
            Destination destination = createDestination(session, "subject", isTopic);
            MessageConsumer consumer = session.createConsumer(destination);

            for (int i = 0; i < MESSAGE_COUNT; i++) {
                String expectedText = "message " + i;
                TextMessage sentMessage = session.createTextMessage(expectedText);

                // A new producer per message is the scenario under test.
                MessageProducer producer = session.createProducer(destination);
                producer.send(sentMessage);
                producer.close();

                // Use a fixed timeout. The loop index must not be used as the
                // timeout: 0 blocks forever and 1-2 ms can expire spuriously,
                // returning null.
                TextMessage rcvMessage = (TextMessage) consumer.receive(RECEIVE_TIMEOUT_MS);
                assertNotNull("No message received for iteration " + i, rcvMessage);
                assertEquals("Unexpected payload for iteration " + i,
                        expectedText, rcvMessage.getText());
            }
        } finally {
            // Always release the connection, even when an iteration fails.
            connection.close();
        }
    }

    /**
     * Sets up and starts the broker.
     *
     * @throws Exception if the broker cannot be configured or started.
     */
    private void createBroker() throws Exception {
        broker = new BrokerContainerImpl("localhost");
        broker.addConnector(new BrokerConnectorImpl(broker, "vm://localhost", new DefaultWireFormat()));
        broker.setPersistenceAdapter(createPersistenceAdapter());
        broker.start();
    }

    /**
     * Stops the broker if one was created.
     *
     * @throws Exception if the broker fails to stop.
     */
    private void destroyBroker() throws Exception {
        if (broker != null) {
            broker.stop();
        }
    }

    /**
     * Creates the connection to the broker.
     * Despite its name, this returns a connection, not a factory.
     *
     * @param url - broker url.
     * @param embeddedBroker - true if an embedded broker will be used.
     * @return Connection - broker connection with all prefetch limits set to 1000.
     * @throws JMSException if the connection cannot be created.
     */
    private static Connection createConnectionFactory(String url,
                                                      boolean embeddedBroker) throws JMSException {
        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(url);
        if (embeddedBroker) {
            factory.setUseEmbeddedBroker(true);
        }
        factory.setTurboBoost(true);

        ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
        c.getPrefetchPolicy().setQueuePrefetch(1000);
        c.getPrefetchPolicy().setQueueBrowserPrefetch(1000);
        c.getPrefetchPolicy().setTopicPrefetch(1000);
        c.getPrefetchPolicy().setDurableTopicPrefetch(1000);
        return c;
    }

    /**
     * Returns the persistence adapter, backed by an embedded Derby database named "testdb".
     * On the first call for this instance the database is created if needed and
     * the adapter is told to drop its tables on startup; later calls skip both steps.
     *
     * @return PersistenceAdapter - persistence adapter.
     */
    protected PersistenceAdapter createPersistenceAdapter() {
        EmbeddedDataSource ds = new EmbeddedDataSource();
        ds.setDatabaseName("testdb");
        if (!init) {
            ds.setCreateDatabase("create");
        }
        JDBCPersistenceAdapter persistenceAdapter = new JDBCPersistenceAdapter(ds, new DefaultWireFormat());
        if (!init) {
            persistenceAdapter.setDropTablesOnStartup(true);
        }
        init = true;
        return persistenceAdapter;
    }

    /**
     * Creates the connection session.
     *
     * @param connection - broker connection.
     * @param isTransacted - true if the session will be session transacted,
     * otherwise the session will be using auto acknowledge.
     * @return Session - connection session.
     * @throws JMSException if the session cannot be created.
     */
    private static Session createSession(Connection connection,
                                         boolean isTransacted) throws JMSException {
        if (isTransacted) {
            return connection.createSession(true, Session.SESSION_TRANSACTED);
        } else {
            return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
    }

    /**
     * Creates the session destination.
     *
     * @param session - connection session.
     * @param subject - destination name.
     * @param isTopic - true if the destination is a topic,
     * otherwise the destination is a queue.
     * @return Destination - session destination.
     * @throws JMSException if the destination cannot be created.
     */
    private static Destination createDestination(Session session,
                                                 String subject,
                                                 boolean isTopic) throws JMSException {
        if (isTopic) {
            return session.createTopic(subject);
        }
        return session.createQueue(subject);
    }

    /**
     * JUnit entry point: runs the send/receive round-trip scenario.
     *
     * @throws Exception if the scenario fails.
     */
    public void testSendReceive() throws Exception {
        doTest();
    }
}