Package org.hornetq.jms.tests

Source Code of org.hornetq.jms.tests.PersistenceTest

/*
* Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*    http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.  See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.hornetq.jms.tests;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.hornetq.jms.tests.util.ProxyAssertSupport;

/**
*
* A PersistenceTest

* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision: 8611 $</tt>
*
* $Id: PersistenceTest.java 8611 2009-12-08 01:06:31Z timfox $
*
*/
public class PersistenceTest extends JMSTestCase
{
   // Constants -----------------------------------------------------

   // Static --------------------------------------------------------

   // Attributes ----------------------------------------------------

   // Constructors --------------------------------------------------

   // TestCase overrides -------------------------------------------

   // Public --------------------------------------------------------

   /**
    * Test that the messages in a persistent queue survive starting and stopping and server,
    *
    */
   public void testQueuePersistence() throws Exception
   {
      Connection conn = null;

      try
      {
         conn = JMSTestCase.cf.createConnection();
         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod = sess.createProducer(HornetQServerTestCase.queue1);
         prod.setDeliveryMode(DeliveryMode.PERSISTENT);

         for (int i = 0; i < 10; i++)
         {
            TextMessage tm = sess.createTextMessage("message" + i);
            prod.send(tm);
         }

         conn.close();

         stop();

         startNoDelete();

         // HornetQ server restart implies new ConnectionFactory lookup
         deployAndLookupAdministeredObjects();

         conn = JMSTestCase.cf.createConnection();
         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         conn.start();
         MessageConsumer cons = sess.createConsumer(HornetQServerTestCase.queue1);
         for (int i = 0; i < 10; i++)
         {
            TextMessage tm = (TextMessage)cons.receive(3000);

            ProxyAssertSupport.assertNotNull(tm);
            if (tm == null)
            {
               break;
            }
            ProxyAssertSupport.assertEquals("message" + i, tm.getText());
         }
      }
      finally
      {
         if (conn != null)
         {
            conn.close();
         }
      }
   }

   /**
    * Test that the JMSRedelivered and delivery count survives a restart
    *
    */
   public void testJMSRedeliveredRestart() throws Exception
   {
      Connection conn = null;

      try
      {
         conn = JMSTestCase.cf.createConnection();
         Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod = sess.createProducer(HornetQServerTestCase.queue1);
         prod.setDeliveryMode(DeliveryMode.PERSISTENT);

         for (int i = 0; i < 10; i++)
         {
            TextMessage tm = sess.createTextMessage("message" + i);
            prod.send(tm);
         }

         Session sess2 = conn.createSession(true, Session.SESSION_TRANSACTED);

         MessageConsumer cons = sess2.createConsumer(HornetQServerTestCase.queue1);

         conn.start();

         for (int i = 0; i < 10; i++)
         {
            TextMessage tm = (TextMessage)cons.receive(1000);

            ProxyAssertSupport.assertNotNull(tm);

            ProxyAssertSupport.assertEquals("message" + i, tm.getText());

            ProxyAssertSupport.assertFalse(tm.getJMSRedelivered());

            ProxyAssertSupport.assertEquals(1, tm.getIntProperty("JMSXDeliveryCount"));
         }

         // rollback
         sess2.rollback();

         for (int i = 0; i < 10; i++)
         {
            TextMessage tm = (TextMessage)cons.receive(1000);

            ProxyAssertSupport.assertNotNull(tm);

            ProxyAssertSupport.assertEquals("message" + i, tm.getText());

            ProxyAssertSupport.assertTrue(tm.getJMSRedelivered());

            ProxyAssertSupport.assertEquals(2, tm.getIntProperty("JMSXDeliveryCount"));
         }

         conn.close();

         stop();

         startNoDelete();

         // HornetQ server restart implies new ConnectionFactory lookup
         deployAndLookupAdministeredObjects();

         conn = JMSTestCase.cf.createConnection();
         sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         conn.start();
         cons = sess.createConsumer(HornetQServerTestCase.queue1);
         for (int i = 0; i < 10; i++)
         {
            TextMessage tm = (TextMessage)cons.receive(3000);

            ProxyAssertSupport.assertNotNull(tm);

            ProxyAssertSupport.assertEquals("message" + i, tm.getText());

            ProxyAssertSupport.assertTrue(tm.getJMSRedelivered());

            ProxyAssertSupport.assertEquals(3, tm.getIntProperty("JMSXDeliveryCount"));
         }
      }
      finally
      {
         if (conn != null)
         {
            conn.close();
         }
      }
   }

   /**
    * First test that message order survives a restart
    */
   public void testMessageOrderPersistence_1() throws Exception
   {
      Connection conn = null;

      try
      {
         conn = JMSTestCase.cf.createConnection();
         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod = sessSend.createProducer(HornetQServerTestCase.queue1);

         TextMessage m0 = sessSend.createTextMessage("a");
         TextMessage m1 = sessSend.createTextMessage("b");
         TextMessage m2 = sessSend.createTextMessage("c");
         TextMessage m3 = sessSend.createTextMessage("d");
         TextMessage m4 = sessSend.createTextMessage("e");
         TextMessage m5 = sessSend.createTextMessage("f");
         TextMessage m6 = sessSend.createTextMessage("g");
         TextMessage m7 = sessSend.createTextMessage("h");
         TextMessage m8 = sessSend.createTextMessage("i");
         TextMessage m9 = sessSend.createTextMessage("j");

         prod.send(m0, DeliveryMode.PERSISTENT, 0, 0);
         prod.send(m1, DeliveryMode.PERSISTENT, 1, 0);
         prod.send(m2, DeliveryMode.PERSISTENT, 2, 0);
         prod.send(m3, DeliveryMode.PERSISTENT, 3, 0);
         prod.send(m4, DeliveryMode.PERSISTENT, 4, 0);
         prod.send(m5, DeliveryMode.PERSISTENT, 5, 0);
         prod.send(m6, DeliveryMode.PERSISTENT, 6, 0);
         prod.send(m7, DeliveryMode.PERSISTENT, 7, 0);
         prod.send(m8, DeliveryMode.PERSISTENT, 8, 0);
         prod.send(m9, DeliveryMode.PERSISTENT, 9, 0);

         conn.close();

         stop();

         startNoDelete();

         // HornetQ server restart implies new ConnectionFactory lookup
         deployAndLookupAdministeredObjects();

         conn = JMSTestCase.cf.createConnection();
         Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         conn.start();
         MessageConsumer cons = sessReceive.createConsumer(HornetQServerTestCase.queue1);

         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("j", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("i", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("h", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("g", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("f", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("e", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("d", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("c", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("b", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("a", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(500);
            ProxyAssertSupport.assertNull(t);
         }
      }
      finally
      {
         if (conn != null)
         {
            conn.close();
         }
      }
   }

   /**
    * Second test that message order survives a restart
    */
   public void testMessageOrderPersistence_2() throws Exception
   {
      Connection conn = null;

      try
      {
         conn = JMSTestCase.cf.createConnection();
         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod = sessSend.createProducer(HornetQServerTestCase.queue1);

         TextMessage m0 = sessSend.createTextMessage("a");
         TextMessage m1 = sessSend.createTextMessage("b");
         TextMessage m2 = sessSend.createTextMessage("c");
         TextMessage m3 = sessSend.createTextMessage("d");
         TextMessage m4 = sessSend.createTextMessage("e");
         TextMessage m5 = sessSend.createTextMessage("f");
         TextMessage m6 = sessSend.createTextMessage("g");
         TextMessage m7 = sessSend.createTextMessage("h");
         TextMessage m8 = sessSend.createTextMessage("i");
         TextMessage m9 = sessSend.createTextMessage("j");

         prod.send(m0, DeliveryMode.PERSISTENT, 0, 0);
         prod.send(m1, DeliveryMode.PERSISTENT, 0, 0);
         prod.send(m2, DeliveryMode.PERSISTENT, 0, 0);
         prod.send(m3, DeliveryMode.PERSISTENT, 3, 0);
         prod.send(m4, DeliveryMode.PERSISTENT, 3, 0);
         prod.send(m5, DeliveryMode.PERSISTENT, 4, 0);
         prod.send(m6, DeliveryMode.PERSISTENT, 4, 0);
         prod.send(m7, DeliveryMode.PERSISTENT, 5, 0);
         prod.send(m8, DeliveryMode.PERSISTENT, 5, 0);
         prod.send(m9, DeliveryMode.PERSISTENT, 6, 0);

         conn.close();

         stop();

         startNoDelete();

         deployAndLookupAdministeredObjects();

         conn = JMSTestCase.cf.createConnection();
         Session sessReceive = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         conn.start();
         MessageConsumer cons = sessReceive.createConsumer(HornetQServerTestCase.queue1);

         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("j", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("h", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("i", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("f", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("g", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("d", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("e", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("a", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("b", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receive(1000);
            ProxyAssertSupport.assertNotNull(t);
            ProxyAssertSupport.assertEquals("c", t.getText());
         }
         {
            TextMessage t = (TextMessage)cons.receiveNoWait();
            ProxyAssertSupport.assertNull(t);
         }
      }
      finally
      {
         if (conn != null)
         {
            conn.close();
         }
      }
   }

   /*
    * Test durable subscription state survives a server crash
    */
   public void testDurableSubscriptionPersistence_1() throws Exception
   {
      Connection conn = null;

      try
      {
         conn = JMSTestCase.cf.createConnection();
         conn.setClientID("five");

         Session s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageConsumer ds = s.createDurableSubscriber(HornetQServerTestCase.topic1, "sub", null, false);

         MessageProducer p = s.createProducer(HornetQServerTestCase.topic1);
         p.setDeliveryMode(DeliveryMode.PERSISTENT);
         TextMessage tm = s.createTextMessage("thebody");

         p.send(tm);
         log.debug("message sent");

         conn.close();

         stop();

         startNoDelete();

         deployAndLookupAdministeredObjects();

         conn = JMSTestCase.cf.createConnection();
         conn.setClientID("five");

         s = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         conn.start();

         ds = s.createDurableSubscriber(HornetQServerTestCase.topic1, "sub", null, false);

         TextMessage rm = (TextMessage)ds.receive(3000);
         ProxyAssertSupport.assertNotNull(rm);
         ProxyAssertSupport.assertEquals("thebody", rm.getText());

         ds.close();

         s.unsubscribe("sub");
      }
      finally
      {
         if (conn != null)
         {
            conn.close();
         }
      }
   }

   /*
    * Test durable subscription state survives a restart
    */
   public void testDurableSubscriptionPersistence_2() throws Exception
   {
      Connection conn = null;

      try
      {
         conn = JMSTestCase.cf.createConnection();
         conn.setClientID("Sausages");

         Session sessConsume = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageConsumer sub1 = sessConsume.createDurableSubscriber(HornetQServerTestCase.topic1, "sub1", null, false);
         MessageConsumer sub2 = sessConsume.createDurableSubscriber(HornetQServerTestCase.topic1, "sub2", null, false);
         MessageConsumer sub3 = sessConsume.createDurableSubscriber(HornetQServerTestCase.topic1, "sub3", null, false);

         Session sessSend = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         MessageProducer prod = sessSend.createProducer(HornetQServerTestCase.topic1);
         prod.setDeliveryMode(DeliveryMode.PERSISTENT);

         for (int i = 0; i < 10; i++)
         {
            TextMessage tm = sessSend.createTextMessage("message" + i);
            prod.send(tm);
         }

         conn.close();

         stop();

         startNoDelete();

         // HornetQ server restart implies new ConnectionFactory lookup
         deployAndLookupAdministeredObjects();

         conn = JMSTestCase.cf.createConnection();
         conn.setClientID("Sausages");

         sessConsume = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
         conn.start();

         sub1 = sessConsume.createDurableSubscriber(HornetQServerTestCase.topic1, "sub1", null, false);
         sub2 = sessConsume.createDurableSubscriber(HornetQServerTestCase.topic1, "sub2", null, false);
         sub3 = sessConsume.createDurableSubscriber(HornetQServerTestCase.topic1, "sub3", null, false);

         for (int i = 0; i < 10; i++)
         {
            TextMessage tm1 = (TextMessage)sub1.receive(3000);
            ProxyAssertSupport.assertNotNull(tm1);
            if (tm1 == null)
            {
               break;
            }
            ProxyAssertSupport.assertEquals("message" + i, tm1.getText());

            TextMessage tm2 = (TextMessage)sub2.receive(3000);
            ProxyAssertSupport.assertNotNull(tm2);
            if (tm2 == null)
            {
               break;
            }
            ProxyAssertSupport.assertEquals("message" + i, tm2.getText());

            TextMessage tm3 = (TextMessage)sub3.receive(3000);
            ProxyAssertSupport.assertNotNull(tm3);
            if (tm3 == null)
            {
               break;
            }
            ProxyAssertSupport.assertEquals("message" + i, tm3.getText());
         }

         sub1.close();
         sub2.close();
         sub3.close();

         sessConsume.unsubscribe("sub1");
         sessConsume.unsubscribe("sub2");
         sessConsume.unsubscribe("sub3");
      }
      finally
      {
         if (conn != null)
         {
            conn.close();
         }
      }
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------

}
TOP

Related Classes of org.hornetq.jms.tests.PersistenceTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.