Package org.hornetq.tests.integration.cluster.failover

Source Code of org.hornetq.tests.integration.cluster.failover.ClusterWithBackupFailoverTestBase

/*
* JBoss, Home of Professional Open Source
* Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/

package org.hornetq.tests.integration.cluster.failover;

import java.util.Map;
import java.util.Set;

import org.hornetq.api.core.TransportConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.cluster.impl.ClusterManagerImpl;
import org.hornetq.spi.core.protocol.RemotingConnection;
import org.hornetq.tests.integration.cluster.distribution.ClusterTestBase;
import org.hornetq.tests.util.ServiceTestBase;

/**
*
* A ClusterWithBackupFailoverTest
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
*
* Created 9 Mar 2009 16:31:21
*
*
*/
public abstract class ClusterWithBackupFailoverTestBase extends ClusterTestBase
{
   private static final Logger log = Logger.getLogger(ClusterWithBackupFailoverTestBase.class);

   protected abstract void setupCluster(final boolean forwardWhenNoConsumers) throws Exception;

   protected abstract void setupServers() throws Exception;

   @Override
   protected void setUp() throws Exception
   {
      super.setUp();

      //FailoverManagerImpl.enableDebug();

      setupServers();
   }

   @Override
   protected void tearDown() throws Exception
   {
      stopServers();

     // FailoverManagerImpl.disableDebug();

      super.tearDown();
   }

   protected boolean isNetty()
   {
      return false;
   }

   public void testFailLiveNodes() throws Exception
   {
         setupCluster();

         startServers(3, 4, 5, 0, 1, 2);

         setupSessionFactory(0, 3, isNetty(), false);
         setupSessionFactory(1, 4, isNetty(), false);
         setupSessionFactory(2, 5, isNetty(), false);

         createQueue(0, "queues.testaddress", "queue0", null, true);
         createQueue(1, "queues.testaddress", "queue0", null, true);
         createQueue(2, "queues.testaddress", "queue0", null, true);

         addConsumer(0, 0, "queue0", null);
         addConsumer(1, 1, "queue0", null);
         addConsumer(2, 2, "queue0", null);

         waitForBindings(0, "queues.testaddress", 1, 1, true);
         waitForBindings(1, "queues.testaddress", 1, 1, true);
         waitForBindings(2, "queues.testaddress", 1, 1, true);

         waitForBindings(0, "queues.testaddress", 2, 2, false);
         waitForBindings(1, "queues.testaddress", 2, 2, false);
         waitForBindings(2, "queues.testaddress", 2, 2, false);

         send(0, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         send(1, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         send(2, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         failNode(0);

         // live nodes
         waitForBindings(1, "queues.testaddress", 1, 1, true);
         waitForBindings(2, "queues.testaddress", 1, 1, true);
         // activated backup nodes
         waitForBindings(3, "queues.testaddress", 1, 1, true);

         // live nodes
         waitForBindings(1, "queues.testaddress", 2, 2, false);
         waitForBindings(2, "queues.testaddress", 2, 2, false);
         // activated backup nodes
         waitForBindings(3, "queues.testaddress", 2, 2, false);

         ClusterWithBackupFailoverTestBase.log.info("** now sending");

         send(0, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         send(1, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         send(2, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         failNode(1);

         // live nodes
         waitForBindings(2, "queues.testaddress", 1, 1, true);
         // activated backup nodes
         waitForBindings(3, "queues.testaddress", 1, 1, true);
         waitForBindings(4, "queues.testaddress", 1, 1, true);

         // live nodes
         waitForBindings(2, "queues.testaddress", 2, 2, false);
         // activated backup nodes
         waitForBindings(3, "queues.testaddress", 2, 2, false);
         waitForBindings(4, "queues.testaddress", 2, 2, false);

         send(0, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         send(1, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         send(2, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         failNode(2);

         // activated backup nodes
         waitForBindings(3, "queues.testaddress", 1, 1, true);
         waitForBindings(4, "queues.testaddress", 1, 1, true);
         waitForBindings(5, "queues.testaddress", 1, 1, true);

         // activated backup nodes
         waitForBindings(3, "queues.testaddress", 2, 2, false);
         waitForBindings(4, "queues.testaddress", 2, 2, false);
         waitForBindings(5, "queues.testaddress", 2, 2, false);

         send(0, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         send(1, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         send(2, "queues.testaddress", 10, false, null);
         verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

         removeConsumer(0);
         removeConsumer(1);
         removeConsumer(2);

         stopServers();

         ClusterWithBackupFailoverTestBase.log.info("*** test done");
   }
  
   public void testFailBackupNodes() throws Exception
   {
      setupCluster();

      startServers(3, 4, 5, 0, 1, 2);

      setupSessionFactory(0, 3, isNetty(), false);
      setupSessionFactory(1, 4, isNetty(), false);
      setupSessionFactory(2, 5, isNetty(), false);

      createQueue(0, "queues.testaddress", "queue0", null, true);
      createQueue(1, "queues.testaddress", "queue0", null, true);
      createQueue(2, "queues.testaddress", "queue0", null, true);

      addConsumer(0, 0, "queue0", null);
      addConsumer(1, 1, "queue0", null);
      addConsumer(2, 2, "queue0", null);

      waitForBindings(0, "queues.testaddress", 1, 1, true);
      waitForBindings(1, "queues.testaddress", 1, 1, true);
      waitForBindings(2, "queues.testaddress", 1, 1, true);

      waitForBindings(0, "queues.testaddress", 2, 2, false);
      waitForBindings(1, "queues.testaddress", 2, 2, false);
      waitForBindings(2, "queues.testaddress", 2, 2, false);

      send(0, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(1, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      failNode(3);

      waitForBindings(0, "queues.testaddress", 1, 1, true);
      waitForBindings(1, "queues.testaddress", 1, 1, true);
      waitForBindings(2, "queues.testaddress", 1, 1, true);

      waitForBindings(0, "queues.testaddress", 2, 2, false);
      waitForBindings(1, "queues.testaddress", 2, 2, false);
      waitForBindings(2, "queues.testaddress", 2, 2, false);

      send(0, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(1, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      failNode(4);

      waitForBindings(0, "queues.testaddress", 1, 1, true);
      waitForBindings(1, "queues.testaddress", 1, 1, true);
      waitForBindings(2, "queues.testaddress", 1, 1, true);

      waitForBindings(0, "queues.testaddress", 2, 2, false);
      waitForBindings(1, "queues.testaddress", 2, 2, false);
      waitForBindings(2, "queues.testaddress", 2, 2, false);

      send(0, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(1, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      failNode(5);

      waitForBindings(0, "queues.testaddress", 1, 1, true);
      waitForBindings(1, "queues.testaddress", 1, 1, true);
      waitForBindings(2, "queues.testaddress", 1, 1, true);

      waitForBindings(0, "queues.testaddress", 2, 2, false);
      waitForBindings(1, "queues.testaddress", 2, 2, false);
      waitForBindings(2, "queues.testaddress", 2, 2, false);

      send(0, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(1, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

     

      removeConsumer(0);
      removeConsumer(1);
      removeConsumer(2);

      stopServers();

      ClusterWithBackupFailoverTestBase.log.info("*** test done");
   }

   protected void setupCluster() throws Exception
   {
      setupCluster(false);
   }

   protected void stopServers() throws Exception
   {

      closeAllServerLocatorsFactories();
      closeAllConsumers();

      closeAllSessionFactories();

      stopServers(0, 1, 2, 3, 4, 5);
   }

   protected void failNode(final int node) throws Exception
   {
      ClusterWithBackupFailoverTestBase.log.info("*** failing node " + node);

      Map<String, Object> params = generateParams(node, isNetty());

      TransportConfiguration serverTC;

      if (isNetty())
      {
         serverTC = new TransportConfiguration(ServiceTestBase.NETTY_CONNECTOR_FACTORY, params);
      }
      else
      {
         serverTC = new TransportConfiguration(ServiceTestBase.INVM_CONNECTOR_FACTORY, params);
      }

      HornetQServer server = getServer(node);

      // Prevent remoting service taking any more connections
      server.getRemotingService().freeze();

      if (server.getClusterManager() != null)
      {
         // Stop it broadcasting
         for (BroadcastGroup group : server.getClusterManager().getBroadcastGroups())
         {
            group.stop();
         }
      }
      Set<RemotingConnection> connections = server.getRemotingService().getConnections();
      for (RemotingConnection remotingConnection : connections)
      {
         remotingConnection.destroy();
         server.getRemotingService().removeConnection(remotingConnection.getID());
      }

      ClusterManagerImpl clusterManager = (ClusterManagerImpl) server.getClusterManager();
      clusterManager.clear();

      server.stop(true);
   }

   public void testFailAllNodes() throws Exception
   {
      setupCluster();

      startServers(3, 4, 5, 0, 1, 2);

      setupSessionFactory(0, 3, isNetty(), false);
      setupSessionFactory(1, 4, isNetty(), false);
      setupSessionFactory(2, 5, isNetty(), false);

      createQueue(0, "queues.testaddress", "queue0", null, true);
      createQueue(1, "queues.testaddress", "queue0", null, true);
      createQueue(2, "queues.testaddress", "queue0", null, true);

      addConsumer(0, 0, "queue0", null);
      addConsumer(1, 1, "queue0", null);
      addConsumer(2, 2, "queue0", null);

      waitForBindings(0, "queues.testaddress", 1, 1, true);
      waitForBindings(1, "queues.testaddress", 1, 1, true);
      waitForBindings(2, "queues.testaddress", 1, 1, true);

      waitForBindings(0, "queues.testaddress", 2, 2, false);
      waitForBindings(1, "queues.testaddress", 2, 2, false);
      waitForBindings(2, "queues.testaddress", 2, 2, false);

      send(0, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(1, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      failNode(0);

      // live nodes
      waitForBindings(1, "queues.testaddress", 1, 1, true);
      waitForBindings(2, "queues.testaddress", 1, 1, true);
      // activated backup nodes
      waitForBindings(3, "queues.testaddress", 1, 1, true);

      // live nodes
      waitForBindings(1, "queues.testaddress", 2, 2, false);
      waitForBindings(2, "queues.testaddress", 2, 2, false);
      // activated backup nodes
      waitForBindings(3, "queues.testaddress", 2, 2, false);

      ClusterWithBackupFailoverTestBase.log.info("** now sending");

      send(0, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(1, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 0, 1, 2);

      removeConsumer(0);
      failNode(3);

      // live nodes
      waitForBindings(1, "queues.testaddress", 1, 1, true);
      waitForBindings(2, "queues.testaddress", 1, 1, true);

      // live nodes
      waitForBindings(1, "queues.testaddress", 1, 1, false);
      waitForBindings(2, "queues.testaddress", 1, 1, false);

      send(1, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);

      failNode(1);

      // live nodes
      waitForBindings(2, "queues.testaddress", 1, 1, true);
      // activated backup nodes
      waitForBindings(4, "queues.testaddress", 1, 1, true);

      // live nodes
      waitForBindings(2, "queues.testaddress", 1, 1, false);
      // activated backup nodes
      waitForBindings(4, "queues.testaddress", 1, 1, false);

      send(1, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 1, 2);

      removeConsumer(1);
      failNode(4);

      // live nodes
      waitForBindings(2, "queues.testaddress", 1, 1, true);
      // live nodes
      waitForBindings(2, "queues.testaddress", 1, 0, false);
    
      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 2);

      failNode(2);

      // live nodes
      waitForBindings(5, "queues.testaddress", 1, 1, true);
      // live nodes
      waitForBindings(5, "queues.testaddress", 0, 0, false);

      send(2, "queues.testaddress", 10, false, null);
      verifyReceiveRoundRobinInSomeOrder(true, 10, 2);

      removeConsumer(2);
      failNode(5);

      stopServers();

      ClusterWithBackupFailoverTestBase.log.info("*** test done");
   }
}
TOP

Related Classes of org.hornetq.tests.integration.cluster.failover.ClusterWithBackupFailoverTestBase

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.