Package org.jboss.soa.esb.persistence.tests

Source Code of org.jboss.soa.esb.persistence.tests.RedeliverUnitTest

/*
* JBoss, Home of Professional Open Source
* Copyright 2006, JBoss Inc., and others contributors as indicated
* by the @authors tag. All rights reserved.
* See the copyright.txt in the distribution for a
* full listing of individual contributors.
* This copyrighted material is made available to anyone wishing to use,
* modify, copy, or redistribute it subject to the terms and conditions
* of the GNU Lesser General Public License, v. 2.1.
* This program is distributed in the hope that it will be useful, but WITHOUT A
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE.  See the GNU Lesser General Public License for more details.
* You should have received a copy of the GNU Lesser General Public License,
* v.2.1 along with this distribution; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
* MA  02110-1301, USA.
*
* (C) 2005-2006,
* @author daniel.brum@jboss.com
*/

package org.jboss.soa.esb.persistence.tests;

/**
* @author kstam
*
*
*/
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;

import java.io.File;
import java.net.URI;
import java.sql.Connection;
import java.sql.Statement;

import junit.framework.JUnit4TestAdapter;

import org.apache.log4j.Logger;
import org.apache.log4j.xml.DOMConfigurator;
import org.jboss.internal.soa.esb.couriers.MockCourier;
import org.jboss.internal.soa.esb.couriers.MockCourierFactory;
import org.jboss.internal.soa.esb.persistence.format.MessageStoreFactory;
import org.jboss.internal.soa.esb.services.registry.MockRegistry;
import org.jboss.soa.esb.Service;
import org.jboss.soa.esb.client.ServiceInvoker;
import org.jboss.soa.esb.common.Configuration;
import org.jboss.soa.esb.message.Message;
import org.jboss.soa.esb.message.format.MessageFactory;
import org.jboss.soa.esb.message.format.MessageType;
import org.jboss.soa.esb.persistence.manager.ConnectionManager;
import org.jboss.soa.esb.persistence.manager.ConnectionManagerFactory;
import org.jboss.soa.esb.services.persistence.MessageStore;
import org.jboss.soa.esb.services.persistence.MessageStoreException;
import org.jboss.soa.esb.services.persistence.RedeliverStore;
import org.jboss.soa.esb.testutils.HsqldbUtil;
import org.jboss.soa.esb.testutils.TestEnvironmentUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;

public class RedeliverUnitTest
{

  private static Logger log = Logger.getLogger(RedeliverUnitTest.class);
 
  private static Object server;
  
    /*
     * Should succesfully deliver the message, and then remove the message from the store.
     */
    @Test
  public void redeliverOne() throws Exception
  {
        log.info("** redeliverOne");
        RedeliverStore store = MessageStoreFactory.getInstance().getRedeliverStore();
        assertEquals((store != null), true);
    //first lets create an undeliverable message
        Service service = new Service("cat", "service");
        URI uid = createMessages(1, service, store);
   
        store.redeliver(uid);
        Message message = store.getMessage(uid);
        //the message should no longer be in the store.
        assertNull(message);
  }
    /*
     * Adding a message to the store then, the selecting ALL and deliver it with one redeliver thread.
     */
    @Test
    public void redeliverOneThread() throws Exception
    {
        log.info("** redeliverOneThreads");
        MessageStore store = MessageStoreFactory.getInstance().getMessageStore();
        assertEquals((store != null), true);
        //first lets create an undeliverable message
        Service service = new Service("cat", "service");
        URI uid = createMessages(1, service, store);
        RedeliverClient redeliverClient = new RedeliverClient();
        Thread t1 = new Thread(redeliverClient, "CLIENT-1");
        t1.start();
       
        long deadline=System.currentTimeMillis() + 20000;
        boolean waiting=true;
        while(waiting) {
            if (redeliverClient.isDone()) {
                waiting=false;
            } else if (System.currentTimeMillis() > deadline){
                //We're timing out for some reason. This is bad.
                assertTrue(false);
            }
        }
        assertEquals(redeliverClient.getCount(),1);
        Message message = store.getMessage(uid)
        assertNull(message);
    }
    /*
     * Adding a message to the store then, the selecting ALL and deliver it with two redeliver threads.
     */
    @Test
    public void redeliverTwoThreads() throws Exception
    {
        log.info("** redeliverTwoThreads");
        //first lets create an undeliverable message
        MessageStore store = MessageStoreFactory.getInstance().getMessageStore();
        assertEquals((store != null), true);
        //first lets create an undeliverable message
        Service service = new Service("cat", "service");
        URI uid = createMessages(1, service, store);
        RedeliverClient client1 = new RedeliverClient();
        RedeliverClient client2 = new RedeliverClient();
        Thread t1 = new Thread(client1, "CLIENT-1");
        Thread t2 = new Thread(client2, "CLIENT-2");
        t1.start();
        t2.start();
       
        long deadline=System.currentTimeMillis() + 20000;
        boolean waiting=true;
        while(waiting) {
           
            if (client1.isDone() && client2.isDone()) {
                waiting=false;
            } else if (System.currentTimeMillis() > deadline){
                //We're timing out for some reason. This is bad.
                assertTrue(false);
            }
        }
        log.info("Client1 processed " + client1.getCount() + " messages");
        log.info("Client2 processed " + client2.getCount() + " messages");
        //only *one* message should be delivered
        assertEquals(1, client1.getCount() + client2.getCount());
        //the  message is delivered so should no longer be in the store
        Message message = store.getMessage(uid)
        assertNull(message);
    }
   
    @Test
    public void redeliverFiftyTwoThreads() throws Exception
    {
        int numberOfMessages=50;
        log.info("** redeliverFiftyTwoThreads");
        //first lets create an undeliverable message
        MessageStore store = MessageStoreFactory.getInstance().getMessageStore();
        assertEquals((store != null), true);
        //first lets create an undeliverable message
        Service service = new Service("cat", "service");
        URI uid = createMessages(numberOfMessages, service, store);
        RedeliverClient client1 = new RedeliverClient();
        RedeliverClient client2 = new RedeliverClient();
        Thread t1 = new Thread(client1, "CLIENT-1");
        Thread t2 = new Thread(client2, "CLIENT-2");
        t1.start();
        t2.start();
       
        final long deadline = System.currentTimeMillis() + 20000 ;
        waitForThread(t1, deadline) ;
        waitForThread(t2, deadline) ;
        if (!client1.isDone() || !client2.isDone()) {
            //We're timing out for some reason. This is bad.
            fail("Timeout waiting for client threads to terminate");
        }
        log.info("Client1 processed " + client1.getCount() + " messages");
        log.info("Client2 processed " + client2.getCount() + " messages");
        assertEquals(numberOfMessages, client1.getCount() + client2.getCount());
        //make sure the last message was send (and removed from the store)
        Message message = store.getMessage(uid)
        assertNull(message);
    }
   
    private static final void waitForThread(final Thread th, final long deadline) {
        final long period = deadline - System.currentTimeMillis() ;
        if (period > 0) {
            try {
                th.join(period) ;
            } catch (final InterruptedException ie) {
                Thread.currentThread().interrupt() ;
            }
        }
    }
    /*
     * Deliver to undeliverable Service
     */
    @Test
    public void redeliverOneUnreachable() throws Exception
    {
        log.info("** redeliverOneUnreachable");
        RedeliverStore store = MessageStoreFactory.getInstance().getRedeliverStore();
        assertEquals((store != null), true);
        //first lets create an undeliverable message
        Service service = new Service("cat", "unreachable_service");
        URI uid = createMessages(1, service, store);
       
        int maxRedeliveryCount = store.getMaxRedeliverCount();
        for (int i=0; i<maxRedeliveryCount; i++) {
            store.redeliver(uid);
            Message message = store.getMessage(uid);
            //the message should have a redeliveryCount of i+1
            assertNotNull(message);
            Integer deliverCount = (Integer) message.getProperties().getProperty(RedeliverStore.DELIVER_COUNT);
            assertEquals(deliverCount, i+1);
        }
        //Now redelivering one more time should put it in the DLQ
        store.redeliver(uid);
        Message message = store.getMessage(uid, MessageStore.CLASSIFICATION_DLQ);
        assertNotNull(message);
        message = store.getMessage(uid, MessageStore.CLASSIFICATION_RDLVR);
        assertNull(message);
    }
   
   
    private URI createMessages(int numberOfMessages, Service service, MessageStore store) throws MessageStoreException
    {
        URI uid=null;
        for (int i=0; i<numberOfMessages; i++) {
            Message msg = MessageFactory.getInstance().getMessage(MessageType.JAVA_SERIALIZED);
            assertEquals((msg != null), true);
            msg.getProperties().setProperty(ServiceInvoker.DELIVER_TO, service);
            uid = store.addMessage(msg, MessageStore.CLASSIFICATION_RDLVR);
        }
        return uid;
    }
   
    @BeforeClass
    public static void runBeforeAllTests()
    {
        MockCourierFactory.install();
        MockRegistry.install();
        MockCourier courier1 = new MockCourier(true);
        MockRegistry.register("cat", "service", courier1);
        MockCourier courier2 = new MockCourier(false);
        MockRegistry.register("cat", "unreachable_service", courier2);
       
        try {
            File testResourceDir = TestEnvironmentUtil.findResourceDirectory("./product/services/jbossesb/src/test/resources/");
            System.out.println("Current dir=" + testResourceDir.getCanonicalPath());
            DOMConfigurator.configure(testResourceDir.getCanonicalPath() + "/log4j.xml");
            File buildDir = TestEnvironmentUtil.findResourceDirectory("./product/services/jbossesb/build/");
            File resourceDir = TestEnvironmentUtil.findResourceDirectory("./product/services/jbossesb/src/main/resources/");
            System.setProperty("org.jboss.soa.esb.propertyFile", "jbossesb-unittest-properties.xml");                   
            if ("org.hsqldb.jdbcDriver".equals(Configuration.getStoreDriver())) {
                final String databaseFile = buildDir + "/hsqltestdb";
                HsqldbUtil.dropHsqldb(databaseFile);
                server = HsqldbUtil.startHsqldb(databaseFile, "jbossesb");
               
                //Drop what is there now, if exists. We want to start fresh.               
                String sqlCreateCmd    = TestEnvironmentUtil.readTextFile(new File(resourceDir.getCanonicalPath() + "/message-store-sql/hsqldb/create_database.sql"));
                String sqlDropCmd      = TestEnvironmentUtil.readTextFile(new File(resourceDir.getAbsolutePath() + "/message-store-sql/hsqldb/drop_database.sql"));
               
                ConnectionManager mgr = ConnectionManagerFactory.getConnectionManager();
                mgr.init();
                Connection con = mgr.getConnection();
                Statement stmnt = con.createStatement();
                System.out.println("Dropping the schema if exist");
                stmnt.execute(sqlDropCmd);
                System.out.println("Creating the message store schema");
                stmnt.execute(sqlCreateCmd);
            } else if ("com.mysql.jdbc.Driver".equals(Configuration.getStoreDriver())) {
               
                String sqlCreateCmd    = TestEnvironmentUtil.readTextFile(new File(resourceDir.getCanonicalPath() + "/message-store-sql/mysql/create_database.sql"));
                String sqlDropCmd      = TestEnvironmentUtil.readTextFile(new File(resourceDir.getCanonicalPath() + "/message-store-sql/mysql/drop_database.sql"));
               
                ConnectionManager mgr = ConnectionManagerFactory.getConnectionManager();
                mgr.init();
                Connection con = mgr.getConnection();
                Statement stmnt = con.createStatement();
                System.out.println("Dropping the schema if exist");
                stmnt.execute(sqlDropCmd);
                System.out.println("Creating the message store schema");
                stmnt.execute(sqlCreateCmd);
            } else if ("org.postgresql.Driver".equals(Configuration.getStoreDriver())) {
               
                String sqlCreateCmd    = TestEnvironmentUtil.readTextFile(new File(resourceDir.getCanonicalPath() + "/message-store-sql/postgresql/create_database.sql"));
                String sqlDropCmd      = TestEnvironmentUtil.readTextFile(new File(resourceDir.getCanonicalPath() + "/message-store-sql/postgresql/drop_database.sql"));
               
                ConnectionManager mgr = ConnectionManagerFactory.getConnectionManager();
                mgr.init();
                Connection con = mgr.getConnection();
                Statement stmnt = con.createStatement();
                System.out.println("Dropping the schema if exist");
                try {
                    stmnt.execute(sqlDropCmd);
                } catch (Exception e) {}
                System.out.println("Creating the message store schema");
                stmnt.execute(sqlCreateCmd);
            }
        } catch (Throwable e) {
            e.printStackTrace();
            System.out.println("We should stop testing, since we don't have a db.");
            assertTrue(false);
        }
       
    }
  @AfterClass
  public static void runAfterAllTests ()
  {
       
    try
    {
      if (Configuration.getStoreDriver().equals("org.hsqldb.jdbcDriver"))
        HsqldbUtil.stopHsqldb(server);
    }
    catch (Exception e)
    { //
      log.error( e );
    }
       
         MockRegistry.uninstall();
         MockCourierFactory.uninstall();
  }

  public static junit.framework.Test suite ()
  {
    return new JUnit4TestAdapter(RedeliverUnitTest.class);
  }

}
TOP

Related Classes of org.jboss.soa.esb.persistence.tests.RedeliverUnitTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.