Package org.jboss.test.messaging.jms.clustering

Source Code of org.jboss.test.messaging.jms.clustering.DistributedTopicTest$TestListener

/*
* JBoss, Home of Professional Open Source
* Copyright 2005, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.test.messaging.jms.clustering;

import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.InvalidDestinationException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.Destination;
import javax.jms.JMSException;

import org.jboss.test.messaging.tools.ServerManagement;
import java.util.ArrayList;


/**
*
* A DistributedTopicTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @version <tt>$Revision: 3902 $</tt>
*
* $Id: DistributedTopicTest.java 3902 2008-03-20 23:15:59Z clebert.suconic@jboss.com $
*
*/
public class DistributedTopicTest extends ClusteringTestBase
{

   // Constants -----------------------------------------------------

   // Static --------------------------------------------------------

   // Attributes ----------------------------------------------------

   // Constructors --------------------------------------------------

   public DistributedTopicTest(String name)
   {
      super(name);
   }

   // Public --------------------------------------------------------

   public void testClusteredTopicNonDurableNonPersistent() throws Exception
   {
      clusteredTopicNonDurable(false);
   }

   public void testClusteredTopicNonDurablePersistent() throws Exception
   {
      clusteredTopicNonDurable(true);
   }

   public void testClusteredTopicNonDurableNoLocalNonPersistent() throws Exception
   {
      clusteredTopicNoLocal(false);
   }

   public void testClusteredTopicNonDurableNoLocalPersistent() throws Exception
   {
     clusteredTopicNoLocal(true);
   }

   public void testClusteredTopicNonDurableWithSelectorsNonPersistent() throws Exception
   {
      clusteredTopicNonDurableWithSelectors(false);
   }

   public void testClusteredTopicNonDurableWithSelectorsPersistent() throws Exception
   {
      clusteredTopicNonDurableWithSelectors(true);
   }

   public void testClusteredTopicDurableNonPersistent() throws Exception
   {
      clusteredTopicDurable(false);
   }

   public void testClusteredTopicDurablePersistent() throws Exception
   {
      clusteredTopicDurable(true);
   }

   public void testClusteredTopicSharedDurableLocalConsumerNonPersistent() throws Exception
   {
      clusteredTopicSharedDurableLocalConsumer(false);
   }

   public void testClusteredTopicSharedDurableLocalConsumerPersistent() throws Exception
   {
      clusteredTopicSharedDurableLocalConsumer(true);
   }

   public void testClusteredTopicSharedDurableNoLocalSubNonPersistent() throws Exception
   {
      clusteredTopicSharedDurableNoLocalSub(false);
   }

   public void testClusteredTopicSharedDurableNoLocalSubPersistent() throws Exception
   {
      clusteredTopicSharedDurableNoLocalSub(true);
   }

   public void testNonClusteredTopicNonDurableNonPersistent() throws Exception
   {
     nonClusteredTopicNonDurable(false);
   }

   public void testNonClusteredTopicNonDurablePersistent() throws Exception
   {
     nonClusteredTopicNonDurable(true);
   }

   public void testNonClusteredTopicDurableNonPersistent() throws Exception
   {
     nonClusteredTopicDurable(false);
   }

   public void testNonClusteredTopicDurablePersistent() throws Exception
   {
     nonClusteredTopicDurable(true);
   }

   public void testFloodSubscriptions() throws Exception
   {
      Connection conn0 = this.createConnectionOnServer(cf, 0);
      Connection conn1 = this.createConnectionOnServer(cf, 1);
      Connection conn2 = this.createConnectionOnServer(cf, 2);

      try
      {
         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});

         conn0.setClientID("c1");
         conn1.setClientID("c1");
         conn2.setClientID("c1");

         Session session [] = new Session[3];
         session[0] = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
         session[1] = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         session[2] = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

         conn0.start();
         conn1.start();
         conn2.start();

         ArrayList<MessageConsumer> consumersArray[] = new ArrayList[3];


         MessageProducer prod = session[2].createProducer(topic[0]);
         prod.setDeliveryMode(DeliveryMode.PERSISTENT);

         int NUMBER_OF_MESSAGES = 10;
         int NUMBER_OF_SUBSCRIPTIONS = 50;

         int sessId=0;
         for (Session sess : session)
         {
            consumersArray[sessId] = new ArrayList<MessageConsumer>();
            for (int i=0;i<NUMBER_OF_SUBSCRIPTIONS;i++)
            {
               MessageConsumer consumer = sess.createDurableSubscriber(topic[sessId], "sess_" + sessId + "_" + i);
               consumersArray[sessId].add(consumer);
            }

            sessId++;
         }


         for (int i=0;i<NUMBER_OF_MESSAGES;i++)
         {
            log.info("Sending message " + i);
            prod.send(session[0].createTextMessage("test" + i));
         }


         assertEquals(NUMBER_OF_SUBSCRIPTIONS * 3, getNoSubscriptions(topic[0]));

         int messageRead = 0;

         for (ArrayList<MessageConsumer> consumers: consumersArray)
         {
            for (MessageConsumer consumer: consumers)
            {
               TextMessage msg = null;
               for (int i=0;i<NUMBER_OF_MESSAGES;i++)
               {
                  msg = (TextMessage)consumer.receive(5000);
                  assertNotNull(msg);
                  log.info("Msg:" + msg + " text - " + msg.getText());
                  assertEquals("test" + i, msg.getText());
                  messageRead ++;
               }
            }
         }

         assertEquals(NUMBER_OF_SUBSCRIPTIONS * NUMBER_OF_MESSAGES * 3, messageRead);

         MessageProducer prod1 = session[1].createProducer(topic[0]);

         for (ArrayList<MessageConsumer> consumers: consumersArray)
         {
            for (MessageConsumer consumer: consumers)
            {
               consumer.close();
            }
         }

         for (sessId = 0; sessId < 3; sessId++)
         {
            for (int i=0;i<NUMBER_OF_SUBSCRIPTIONS;i++)
            {
               session[sessId].unsubscribe("sess_" + sessId + "_" + i);
            }
         }

         checkNoSubscriptions(topic[0]);

      }
      finally
      {
         try { if (conn0 != null) conn0.close(); } catch (Exception ignored){}
         try { if (conn1 != null) conn1.close(); } catch (Exception ignored){}
         try { if (conn2 != null) conn2.close(); } catch (Exception ignored){}
      }
   }

   // http://jira.jboss.org/jira/browse/JBMESSAGING-1216
   public void testDestinationTypeOnMessage() throws Exception
   {
      Connection connection0 = null;
      Connection connection1 = null;
      Connection connection2 = null;

      Session session0 = null;
      Session session1 = null;
      Session session2 = null;
     
      Session sessionProducer = null;

      try
      {
         connection0 = createConnectionOnServer(cf, 0);
         connection1 = createConnectionOnServer(cf, 1);
         connection2 = createConnectionOnServer(cf, 1);

         connection0.setClientID("aCustomer");
         connection1.setClientID("aCustomer");
         connection2.setClientID("aCustomer");


         session0 = connection0.createSession(true,
               Session.AUTO_ACKNOWLEDGE);
        
         sessionProducer = connection0.createSession(true, Session.AUTO_ACKNOWLEDGE);

         MessageConsumer subscriber0 = session0.createDurableSubscriber(
               topic[0], "sub");
         TestListener messageListener0 = new TestListener(0);
         subscriber0.setMessageListener(messageListener0);

         MessageProducer publisher = sessionProducer.createProducer(topic[0]);
         publisher.setDeliveryMode(DeliveryMode.PERSISTENT);

         session1 = connection1.createSession(true,
               Session.AUTO_ACKNOWLEDGE);

         MessageConsumer subscriber1 = session1.createDurableSubscriber(
               topic[0], "sub2");


         TestListener messageListener1 = new TestListener(1);
         subscriber1.setMessageListener(messageListener1);

         session2 = connection2.createSession(true, Session.AUTO_ACKNOWLEDGE);
         MessageConsumer subscriber2 = session2.createDurableSubscriber(topic[0], "sub3");
         TestListener messageListener2 = new TestListener(2);
         subscriber2.setMessageListener(messageListener2);


         connection0.start();
         connection1.start();
         connection2.start();


         TextMessage message = session0.createTextMessage("Hello!");
         log.info("Sending message");
         publisher.send(message);
         log.info("message sent " + message);
         sessionProducer.commit();

         Message m0 = messageListener0.waitForMessage();

         message = (TextMessage) m0;
         assertTrue(m0.getJMSDestination() instanceof Topic);

         Topic topicDest = (Topic)m0.getJMSDestination();
         assertEquals(topic[0].getTopicName(), topicDest.getTopicName());
         assertEquals("Hello!", message.getText());

         Message m1 = messageListener1.waitForMessage();
         message = (TextMessage) m1;

         assertTrue(m1.getJMSDestination() instanceof Topic);
         topicDest = (Topic)m1.getJMSDestination();
         log.info("******** topicName = " + topicDest.getTopicName());
         assertEquals(topic[1].getTopicName(), topicDest.getTopicName());

         assertEquals("Hello!", message.getText());


         Message m2 = messageListener2.waitForMessage();
         message = (TextMessage) m2;
         Destination d2 = m2.getJMSDestination();
         assertTrue(d2 instanceof Topic);
         assertEquals(topic[0].getTopicName(),((Topic)d2).getTopicName());
        
         log.info("Going to commit 0");
         session0.commit();
         session1.commit();
         session2.commit();

         subscriber0.close();
         subscriber1.close();
         subscriber2.close();
        
         assertEquals(1, messageListener0.received);
         assertEquals(1, messageListener1.received);
         assertEquals(1, messageListener2.received);

         session0.unsubscribe("sub");
         session1.unsubscribe("sub2");
         session2.unsubscribe("sub3");


      } finally
      {
         try
         {
            if (connection0 != null)
            {
               connection0.close();
            }
         } catch (JMSException e)
         {
            throw e;
         }

         try
         {
            if (connection1 != null)
            {
               connection1.close();
            }
         } catch (JMSException e)
         {
            throw e;
         }

         try
         {
            if (connection2 != null)
            {
               connection2.close();
            }
         } catch (JMSException e)
         {
            throw e;
         }

      }
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   protected void setUp() throws Exception
   {
      nodeCount = 3;

      super.setUp();

      ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 300000, 3000, 3000, 0, false);

      ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 300000, 3000, 3000, 1, false);

      ServerManagement.deployTopic("nonClusteredTopic", "nonClusteredTopic", 300000, 3000, 3000, 2, false);
   }

   // Private -------------------------------------------------------

   /*
    * Create non durable subscriptions on all nodes of the cluster.
    * Ensure all messages are receive as appropriate
    */
   private void clusteredTopicNonDurable(boolean persistent) throws Exception
   {
      Connection conn0 = null;
      Connection conn1 = null;
      Connection conn2 = null;
      try
      {
         //This will create 3 different connection on 3 different nodes, since
         //the cf is clustered
         conn0 = this.createConnectionOnServer(cf, 0);
         conn1 = this.createConnectionOnServer(cf, 1);
         conn2 = this.createConnectionOnServer(cf, 2);

         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});

         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageConsumer cons0 = sess0.createConsumer(topic[0]);
         MessageConsumer cons1 = sess1.createConsumer(topic[1]);
         MessageConsumer cons2 = sess2.createConsumer(topic[2]);
         MessageConsumer cons3 = sess0.createConsumer(topic[0]);
         MessageConsumer cons4 = sess1.createConsumer(topic[1]);

         conn0.start();
         conn1.start();
         conn2.start();

         // Send at node 0

         MessageProducer prod = sess0.createProducer(topic[0]);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         final int NUM_MESSAGES = 100;

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess0.createTextMessage("message" + i);

            prod.send(tm);
         }

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons0.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         Message msg = cons0.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons1.receive(3000);

            assertNotNull(tm);


            assertEquals("message" + i, tm.getText());
         }

         msg = cons1.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons2.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         msg = cons2.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons3.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         msg = cons3.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons4.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         msg = cons4.receive(3000);

         assertNull(msg);
      }
      finally
      {
         if (conn0 != null)
         {
            conn0.close();
         }

         if (conn1 != null)
         {
            conn1.close();
         }

         if (conn2 != null)
         {
            conn2.close();
         }
      }
   }


   /* Test with noLocal set to true */
   private void clusteredTopicNoLocal(boolean persistent) throws Exception
   {
      Connection conn0 = null;
      Connection conn1 = null;
      Connection conn2 = null;
      try
      {
         //This will create 3 different connection on 3 different nodes, since
         //the cf is clustered
         conn0 = this.createConnectionOnServer(cf, 0);
         conn1 = this.createConnectionOnServer(cf, 1);
         conn2 = this.createConnectionOnServer(cf, 2);

         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});

         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageConsumer cons0 = sess0.createConsumer(topic[0], null, true);
         MessageConsumer cons1 = sess1.createConsumer(topic[1], null, true);
         MessageConsumer cons2 = sess2.createConsumer(topic[2], null, true);

         conn0.start();
         conn1.start();
         conn2.start();

         // Send at node 0

         MessageProducer prod = sess0.createProducer(topic[0]);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         final int NUM_MESSAGES = 100;

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess0.createTextMessage("message" + i);

            prod.send(tm);
         }

         Message msg = cons0.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons1.receive(3000);

            assertNotNull(tm);


            assertEquals("message" + i, tm.getText());
         }

         msg = cons1.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons2.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         msg = cons2.receive(3000);

         assertNull(msg);
      }
      finally
      {
         if (conn0 != null)
         {
            conn0.close();
         }

         if (conn1 != null)
         {
            conn1.close();
         }

         if (conn2 != null)
         {
            conn2.close();
         }
      }
   }

   /*
    * Create non durable subscriptions on all nodes of the cluster.
    * Include some with selectors
    * Ensure all messages are receive as appropriate
    */
   private void clusteredTopicNonDurableWithSelectors(boolean persistent) throws Exception
   {
      Connection conn0 = null;
      Connection conn1 = null;
      Connection conn2 = null;

      try
      {
         //This will create 3 different connection on 3 different nodes, since
         //the cf is clustered
        conn0 = this.createConnectionOnServer(cf, 0);
         conn1 = this.createConnectionOnServer(cf, 1);
         conn2 = this.createConnectionOnServer(cf, 2);

         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});

         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

         MessageConsumer cons0 = sess0.createConsumer(topic[0]);
         MessageConsumer cons1 = sess1.createConsumer(topic[1]);
         MessageConsumer cons2 = sess2.createConsumer(topic[2]);
         MessageConsumer cons3 = sess0.createConsumer(topic[0], "COLOUR='red'");
         MessageConsumer cons4 = sess1.createConsumer(topic[1], "COLOUR='blue'");

         conn0.start();
         conn1.start();
         conn2.start();

         // Send at node 0

         MessageProducer prod = sess0.createProducer(topic[0]);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         final int NUM_MESSAGES = 100;

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess0.createTextMessage("message" + i);

            int c = i % 3;
            if (c == 0)
            {
               tm.setStringProperty("COLOUR", "red");
            }
            else if (c == 1)
            {
               tm.setStringProperty("COLOUR", "blue");
            }

            prod.send(tm);
         }

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons0.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         Message msg = cons0.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons1.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         msg = cons1.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons2.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         msg = cons2.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            int c = i % 3;

            if (c == 0)
            {
               TextMessage tm = (TextMessage)cons3.receive(3000);

               assertNotNull(tm);

               assertEquals("message" + i, tm.getText());
            }
         }

         msg = cons3.receive(3000);

         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            int c = i % 3;

            if (c == 1)
            {
               TextMessage tm = (TextMessage)cons4.receive(3000);

               assertNotNull(tm);

               assertEquals("message" + i, tm.getText());
            }
         }

         msg = cons4.receive(3000);

         assertNull(msg);
      }
      finally
      {
         if (conn0 != null)
         {
            conn0.close();
         }

         if (conn1 != null)
         {
            conn1.close();
         }

         if (conn2 != null)
         {
            conn2.close();
         }
      }
   }

   private void clusteredTopicDurable(boolean persistent) throws Exception
   {
      Connection conn0 = null;
      Connection conn1 = null;
      Connection conn2 = null;

      try
      {
         // This will create 3 different connection on 3 different nodes, since the cf is clustered
        conn0 = this.createConnectionOnServer(cf, 0);
         conn1 = this.createConnectionOnServer(cf, 1);
         conn2 = this.createConnectionOnServer(cf, 2);

         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});

         conn0.setClientID("wib1");
         conn1.setClientID("wib1");
         conn2.setClientID("wib1");

         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

         try
         {
            sess0.unsubscribe("alpha");
         }
         catch (Exception ignore) {}
         try
         {
            sess1.unsubscribe("beta");
         }
         catch (Exception ignore) {}
         try
         {
            sess2.unsubscribe("gamma");
         }
         catch (Exception ignore) {}
         try
         {
            sess0.unsubscribe("delta");
         }
         catch (Exception ignore) {}
         try
         {
            sess1.unsubscribe("epsilon");
         }
         catch (Exception ignore) {}

         MessageConsumer alpha = sess0.createDurableSubscriber(topic[0], "alpha");

         MessageConsumer beta = sess1.createDurableSubscriber(topic[1], "beta");

         MessageConsumer gamma = sess2.createDurableSubscriber(topic[2], "gamma");

         MessageConsumer delta = sess0.createDurableSubscriber(topic[0], "delta");

         MessageConsumer epsilon = sess1.createDurableSubscriber(topic[1], "epsilon");

         Thread.sleep(2000);

         conn0.start();
         conn1.start();
         conn2.start();

         // Send at node 0 - and make sure the messages are consumable from all the durable subs

         MessageProducer prod = sess0.createProducer(topic[0]);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         final int NUM_MESSAGES = 100;

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            prod.send(sess0.createTextMessage("message" + i));
         }

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)alpha.receive(3000);
            assertNotNull(tm);
            assertEquals("message" + i, tm.getText());
         }

         Message msg = alpha.receive(3000);
         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)beta.receive(3000);
            assertNotNull(tm);
            assertEquals("message" + i, tm.getText());
         }

         msg = beta.receive(3000);
         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)gamma.receive(3000);
            assertNotNull(tm);
            assertEquals("message" + i, tm.getText());
         }

         msg = gamma.receive(3000);
         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)delta.receive(3000);
            assertNotNull(tm);
            assertEquals("message" + i, tm.getText());
         }

         msg = delta.receive(3000);
         assertNull(msg);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)epsilon.receive(3000);
            assertNotNull(tm);
            assertEquals("message" + i, tm.getText());
         }

         msg = epsilon.receive(3000);
         assertNull(msg);

         //close beta
         beta.close();

         // Create another beta - this one node 0
         MessageConsumer beta0 = sess0.createDurableSubscriber(topic[0], "beta");

         //And one node node1
         MessageConsumer beta1 = sess1.createDurableSubscriber(topic[1], "beta");

         //Now send some more messages at node 2

         MessageProducer prod2 = sess2.createProducer(topic[2]);

         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            prod2.send(sess1.createTextMessage("message2-" + i));
         }

         //They should be round - robined - but we don't know which one will get them first

         int offset = 0;

         for (int i = 0; i < NUM_MESSAGES / 2; i++)
         {
            TextMessage tm = (TextMessage)beta0.receive(3000);
            assertNotNull(tm);

            if (tm.getText().substring("message2-".length()).equals("1"))
            {
              offset = 1;
            }

            assertEquals("message2-" + (i * 2 + offset), tm.getText());
         }

         msg = beta0.receive(3000);
         assertNull(msg);

         if (offset == 1)
         {
           offset = 0;
         }
         else
         {
           offset = 1;
         }

         for (int i = 0; i < NUM_MESSAGES / 2; i++)
         {
            TextMessage tm = (TextMessage)beta1.receive(3000);
            assertNotNull(tm);
            assertEquals("message2-" + (i * 2 + offset), tm.getText());
         }

         msg = beta1.receive(3000);
         assertNull(msg);

         //Send some more at node 0

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            prod.send(sess1.createTextMessage("message3-" + i));
         }

         //This should go straight to the local queue

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)beta0.receive(3000);
            assertNotNull(tm);
            assertEquals("message3-" + i, tm.getText());
         }

         msg = beta0.receive(3000);
         assertNull(msg);

         //So now we have a beta on node 1 and a beta on node 0 and the messages are on node2

         beta0.close();
         beta1.close();

         alpha.close();
         beta.close();
         gamma.close();
         delta.close();
         epsilon.close();

         sess0.unsubscribe("alpha");
         sess1.unsubscribe("beta");
         sess2.unsubscribe("gamma");
         sess0.unsubscribe("delta");
         sess1.unsubscribe("epsilon");

      }
      finally
      {
         if (conn0 != null)
         {
            conn0.close();
         }

         if (conn1 != null)
         {
            conn1.close();
         }

         if (conn2 != null)
         {
            conn2.close();
         }
      }
   }


   /*
    * Create shared durable subs on multiple nodes, the local instance should always get the message
    */
   private void clusteredTopicSharedDurableLocalConsumer(boolean persistent) throws Exception
   {
      Connection conn1 = null;
      Connection conn2 = null;
      Connection conn3 = null;
      try

      {
         //This will create 3 different connection on 3 different nodes, since
         //the cf is clustered
        conn1 = this.createConnectionOnServer(cf, 0);
         conn2 = this.createConnectionOnServer(cf, 1);
         conn3 = this.createConnectionOnServer(cf, 2);

         checkConnectionsDifferentServers(new Connection[] {conn1, conn2, conn3});
         conn1.setClientID("wib1");
         conn2.setClientID("wib1");
         conn3.setClientID("wib1");

         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);

         try
         {
            sess1.unsubscribe("sub");
         }
         catch (Exception ignore) {}
         try
         {
            sess2.unsubscribe("sub");
         }
         catch (Exception ignore) {}
         try
         {
            sess3.unsubscribe("sub");
         }
         catch (Exception ignore) {}

         MessageConsumer cons1 = sess1.createDurableSubscriber(topic[0], "sub");

         MessageConsumer cons2 = sess2.createDurableSubscriber(topic[1], "sub");

         MessageConsumer cons3 = sess3.createDurableSubscriber(topic[2], "sub");

         conn1.start();
         conn2.start();
         conn3.start();

         // Send at node 0

         MessageProducer prod = sess1.createProducer(topic[0]);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         final int NUM_MESSAGES = 100;

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess1.createTextMessage("message" + i);

            prod.send(tm);
         }

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons1.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         Message m = cons2.receive(3000);

         assertNull(m);

         m = cons3.receive(3000);

         assertNull(m);

         // Send at node 1

         MessageProducer prod1 = sess2.createProducer(topic[1]);

         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess3.createTextMessage("message" + i);

            prod1.send(tm);
         }

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons2.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         m = cons1.receive(3000);

         assertNull(m);

         m = cons3.receive(3000);

         assertNull(m);

         // Send at node 2

         MessageProducer prod2 = sess3.createProducer(topic[2]);

         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess3.createTextMessage("message" + i);

            prod2.send(tm);
         }

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons3.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         m = cons1.receive(3000);

         assertNull(m);

         m = cons2.receive(3000);

         assertNull(m);

         cons1.close();
         cons2.close();

         //Try and unsubscribe now - this should fail since there is still a consumer open on another node

         try
         {
           sess1.unsubscribe("sub");

           fail("Did not throw IllegalStateException");
         }
         catch (javax.jms.IllegalStateException e)
         {
           //Ok
         }

         cons3.close();

         // Need to unsubscribe on any node that the durable sub was created on

         sess1.unsubscribe("sub");

         //Next unsubscribe should fail since it's already unsubscribed from a different node of the cluster
         try
         {
           sess2.unsubscribe("sub");

           fail("Did not throw InvalidDestinationException");
         }
         catch (InvalidDestinationException e)
         {
           //Ok
         }

         try
         {
           sess3.unsubscribe("sub");

           fail("Did not throw InvalidDestinationException");
         }
         catch (InvalidDestinationException e)
         {
           //Ok
         }
      }
      finally
      {
         if (conn1 != null)
         {
            conn1.close();
         }

         if (conn2 != null)
         {
            conn2.close();
         }

         if (conn3 != null)
         {
            conn3.close();
         }
      }
   }



   /*
    * Create shared durable subs on multiple nodes, but without sub on local node
    * should round robin
    * note that this test assumes round robin
    */
   private void clusteredTopicSharedDurableNoLocalSub(boolean persistent) throws Exception
   {
      Connection conn1 = null;
      Connection conn2 = null;
      Connection conn3 = null;

      try
      {
         //This will create 3 different connection on 3 different nodes, since
         //the cf is clustered
        conn1 = this.createConnectionOnServer(cf, 0);
         conn2 = this.createConnectionOnServer(cf, 1);
         conn3 = this.createConnectionOnServer(cf, 2);

         checkConnectionsDifferentServers(new Connection[] {conn1, conn2, conn3});

         conn2.setClientID("wib1");
         conn3.setClientID("wib1");

         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess3 = conn3.createSession(false, Session.AUTO_ACKNOWLEDGE);

         try
         {
            sess2.unsubscribe("sub");
         }
         catch (Exception ignore) {}

         MessageConsumer cons1 = sess2.createDurableSubscriber(topic[1], "sub");
         MessageConsumer cons2 = sess3.createDurableSubscriber(topic[2], "sub");

         conn2.start();
         conn3.start();

         // Send at node 0

         //Should round robin between the other 2 since there is no active consumer on sub  on node 0

         MessageProducer prod = sess1.createProducer(topic[0]);

         prod.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         final int NUM_MESSAGES = 100;

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess1.createTextMessage("message2-" + i);

            prod.send(tm);
         }


         int offset = 0;

         for (int i = 0; i < NUM_MESSAGES / 2; i++)
         {
            TextMessage tm = (TextMessage)cons1.receive(3000);
            assertNotNull(tm);

            if (tm.getText().substring("message2-".length()).equals("1"))
            {
              offset = 1;
            }

            assertEquals("message2-" + (i * 2 + offset), tm.getText());
         }

         Message msg = cons1.receive(3000);
         assertNull(msg);

         if (offset == 1)
         {
           offset = 0;
         }
         else
         {
           offset = 1;
         }

         for (int i = 0; i < NUM_MESSAGES / 2; i++)
         {
            TextMessage tm = (TextMessage)cons2.receive(3000);
            assertNotNull(tm);
            assertEquals("message2-" + (i * 2 + offset), tm.getText());
         }

         msg = cons2.receive(3000);
         assertNull(msg);

         cons1.close();
         cons2.close();

         sess2.unsubscribe("sub");

         try
         {
           sess3.unsubscribe("sub");
           fail("Should already be unsubscribed");
         }
         catch (InvalidDestinationException e)
         {
           //Ok - the previous unsubscribe should do a cluster wide unsubscribe
         }

      }
      finally
      {
         if (conn1 != null)
         {
            conn1.close();
         }

         if (conn2 != null)
         {
            conn2.close();
         }

         if (conn3 != null)
         {
            conn3.close();
         }
      }
   }


   private void nonClusteredTopicNonDurable(boolean persistent) throws Exception
   {
      Connection conn0 = null;
      Connection conn1 = null;
      Connection conn2 = null;

     // Deploy three non clustered topics with same name on different nodes

      try
      {
         Topic topic0 = (Topic)ic[0].lookup("/nonClusteredTopic");
         Topic topic1 = (Topic)ic[1].lookup("/nonClusteredTopic");
         Topic topic2 = (Topic)ic[2].lookup("/nonClusteredTopic");

         //This will create 3 different connection on 3 different nodes, since
         //the cf is clustered
         conn0 = this.createConnectionOnServer(cf, 0);
         conn1 = this.createConnectionOnServer(cf, 1);
         conn2 = this.createConnectionOnServer(cf, 2);

         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});

         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

         conn0.start();
         conn1.start();
         conn2.start();

         MessageConsumer cons0 = sess0.createConsumer(topic0);
         MessageConsumer cons1 = sess1.createConsumer(topic1);
         MessageConsumer cons2 = sess2.createConsumer(topic2);

         // ==============
         // Send at node 0

         MessageProducer prod0 = sess0.createProducer(topic0);

         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         final int NUM_MESSAGES = 100;

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess0.createTextMessage("message" + i);

            prod0.send(tm);
         }

         // Try and consume at node 1

         Message m = cons1.receive(3000);

         assertNull(m);

         //And at node 2

         m = cons2.receive(3000);

         assertNull(m);

         // Now consume at node 0

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons0.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         m = cons0.receive(3000);

         assertNull(m);

         // ==============
         // Send at node 1

         MessageProducer prod1 = sess1.createProducer(topic1);

         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess1.createTextMessage("message" + i);

            prod1.send(tm);
         }

         // Try and consume at node 0

         m = cons0.receive(3000);

         assertNull(m);

         //And at node 2

         m = cons2.receive(3000);

         assertNull(m);

         // Now consume at node 1

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons1.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         m = cons1.receive(3000);

         assertNull(m);

         // ==============
         // Send at node 2

         MessageProducer prod2 = sess2.createProducer(topic2);

         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess2.createTextMessage("message" + i);

            prod2.send(tm);
         }

         // Try and consume at node 0

         m = cons0.receive(3000);

         assertNull(m);

         //And at node 1

         m = cons1.receive(3000);

         assertNull(m);

         // Now consume at node 2

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons2.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         m = cons2.receive(3000);

         assertNull(m);
      }
      finally
      {
         if (conn0 != null)
         {
            conn0.close();
         }

         if (conn1 != null)
         {
            conn1.close();
         }

         if (conn2 != null)
         {
            conn2.close();
         }

         ServerManagement.undeployQueue("nonClusteredTopic", 0);

         ServerManagement.undeployQueue("nonClusteredTopic", 1);

         ServerManagement.undeployQueue("nonClusteredTopic", 2);
      }
   }


   private void nonClusteredTopicDurable(boolean persistent) throws Exception
   {
      Connection conn0 = null;
      Connection conn1 = null;
      Connection conn2 = null;

     // Deploy three non clustered topics with same name on different nodes

      try
      {

         Topic topic0 = (Topic)ic[0].lookup("/nonClusteredTopic");
         Topic topic1 = (Topic)ic[1].lookup("/nonClusteredTopic");
         Topic topic2 = (Topic)ic[2].lookup("/nonClusteredTopic");

         //This will create 3 different connection on 3 different nodes, since
         //the cf is clustered
         conn0 = this.createConnectionOnServer(cf, 0);
         conn1 = this.createConnectionOnServer(cf, 1);
         conn2 = this.createConnectionOnServer(cf, 2);

         conn0.setClientID("cl123");
         conn1.setClientID("cl123");
         conn2.setClientID("cl123");

         checkConnectionsDifferentServers(new Connection[] {conn0, conn1, conn2});

         Session sess0 = conn0.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
         Session sess2 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);

         conn0.start();
         conn1.start();
         conn2.start();

         MessageConsumer cons0 = sess0.createDurableSubscriber(topic0, "mysub1");
         MessageConsumer cons1 = sess1.createDurableSubscriber(topic1, "mysub1");
         MessageConsumer cons2 = sess2.createDurableSubscriber(topic2, "mysub1");
         cons0.close();
         cons1.close();
         cons2.close();

         // ==============
         // Send at node 0

         MessageProducer prod0 = sess0.createProducer(topic0);

         prod0.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         final int NUM_MESSAGES = 100;

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess0.createTextMessage("message" + i);

            prod0.send(tm);
         }

         // Try and consume at node 1

         cons1 = sess1.createDurableSubscriber(topic1, "mysub1");

         Message m = cons1.receive(3000);

         assertNull(m);

         cons1.close();

         //And at node 2

         cons2 = sess2.createDurableSubscriber(topic2, "mysub1");

         m = cons2.receive(3000);

         assertNull(m);

         cons2.close();

         // Now consume at node 0

         cons0 = sess0.createDurableSubscriber(topic0, "mysub1");

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons0.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         m = cons0.receive(3000);

         assertNull(m);

         cons0.close();

         // ==============
         // Send at node 1

         MessageProducer prod1 = sess1.createProducer(topic1);

         prod1.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess1.createTextMessage("message" + i);

            prod1.send(tm);
         }

         cons0 = sess0.createDurableSubscriber(topic0, "mysub1");

         // Try and consume at node 0

         m = cons0.receive(3000);

         assertNull(m);

         cons0.close();

         //And at node 2

         cons2 = sess2.createDurableSubscriber(topic2, "mysub1");

         m = cons2.receive(3000);

         assertNull(m);

         cons2.close();

         // Now consume at node 1

         cons1 = sess1.createDurableSubscriber(topic1, "mysub1");

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons1.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         m = cons1.receive(3000);

         assertNull(m);

         cons1.close();

         // ==============
         // Send at node 2

         MessageProducer prod2 = sess2.createProducer(topic2);

         prod2.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = sess2.createTextMessage("message" + i);

            prod2.send(tm);
         }

         cons0 = sess0.createDurableSubscriber(topic0, "mysub1");

         // Try and consume at node 0

         m = cons0.receive(3000);

         assertNull(m);

         cons0.close();

         sess0.unsubscribe("mysub1");

         //And at node 1

         cons1 = sess1.createDurableSubscriber(topic1, "mysub1");

         m = cons1.receive(3000);

         assertNull(m);

         cons1.close();

         sess1.unsubscribe("mysub1");

         // Now consume at node 2

         cons2 = sess2.createDurableSubscriber(topic2, "mysub1");

         for (int i = 0; i < NUM_MESSAGES; i++)
         {
            TextMessage tm = (TextMessage)cons2.receive(3000);

            assertNotNull(tm);

            assertEquals("message" + i, tm.getText());
         }

         m = cons2.receive(3000);

         assertNull(m);

         cons2.close();

         sess2.unsubscribe("mysub1");
      }
      finally
      {
         if (conn0 != null)
         {
            conn0.close();
         }

         if (conn1 != null)
         {
            conn1.close();
         }

         if (conn2 != null)
         {
            conn2.close();
         }

         ServerManagement.undeployQueue("nonClusteredTopic", 0);

         ServerManagement.undeployQueue("nonClusteredTopic", 1);

         ServerManagement.undeployQueue("nonClusteredTopic", 2);
      }
   }


   // Inner classes -------------------------------------------------

   public class TestListener implements MessageListener
   {
      // Constants -----------------------------------------------------

      // Static --------------------------------------------------------

      // Attributes ----------------------------------------------------
     
      int received;

      private Message message;
     
      private int id;

      // Constructors --------------------------------------------------

      public TestListener(int id)
      {
         this.id=id;
      }

      // MessageListener implementation --------------------------------

      public synchronized void onMessage(Message message)
      {
         log.info("TestListener ID(" + id + ") Received message " + message);
         this.message = message;
         received ++;
         notifyAll();
      }

      // Public --------------------------------------------------------

      public synchronized Message waitForMessage()
      {
         if (message != null)
         {
            return message;
         }
         else
         {

            try
            {
               wait(5000);
            }
            catch(InterruptedException e)
            {
               // OK
            }

            return message;
         }

      }

      // Package protected ---------------------------------------------

      // Protected -----------------------------------------------------

      // Private -------------------------------------------------------

      // Inner classes -------------------------------------------------

   }

}
TOP

Related Classes of org.jboss.test.messaging.jms.clustering.DistributedTopicTest$TestListener

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.