/*
* JBoss, Home of Professional Open Source
* Copyright 2006, JBoss Inc., and others contributors as indicated
* by the @authors tag. All rights reserved.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License, v. 2.1.
* This program is distributed in the hope that it will be useful, but WITHOUT A
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License,
* v.2.1 along with this distribution; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA 02110-1301, USA.
*
* (C) 2005-2006,
* @author daniel.brum@jboss.com
*/
package org.jboss.soa.esb.persistence.tests;
/**
* @author kstam
*
*
*/
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.File;
import java.net.URI;
import java.sql.Connection;
import java.sql.Statement;
import junit.framework.JUnit4TestAdapter;
import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.jboss.internal.soa.esb.couriers.MockCourier;
import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
import org.jboss.internal.soa.esb.persistence.format.MessageStoreFactory;
import org.jboss.internal.soa.esb.services.registry.MockRegistry;
import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.common.Configuration;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.message.format.MessageType;
import org.jboss.soa.esb.persistence.manager.ConnectionManager;
import org.jboss.soa.esb.persistence.manager.ConnectionManagerFactory;
import org.jboss.soa.esb.services.persistence.MessageStore;
import org.jboss.soa.esb.services.persistence.MessageStoreException;
import org.jboss.soa.esb.services.persistence.RedeliverStore;
import org.jboss.soa.esb.testutils.HsqldbUtil;
import org.jboss.soa.esb.testutils.TestEnvironmentUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
 * Tests redelivery of messages stored under
 * {@link MessageStore#CLASSIFICATION_RDLVR}.
 * <p>
 * {@link #runBeforeAllTests()} installs a mock registry and mock courier
 * factory and registers two services in category "cat": "service" (backed by
 * <code>new MockCourier(true)</code>) and "unreachable_service" (backed by
 * <code>new MockCourier(false)</code>). The tests below expect messages for the
 * former to leave the store after redelivery and messages for the latter to
 * stay in it, so the flag presumably controls whether delivery succeeds.
 * <p>
 * The message store schema is dropped and recreated once per class for the
 * configured store driver (HSQLDB, MySQL or PostgreSQL).
 */
public class RedeliverUnitTest
{
    private static final Logger log = Logger.getLogger(RedeliverUnitTest.class);

    /** Maximum time, in milliseconds, a test waits for its redeliver client threads. */
    private static final long CLIENT_TIMEOUT_MILLIS = 20000;

    /** JDBC driver class names that select which schema scripts are run. */
    private static final String HSQLDB_DRIVER = "org.hsqldb.jdbcDriver";
    private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
    private static final String POSTGRESQL_DRIVER = "org.postgresql.Driver";

    /** Handle returned by {@link HsqldbUtil#startHsqldb}; only set when the HSQLDB driver is configured. */
    private static Object server;

    /**
     * Should successfully deliver the message, and then remove the message from the store.
     */
    @Test
    public void redeliverOne() throws Exception
    {
        log.info("** redeliverOne");
        RedeliverStore store = MessageStoreFactory.getInstance().getRedeliverStore();
        assertNotNull(store);
        //first lets create an undeliverable message
        Service service = new Service("cat", "service");
        URI uid = createMessages(1, service, store);
        store.redeliver(uid);
        //the message should no longer be in the store.
        assertNull(store.getMessage(uid));
    }

    /**
     * Adds one message to the store and lets a single RedeliverClient thread
     * process it; the client must report exactly one processed message and the
     * message must be gone from the store afterwards.
     */
    @Test
    public void redeliverOneThread() throws Exception
    {
        log.info("** redeliverOneThreads");
        MessageStore store = MessageStoreFactory.getInstance().getMessageStore();
        assertNotNull(store);
        //first lets create an undeliverable message
        Service service = new Service("cat", "service");
        URI uid = createMessages(1, service, store);
        RedeliverClient redeliverClient = new RedeliverClient();
        Thread t1 = new Thread(redeliverClient, "CLIENT-1");
        t1.start();
        // Block on join() rather than spinning on isDone(): no CPU burn, and
        // join() makes the client's final state visible to this thread.
        final long deadline = System.currentTimeMillis() + CLIENT_TIMEOUT_MILLIS;
        waitForThread(t1, deadline);
        if (!redeliverClient.isDone()) {
            //We're timing out for some reason. This is bad.
            fail("Timeout waiting for client thread to terminate");
        }
        assertEquals(1, redeliverClient.getCount());
        assertNull(store.getMessage(uid));
    }

    /**
     * Adds one message to the store and lets two RedeliverClient threads
     * compete for it; between them they must process it exactly once.
     */
    @Test
    public void redeliverTwoThreads() throws Exception
    {
        log.info("** redeliverTwoThreads");
        MessageStore store = MessageStoreFactory.getInstance().getMessageStore();
        assertNotNull(store);
        //first lets create an undeliverable message
        Service service = new Service("cat", "service");
        URI uid = createMessages(1, service, store);
        RedeliverClient client1 = new RedeliverClient();
        RedeliverClient client2 = new RedeliverClient();
        Thread t1 = new Thread(client1, "CLIENT-1");
        Thread t2 = new Thread(client2, "CLIENT-2");
        t1.start();
        t2.start();
        // Both joins share one deadline, so the total wait is bounded by the timeout.
        final long deadline = System.currentTimeMillis() + CLIENT_TIMEOUT_MILLIS;
        waitForThread(t1, deadline);
        waitForThread(t2, deadline);
        if (!client1.isDone() || !client2.isDone()) {
            //We're timing out for some reason. This is bad.
            fail("Timeout waiting for client threads to terminate");
        }
        log.info("Client1 processed " + client1.getCount() + " messages");
        log.info("Client2 processed " + client2.getCount() + " messages");
        //only *one* message should be delivered
        assertEquals(1, client1.getCount() + client2.getCount());
        //the message is delivered so should no longer be in the store
        assertNull(store.getMessage(uid));
    }

    /**
     * Adds fifty messages to the store and lets two RedeliverClient threads
     * process them; the combined count must equal the number of messages and
     * the last message added must be gone from the store.
     */
    @Test
    public void redeliverFiftyTwoThreads() throws Exception
    {
        int numberOfMessages = 50;
        log.info("** redeliverFiftyTwoThreads");
        MessageStore store = MessageStoreFactory.getInstance().getMessageStore();
        assertNotNull(store);
        //first lets create the undeliverable messages
        Service service = new Service("cat", "service");
        URI uid = createMessages(numberOfMessages, service, store);
        RedeliverClient client1 = new RedeliverClient();
        RedeliverClient client2 = new RedeliverClient();
        Thread t1 = new Thread(client1, "CLIENT-1");
        Thread t2 = new Thread(client2, "CLIENT-2");
        t1.start();
        t2.start();
        final long deadline = System.currentTimeMillis() + CLIENT_TIMEOUT_MILLIS;
        waitForThread(t1, deadline);
        waitForThread(t2, deadline);
        if (!client1.isDone() || !client2.isDone()) {
            //We're timing out for some reason. This is bad.
            fail("Timeout waiting for client threads to terminate");
        }
        log.info("Client1 processed " + client1.getCount() + " messages");
        log.info("Client2 processed " + client2.getCount() + " messages");
        assertEquals(numberOfMessages, client1.getCount() + client2.getCount());
        //make sure the last message was sent (and removed from the store)
        assertNull(store.getMessage(uid));
    }

    /**
     * Waits for the given thread to terminate, but no longer than until
     * <code>deadline</code>.
     * <p>
     * Returns immediately when the deadline has already passed; the guard also
     * prevents calling <code>join(0)</code>, which would wait forever. If the
     * calling thread is interrupted while waiting, the interrupt flag is
     * restored and the method returns.
     *
     * @param th the thread to wait for
     * @param deadline absolute time, as per {@link System#currentTimeMillis()}
     */
    private static final void waitForThread(final Thread th, final long deadline) {
        final long period = deadline - System.currentTimeMillis();
        if (period > 0) {
            try {
                th.join(period);
            } catch (final InterruptedException ie) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Deliver to undeliverable Service: every redelivery attempt up to the
     * store's maximum must keep the message in the store with an incremented
     * deliver count; one further attempt must move it to the DLQ classification.
     */
    @Test
    public void redeliverOneUnreachable() throws Exception
    {
        log.info("** redeliverOneUnreachable");
        RedeliverStore store = MessageStoreFactory.getInstance().getRedeliverStore();
        assertNotNull(store);
        //first lets create an undeliverable message
        Service service = new Service("cat", "unreachable_service");
        URI uid = createMessages(1, service, store);
        int maxRedeliveryCount = store.getMaxRedeliverCount();
        for (int i = 0; i < maxRedeliveryCount; i++) {
            store.redeliver(uid);
            Message message = store.getMessage(uid);
            //the message should have a redeliveryCount of i+1
            assertNotNull(message);
            Integer deliverCount = (Integer) message.getProperties().getProperty(RedeliverStore.DELIVER_COUNT);
            // Box the expected value so the (Object, Object) overload is chosen unambiguously.
            assertEquals(Integer.valueOf(i + 1), deliverCount);
        }
        //Now redelivering one more time should put it in the DLQ
        store.redeliver(uid);
        Message message = store.getMessage(uid, MessageStore.CLASSIFICATION_DLQ);
        assertNotNull(message);
        message = store.getMessage(uid, MessageStore.CLASSIFICATION_RDLVR);
        assertNull(message);
    }

    /**
     * Adds <code>numberOfMessages</code> new Java-serialized messages, each
     * addressed to <code>service</code> via {@link ServiceInvoker#DELIVER_TO},
     * to the store under {@link MessageStore#CLASSIFICATION_RDLVR}.
     *
     * @param numberOfMessages how many messages to add
     * @param service the service the messages should be delivered to
     * @param store the store to add the messages to
     * @return the uid of the last message added, or <code>null</code> when
     *         <code>numberOfMessages</code> is not positive
     * @throws MessageStoreException if the store rejects a message
     */
    private URI createMessages(int numberOfMessages, Service service, MessageStore store) throws MessageStoreException
    {
        URI uid = null;
        for (int i = 0; i < numberOfMessages; i++) {
            Message msg = MessageFactory.getInstance().getMessage(MessageType.JAVA_SERIALIZED);
            assertNotNull(msg);
            msg.getProperties().setProperty(ServiceInvoker.DELIVER_TO, service);
            uid = store.addMessage(msg, MessageStore.CLASSIFICATION_RDLVR);
        }
        return uid;
    }

    /**
     * Installs the mock courier factory and registry, registers the two test
     * services, configures log4j and recreates the message store schema for the
     * configured store driver. For HSQLDB the test database is dropped and a
     * server is started first. Any failure aborts the test class.
     */
    @BeforeClass
    public static void runBeforeAllTests()
    {
        MockCourierFactory.install();
        MockRegistry.install();
        MockCourier courier1 = new MockCourier(true);
        MockRegistry.register("cat", "service", courier1);
        MockCourier courier2 = new MockCourier(false);
        MockRegistry.register("cat", "unreachable_service", courier2);
        try {
            File testResourceDir = TestEnvironmentUtil.findResourceDirectory("./product/services/jbossesb/src/test/resources/");
            System.out.println("Current dir=" + testResourceDir.getCanonicalPath());
            DOMConfigurator.configure(testResourceDir.getCanonicalPath() + "/log4j.xml");
            File buildDir = TestEnvironmentUtil.findResourceDirectory("./product/services/jbossesb/build/");
            File resourceDir = TestEnvironmentUtil.findResourceDirectory("./product/services/jbossesb/src/main/resources/");
            // Must be set before the configuration is read below.
            System.setProperty("org.jboss.soa.esb.propertyFile", "jbossesb-unittest-properties.xml");
            final String driver = Configuration.getStoreDriver();
            if (HSQLDB_DRIVER.equals(driver)) {
                final String databaseFile = buildDir + "/hsqltestdb";
                //Drop what is there now, if exists. We want to start fresh.
                HsqldbUtil.dropHsqldb(databaseFile);
                server = HsqldbUtil.startHsqldb(databaseFile, "jbossesb");
                recreateSchema(resourceDir, "hsqldb", false);
            } else if (MYSQL_DRIVER.equals(driver)) {
                recreateSchema(resourceDir, "mysql", false);
            } else if (POSTGRESQL_DRIVER.equals(driver)) {
                // Only for PostgreSQL is a failing drop script tolerated.
                recreateSchema(resourceDir, "postgresql", true);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.out.println("We should stop testing, since we don't have a db.");
            fail("Unable to set up the message store database: " + e);
        }
    }

    /**
     * Runs the drop and create scripts found under
     * <code>message-store-sql/&lt;dialect&gt;/</code> in the resource directory
     * against a connection obtained from the {@link ConnectionManager}, then
     * closes the statement and the connection.
     *
     * @param resourceDir directory containing the <code>message-store-sql</code> folder
     * @param dialect sub-folder name of the database dialect, e.g. "hsqldb"
     * @param ignoreDropFailure when <code>true</code>, a failure of the drop
     *        script is logged and the create script is still run
     * @throws Exception if reading a script, obtaining the connection or
     *         executing a script fails
     */
    private static void recreateSchema(File resourceDir, String dialect, boolean ignoreDropFailure) throws Exception
    {
        final String sqlDir = resourceDir.getCanonicalPath() + "/message-store-sql/" + dialect;
        final String sqlCreateCmd = TestEnvironmentUtil.readTextFile(new File(sqlDir + "/create_database.sql"));
        final String sqlDropCmd = TestEnvironmentUtil.readTextFile(new File(sqlDir + "/drop_database.sql"));
        final ConnectionManager mgr = ConnectionManagerFactory.getConnectionManager();
        mgr.init();
        final Connection con = mgr.getConnection();
        try {
            final Statement stmnt = con.createStatement();
            try {
                System.out.println("Dropping the schema if exist");
                if (ignoreDropFailure) {
                    try {
                        stmnt.execute(sqlDropCmd);
                    } catch (Exception ignored) {
                        // Best effort: the schema may simply not exist yet.
                        log.debug("Ignoring failure while dropping the " + dialect + " schema", ignored);
                    }
                } else {
                    stmnt.execute(sqlDropCmd);
                }
                System.out.println("Creating the message store schema");
                stmnt.execute(sqlCreateCmd);
            } finally {
                stmnt.close();
            }
        } finally {
            // NOTE(review): assumes the manager hands out connections that callers
            // are expected to close (e.g. pooled) - confirm against ConnectionManager.
            con.close();
        }
    }

    /**
     * Stops the HSQLDB server when that driver is configured and always
     * uninstalls the mock registry and courier factory. A failure to stop the
     * server is logged, not propagated.
     */
    @AfterClass
    public static void runAfterAllTests ()
    {
        try
        {
            if (HSQLDB_DRIVER.equals(Configuration.getStoreDriver())) {
                HsqldbUtil.stopHsqldb(server);
            }
        }
        catch (Exception e)
        {
            log.error("Failed to stop the HSQLDB test server", e);
        }
        finally
        {
            MockRegistry.uninstall();
            MockCourierFactory.uninstall();
        }
    }

    /**
     * Adapter so the JUnit 4 tests in this class can be run by a JUnit 3 runner.
     *
     * @return a JUnit 3 style suite wrapping this class
     */
    public static junit.framework.Test suite ()
    {
        return new JUnit4TestAdapter(RedeliverUnitTest.class);
    }
}