Package org.jboss.test.messaging.jms.clustering

Source Code of org.jboss.test.messaging.jms.clustering.ClusteringTestBase$SimpleFailoverListener

/*
   * JBoss, Home of Professional Open Source
   * Copyright 2005, JBoss Inc., and individual contributors as indicated
   * by the @authors tag. See the copyright.txt in the distribution for a
   * full listing of individual contributors.
   *
   * This is free software; you can redistribute it and/or modify it
   * under the terms of the GNU Lesser General Public License as
   * published by the Free Software Foundation; either version 2.1 of
   * the License, or (at your option) any later version.
   *
   * This software is distributed in the hope that it will be useful,
   * but WITHOUT ANY WARRANTY; without even the implied warranty of
   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
   * Lesser General Public License for more details.
   *
   * You should have received a copy of the GNU Lesser General Public
   * License along with this software; if not, write to the Free
   * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
   * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
   */

package org.jboss.test.messaging.jms.clustering;

import java.util.Iterator;
import java.util.Map;

import javax.jms.Connection;
import javax.jms.Queue;
import javax.jms.Topic;
import javax.naming.InitialContext;

import org.jboss.jms.client.FailoverEvent;
import org.jboss.jms.client.FailoverListener;
import org.jboss.jms.client.JBossConnection;
import org.jboss.jms.client.JBossConnectionFactory;
import org.jboss.jms.client.delegate.ClientClusteredConnectionFactoryDelegate;
import org.jboss.jms.client.delegate.DelegateSupport;
import org.jboss.jms.client.state.ConnectionState;
import org.jboss.jms.tx.ResourceManagerFactory;
import org.jboss.test.messaging.MessagingTestCase;
import org.jboss.test.messaging.tools.ServerManagement;
import org.jboss.test.messaging.tools.container.ServiceAttributeOverrides;

import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;

/**
* @author <a href="mailto:tim.fox@jboss.org">Tim Fox</a>
* @author <a href="mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
* @author <a href="mailto:ovidiu@feodorov.com">Ovidiu Feodorov</a>
* @version <tt>$Revision: 2928 $</tt>
* $Id: ClusteringTestBase.java 2928 2007-07-27 00:33:55Z timfox $
*/
public class ClusteringTestBase extends MessagingTestCase
{
   // Constants ------------------------------------------------------------------------------------

   // Static ---------------------------------------------------------------------------------------

   // Attributes -----------------------------------------------------------------------------------
 
   // Server configuration name handed to ServerManagement.start() by startDefaultServer().
   protected String config = "all+http";

   // Per-node JNDI contexts and the destinations looked up through them. setUp() sizes all three
   // arrays to nodeCount and indexes them by server number.
   protected static InitialContext[] ic;
   protected static Queue queue[];
   protected static Topic topic[];
  
   // No need to have multiple connection factories since a clustered connection factory will create
   // connections in a round robin fashion on different servers.

   // Looked up by setUp() from ic[0] under "/ClusteredConnectionFactory".
   protected static JBossConnectionFactory cf;
  
   // Number of servers setUp() makes sure are started (server numbers 0 to nodeCount - 1).
   protected int nodeCount = 4;
   // Attribute overrides handed to startDefaultServer() whenever setUp() starts a server.
   protected ServiceAttributeOverrides overrides;
  
   // The overrides recorded by setUp() the last time it detected that they had changed.
   protected static ServiceAttributeOverrides currentOverrides;
  
   // Constructors ---------------------------------------------------------------------------------

   /**
    * Creates the test case, passing the name straight through to the superclass constructor.
    *
    * @param name value forwarded unchanged to the superclass constructor
    */
   public ClusteringTestBase(String name)
   {
      super(name);
   }
  
   // Public ---------------------------------------------------------------------------------------

   // Package protected ----------------------------------------------------------------------------

   // Protected ------------------------------------------------------------------------------------

   protected int getFailoverNodeForNode(JBossConnectionFactory factory, int nodeID)
   {
     Integer l = (Integer)((ClientClusteredConnectionFactoryDelegate)(factory.getDelegate())).getFailoverMap().get(new Integer(nodeID));
    
      return l.intValue();
   }
  
   protected int getNodeThatFailsOverOnto(JBossConnectionFactory factory, int nodeID)
   {
     Map map = ((ClientClusteredConnectionFactoryDelegate)(factory.getDelegate())).getFailoverMap();
    
     Iterator iter = map.entrySet().iterator();
    
     while (iter.hasNext())
     {
       Map.Entry entry = (Map.Entry)iter.next();
      
       int val = ((Integer)entry.getValue()).intValue();
       int key = ((Integer)entry.getKey()).intValue();
      
       if (val == nodeID)
       {
         return key;
       }
     }
    
     throw new IllegalStateException("Cannot find node that fails over onto " + nodeID);
   }
  
   protected void setUp() throws Exception
   {
      super.setUp();           
            
      log.info("node count is " + nodeCount);
     
      boolean changed = false;
     
      if (ic != null && ic.length < nodeCount)
      {
        log.info("Node count has increased from " + ic.length + " to " + nodeCount);
        //Node count has increased
        InitialContext[] oldIc = ic;
        ic = new InitialContext[nodeCount];
        Queue[] oldQueue = queue;
        queue = new Queue[nodeCount];
        Topic[] oldTopic = topic;
        topic = new Topic[nodeCount];
        for (int i = 0; i < oldIc.length; i++)
        {
          ic[i] = oldIc[i];
          queue[i] = oldQueue[i];
          topic[i] = oldTopic[i];
        }
      }
      else if (ic != null && ic.length > nodeCount)
      {
        log.info("Node count has decreased from " + ic.length + " to " + nodeCount);
        //Node count has decreased
        InitialContext[] oldIc = ic;
        ic = new InitialContext[nodeCount];
        Queue[] oldQueue = queue;
        queue = new Queue[nodeCount];
        Topic[] oldTopic = topic;
        topic = new Topic[nodeCount];
        for (int i = 0; i < nodeCount; i++)
        {
          ic[i] = oldIc[i];
          queue[i] = oldQueue[i];
          topic[i] = oldTopic[i];
        }
       
        for (int i = nodeCount; i < oldIc.length; i++)
        {
          log.info("*** killing server");
          ServerManagement.kill(i);
        }
       
        changed = true;
      }
     
      if (overridesChanged(overrides, currentOverrides))
      {
        log.info("Overrides have changed so stopping and restarting all servers");
        currentOverrides = overrides;
       
        for (int i = 0; i < nodeCount; i++)
         {
          ServerManagement.stop(i);
         }
       
        changed = true;
      }
     
      for (int i = 0; i < nodeCount; i++)
      {
        if (!ServerManagement.isStarted(i))
        {
          log.info("Server " + i + " is not started - starting it");

            startDefaultServer(i, overrides, ic == null);
          
            if (ic == null)
            {
               ic = new InitialContext[nodeCount];
               queue = new Queue[nodeCount];
               topic = new Topic[nodeCount];
            }

           ic[i] = new InitialContext(ServerManagement.getJNDIEnvironment(i));
           
           queue[i] = (Queue)ic[i].lookup("queue/testDistributedQueue");
           topic[i] = (Topic)ic[i].lookup("topic/testDistributedTopic");
          
           changed = true;

        }
       
        checkEmpty(queue[i], i);
       
        // Check no subscriptions left lying around
             
        checkNoSubscriptions(topic[i], i)
       
        ServerManagement.getServer(i).resetAllSuckers();
      }
     
      if (changed)
      {
        //Wait a little while before starting the test to ensure the new view propagates
        //otherwise the view change might hit after the test has started
        Thread.sleep(10000);
      }
     
      if (ic != null)
      {
        cf = (JBossConnectionFactory)ic[0].lookup("/ClusteredConnectionFactory");
      }
   }

   /**
    * Starts the given server with this test's config and then deploys the distributed test queue
    * and topic on it.
    *
    * @param serverNumber number of the server to start and deploy the destinations on
    * @param attributes overrides passed through to ServerManagement.start
    * @param cleanDatabase flag passed through to ServerManagement.start
    * @throws Exception if starting the server or deploying a destination fails
    */
   protected void startDefaultServer(int serverNumber, ServiceAttributeOverrides attributes, boolean cleanDatabase)
      throws Exception
   {
      ServerManagement.start(serverNumber, config, attributes, cleanDatabase);

      // The destinations are deployed only after the start call has returned
      log.info("deploying queue on node " + serverNumber);
      ServerManagement.deployQueue("testDistributedQueue", serverNumber);
      ServerManagement.deployTopic("testDistributedTopic", serverNumber);
   }

   private boolean overridesChanged(ServiceAttributeOverrides sao1, ServiceAttributeOverrides sao2)
   {
     Map map1 = sao1 == null ? null : sao1.getMap();
    
     Map map2 = sao2 == null ? null : sao2.getMap();
    
     if (map1 == null && map2 == null)
     {
       return false;
     }
     if ((map1 == null && map2 != null) || (map2 == null && map1 != null))
     {
       return true;
     }
    
     if (map1.size() != map2.size())
     {
       return true;
     }
    
     Iterator iter = map1.entrySet().iterator();
     while (iter.hasNext())
     {
       Map.Entry entry = (Map.Entry)iter.next();
       Object otherVal = map2.get(entry.getKey());
       if (otherVal == null)
       {
         return true;
       }
       if (!otherVal.equals(entry.getValue()))
       {
         return true;
       }
     }    
     return false;
   }
  
   protected void tearDown() throws Exception
   {
      super.tearDown();
                
      if (ResourceManagerFactory.instance.size() != 0)
      {
        ResourceManagerFactory.instance.dump();
       
        fail("Connection(s) have been left open");
      }
   }

   protected String getLocatorURL(Connection conn)
   {
      return getConnectionState(conn).getRemotingConnection().
         getRemotingClient().getInvoker().getLocator().getLocatorURI();
   }

   protected String getObjectId(Connection conn)
   {
      return ((DelegateSupport) ((JBossConnection) conn).
         getDelegate()).getID();
   }

   protected ConnectionState getConnectionState(Connection conn)
   {
      return (ConnectionState) (((DelegateSupport) ((JBossConnection) conn).
         getDelegate()).getState());
   }
 
   protected void waitForFailoverComplete(int serverID, Connection conn1)
      throws Exception
   {

      assertEquals(serverID, ((JBossConnection)conn1).getServerID());

      // register a failover listener
      SimpleFailoverListener failoverListener = new SimpleFailoverListener();
      ((JBossConnection)conn1).registerFailoverListener(failoverListener);

      log.debug("killing node " + serverID + " ....");

      ServerManagement.kill(serverID);

      log.info("########");
      log.info("######## KILLED NODE " + serverID);
      log.info("########");

      // wait for the client-side failover to complete

      while (true)
      {
        FailoverEvent event = failoverListener.getEvent(30000);
        if (event != null && FailoverEvent.FAILOVER_COMPLETED == event.getType())
        {
          break;
        }
        if (event == null)
        {
          fail("Did not get expected FAILOVER_COMPLETED event");
        }
      }

      // failover complete
      log.info("failover completed");
   }



   /**
    * Lookup for the connection with the right serverID. I'm using this method to find the proper
    * serverId so I won't relay on loadBalancing policies on testcases.
    */
   protected Connection getConnection(Connection[] conn, int serverId) throws Exception
   {
      for(int i = 0; i < conn.length; i++)
      {
         ConnectionState state = (ConnectionState)(((DelegateSupport)((JBossConnection)conn[i]).
            getDelegate()).getState());

         if (state.getServerID() == serverId)
         {
            return conn[i];
         }
      }

      return null;
   }

   protected void checkConnectionsDifferentServers(Connection[] conn) throws Exception
   {
      int[] serverID = new int[conn.length];
      for(int i = 0; i < conn.length; i++)
      {
         ConnectionState state = (ConnectionState)(((DelegateSupport)((JBossConnection)conn[i]).
            getDelegate()).getState());
         serverID[i] = state.getServerID();
      }

      for(int i = 0; i < nodeCount; i++)
      {
         for(int j = 0; j < nodeCount; j++)
         {
            if (i == j)
            {
               continue;
            }

            if (serverID[i] == serverID[j])
            {
               fail("Connections " + i + " and " + j +
                  " are pointing to the same physical node (" + serverID[i] + ")");
            }
         }
      }
   }

   // Private --------------------------------------------------------------------------------------

   // Inner classes --------------------------------------------------------------------------------
  
   protected class SimpleFailoverListener implements FailoverListener
   {
      private LinkedQueue buffer;

      public SimpleFailoverListener()
      {
         buffer = new LinkedQueue();
      }

      public void failoverEventOccured(FailoverEvent event)
      {
         try
         {
            buffer.put(event);
         }
         catch(InterruptedException e)
         {
            throw new RuntimeException("Putting thread interrupted while trying to add event " +
               "to buffer", e);
         }
      }

      /**
       * Blocks until a FailoverEvent is available or timeout occurs, in which case returns null.
       */
      public FailoverEvent getEvent(long timeout) throws InterruptedException
      {
         return (FailoverEvent)buffer.poll(timeout);
      }
   }

}
TOP

Related Classes of org.jboss.test.messaging.jms.clustering.ClusteringTestBase$SimpleFailoverListener

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.