Package org.hornetq.tests.integration.cluster.reattach

Source Code of org.hornetq.tests.integration.cluster.reattach.MyHandler

/*
* Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*    http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.  See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.hornetq.tests.integration.cluster.reattach;

import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import junit.framework.Assert;
import junit.framework.TestSuite;

import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.api.core.client.MessageHandler;
import org.hornetq.core.client.impl.ClientSessionInternal;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.protocol.core.impl.RemotingConnectionImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.concurrent.LinkedBlockingDeque;

/**
* A OrderReattachTest
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
*
*/
public class OrderReattachTest extends ServiceTestBase
{

   // Disabled for now... under investigation (Clebert)
   public static TestSuite suite()
   {
      TestSuite suite = new TestSuite();

      return suite;
   }

   // Constants -----------------------------------------------------

   final SimpleString ADDRESS = new SimpleString("address");

   // Attributes ----------------------------------------------------
   private final Logger log = Logger.getLogger(this.getClass());

   private HornetQServer server;

   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------

   // Public --------------------------------------------------------

   public void testOrderOnSendInVM() throws Throwable
   {
      for (int i = 0; i < 500; i++)
      {
         log.info("#" + getName() + " # " + i);
         doTestOrderOnSend(false);
         tearDown();
         setUp();
      }
   }

   public void doTestOrderOnSend(final boolean isNetty) throws Throwable
   {
      server = createServer(false, isNetty);

      server.start();

      ClientSessionFactory sf = createFactory(isNetty);
      sf.setReconnectAttempts(-1);
      sf.setConfirmationWindowSize(100 * 1024 * 1024);
      sf.setBlockOnNonDurableSend(false);
      sf.setBlockOnAcknowledge(false);

      final ClientSession session = sf.createSession(false, true, true);

      final LinkedBlockingDeque<Boolean> failureQueue = new LinkedBlockingDeque<Boolean>();

      final CountDownLatch ready = new CountDownLatch(1);

     
      // this test will use a queue. Whenever the test wants a failure.. it can just send TRUE to failureQueue
      // This Thread will be reading the queue
      Thread failer = new Thread()
      {
         @Override
         public void run()
         {
            ready.countDown();
            while (true)
            {
               try
               {
                  Boolean poll = false;
                  try
                  {
                     poll = failureQueue.poll(60, TimeUnit.SECONDS);
                  }
                  catch (InterruptedException e)
                  {
                     e.printStackTrace();
                     break;
                  }

                  Thread.sleep(1);

                  final RemotingConnectionImpl conn = (RemotingConnectionImpl)((ClientSessionInternal)session).getConnection();

                  // True means... fail session
                  if (poll)
                  {
                     conn.fail(new HornetQException(HornetQException.NOT_CONNECTED, "poop"));
                  }
                  else
                  {
                     // false means... finish thread
                     break;
                  }
               }
               catch (Exception e)
               {
                  e.printStackTrace();
               }
            }
         }
      };

      failer.start();

      ready.await();

      try
      {
         doSend2(1, sf, failureQueue);
      }
      finally
      {
         try
         {
            session.close();
         }
         catch (Exception e)
         {
            e.printStackTrace();
         }

         try
         {
            sf.close();
         }
         catch (Exception e)
         {
            e.printStackTrace();
         }

         failureQueue.put(false);

         failer.join();
      }

   }

   public void doSend2(final int order, final ClientSessionFactory sf, final LinkedBlockingDeque<Boolean> failureQueue) throws Exception
   {
      long start = System.currentTimeMillis();

      ClientSession s = sf.createSession(false, false, false);

      final int numMessages = 500;

      final int numSessions = 100;

      Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
      Set<ClientSession> sessions = new HashSet<ClientSession>();

      for (int i = 0; i < numSessions; i++)
      {
         SimpleString subName = new SimpleString("sub" + i);

         // failureQueue.push(true);

         ClientSession sessConsume = sf.createSession(false, true, true);

         sessConsume.createQueue(ADDRESS, subName, null, false);

         ClientConsumer consumer = sessConsume.createConsumer(subName);

         consumers.add(consumer);

         sessions.add(sessConsume);
      }

      ClientSession sessSend = sf.createSession(false, true, true);

      ClientProducer producer = sessSend.createProducer(ADDRESS);

      for (int i = 0; i < numMessages; i++)
      {
         ClientMessage message = sessSend.createMessage(HornetQTextMessage.TYPE,
                                                        false,
                                                        0,
                                                        System.currentTimeMillis(),
                                                        (byte)1);

         if (i % 10 == 0)
         {
            // failureQueue.push(true);
         }
         message.putIntProperty(new SimpleString("count"), i);
         producer.send(message);
      }

      for (ClientSession session : sessions)
      {
         session.start();
      }

      class MyHandler implements MessageHandler
      {
         final CountDownLatch latch = new CountDownLatch(1);

         volatile int count;
        
         Exception failure;

         public void onMessage(final ClientMessage message)
         {
            if (count >= numMessages)
            {
               failure = new Exception("too many messages");
               latch.countDown();
            }
           
            if (message.getIntProperty("count") != count)
            {
               failure = new Exception("counter " + count + " was not as expected (" + message.getIntProperty("count") + ")");
               log.warn("Failure on receiving message ", failure);
               failure.printStackTrace();
               latch.countDown();
            }

            count++;

            if (count % 100 == 0)
            {
               failureQueue.push(true);
            }

            if (count == numMessages)
            {
               latch.countDown();
            }
         }
      }

      Set<MyHandler> handlers = new HashSet<MyHandler>();

      for (ClientConsumer consumer : consumers)
      {
         MyHandler handler = new MyHandler();

         consumer.setMessageHandler(handler);

         handlers.add(handler);
      }

      for (MyHandler handler : handlers)
      {
         boolean ok = handler.latch.await(60000, TimeUnit.MILLISECONDS);

         Assert.assertTrue(ok);
        
         if (handler.failure != null)
         {
            throw handler.failure;
         }
      }

      // failureQueue.push(true);

      sessSend.close();

      for (ClientSession session : sessions)
      {
         // failureQueue.push(true);
         session.close();
      }

      for (int i = 0; i < numSessions; i++)
      {

         failureQueue.push(true);

         SimpleString subName = new SimpleString("sub" + i);

         s.deleteQueue(subName);
      }

      s.close();

      long end = System.currentTimeMillis();

   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   @Override
   protected void setUp() throws Exception
   {
      super.setUp();
   }

   @Override
   protected void tearDown() throws Exception
   {
      if (server != null && server.isStarted())
      {
         server.stop();
      }

      super.tearDown();
   }

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------

}
TOP

Related Classes of org.hornetq.tests.integration.cluster.reattach.MyHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.