Package org.hornetq.tests.stress.journal

Source Code of org.hornetq.tests.stress.journal.FastConsumer

/*
* Copyright 2009 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*    http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.  See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.hornetq.tests.stress.journal;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

import junit.framework.Assert;

import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.client.ClientConsumer;
import org.hornetq.api.core.client.ClientMessage;
import org.hornetq.api.core.client.ClientProducer;
import org.hornetq.api.core.client.ClientSession;
import org.hornetq.api.core.client.ClientSessionFactory;
import org.hornetq.core.asyncio.impl.AsynchronousFileImpl;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.JournalType;
import org.hornetq.tests.util.ServiceTestBase;

/**
* A CompactingTest
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
*
*/
public class CompactingStressTest extends ServiceTestBase
{

   // Constants -----------------------------------------------------

   // Attributes ----------------------------------------------------

   private static final String AD1 = "ad1";

   private static final String AD2 = "ad2";

   private static final String AD3 = "ad3";

   private static final String Q1 = "q1";

   private static final String Q2 = "q2";

   private static final String Q3 = "q3";

   private static final int TOT_AD3 = 5000;

   private HornetQServer server;

   private ClientSessionFactory sf;

   // Static --------------------------------------------------------

   // Constructors --------------------------------------------------

   // Public --------------------------------------------------------

   public void testCleanupAIO() throws Throwable
   {
      if (AsynchronousFileImpl.isLoaded())
      {
         internalTestCleanup(JournalType.ASYNCIO);
         tearDown();
         setUp();
      }
   }

   public void testCleanupNIO() throws Throwable
   {
      internalTestCleanup(JournalType.NIO);
      tearDown();
      setUp();
   }

   private void internalTestCleanup(final JournalType journalType) throws Throwable
   {
      setupServer(journalType);

      ClientSession session = sf.createSession(false, true, true);

      ClientProducer prod = session.createProducer(CompactingStressTest.AD1);

      for (int i = 0; i < 500; i++)
      {
         prod.send(session.createMessage(true));
      }

      session.commit();

      prod.close();

      ClientConsumer cons = session.createConsumer(CompactingStressTest.Q2);
      prod = session.createProducer(CompactingStressTest.AD2);

      session.start();

      for (int i = 0; i < 200; i++)
      {
         System.out.println("Iteration " + i);
         // Sending non transactionally, so it would test non transactional stuff on the journal
         for (int j = 0; j < 1000; j++)
         {
            Message msg = session.createMessage(true);
            msg.getBodyBuffer().writeBytes(new byte[1024]);

            prod.send(msg);
         }

         // I need to guarantee a roundtrip to the server, to make sure everything is persisted
         session.commit();

         for (int j = 0; j < 1000; j++)
         {
            ClientMessage msg = cons.receive(2000);
            Assert.assertNotNull(msg);
            msg.acknowledge();
         }

         // I need to guarantee a roundtrip to the server, to make sure everything is persisted
         session.commit();

      }

      Assert.assertNull(cons.receiveImmediate());

      session.close();

      server.stop();

      server.start();

      session = sf.createSession(false, true, true);
      cons = session.createConsumer(CompactingStressTest.Q1);
      session.start();

      for (int i = 0; i < 500; i++)
      {
         ClientMessage msg = cons.receive(1000);
         Assert.assertNotNull(msg);
         msg.acknowledge();
      }

      Assert.assertNull(cons.receiveImmediate());

      prod = session.createProducer(CompactingStressTest.AD2);

      session.close();

   }

   public void testMultiProducerAndCompactAIO() throws Throwable
   {
      internalTestMultiProducer(JournalType.ASYNCIO);
   }

   public void testMultiProducerAndCompactNIO() throws Throwable
   {
      internalTestMultiProducer(JournalType.NIO);
   }

   public void internalTestMultiProducer(final JournalType journalType) throws Throwable
   {

      setupServer(journalType);

      ClientSession session = sf.createSession(false, false);

      try
      {
         ClientProducer producer = session.createProducer(CompactingStressTest.AD3);

         byte[] buffer = new byte[10 * 1024];

         ClientMessage msg = session.createMessage(true);

         for (int i = 0; i < CompactingStressTest.TOT_AD3; i++)
         {
            producer.send(msg);
            if (i % 100 == 0)
            {
               session.commit();
            }
         }

         session.commit();
      }
      finally
      {
         session.close();
      }

      server.stop();

      setupServer(journalType);

      final AtomicInteger numberOfMessages = new AtomicInteger(0);
      final int NUMBER_OF_FAST_MESSAGES = 100000;
      final int SLOW_INTERVAL = 100;

      final CountDownLatch latchReady = new CountDownLatch(2);
      final CountDownLatch latchStart = new CountDownLatch(1);

      class FastProducer extends Thread
      {
         Throwable e;

         FastProducer()
         {
            super("Fast-Thread");
         }

         @Override
         public void run()
         {
            ClientSession session = null;
            ClientSession sessionSlow = null;
            latchReady.countDown();
            try
            {
               latchStart.await();
               session = sf.createSession(true, true);
               sessionSlow = sf.createSession(false, false);
               ClientProducer prod = session.createProducer(CompactingStressTest.AD2);
               ClientProducer slowProd = sessionSlow.createProducer(CompactingStressTest.AD1);
               for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
               {
                  if (i % SLOW_INTERVAL == 0)
                  {
                     if (numberOfMessages.incrementAndGet() % 5 == 0)
                     {
                        sessionSlow.commit();
                     }
                     slowProd.send(session.createMessage(true));
                  }
                  ClientMessage msg = session.createMessage(true);

                  prod.send(msg);
               }
               sessionSlow.commit();
            }
            catch (Throwable e)
            {
               this.e = e;
            }
            finally
            {
               try
               {
                  session.close();
               }
               catch (Throwable e)
               {
                  this.e = e;
               }
               try
               {
                  sessionSlow.close();
               }
               catch (Throwable e)
               {
                  this.e = e;
               }
            }
         }
      }

      class FastConsumer extends Thread
      {
         Throwable e;

         FastConsumer()
         {
            super("Fast-Consumer");
         }

         @Override
         public void run()
         {
            ClientSession session = null;
            latchReady.countDown();
            try
            {
               latchStart.await();
               session = sf.createSession(true, true);
               session.start();
               ClientConsumer cons = session.createConsumer(CompactingStressTest.Q2);
               for (int i = 0; i < NUMBER_OF_FAST_MESSAGES; i++)
               {
                  ClientMessage msg = cons.receive(60 * 1000);
                  msg.acknowledge();
               }

               Assert.assertNull(cons.receiveImmediate());
            }
            catch (Throwable e)
            {
               this.e = e;
            }
            finally
            {
               try
               {
                  session.close();
               }
               catch (Throwable e)
               {
                  this.e = e;
               }
            }
         }
      }

      FastConsumer f1 = new FastConsumer();
      f1.start();

      FastProducer p1 = new FastProducer();
      p1.start();

      latchReady.await();
      latchStart.countDown();

      p1.join();

      if (p1.e != null)
      {
         throw p1.e;
      }

      f1.join();

      if (f1.e != null)
      {
         throw f1.e;
      }

      sf.close();

      server.stop();

      setupServer(journalType);

      ClientSession sess = null;

      try
      {

         sess = sf.createSession(true, true);

         ClientConsumer cons = sess.createConsumer(CompactingStressTest.Q1);

         sess.start();

         for (int i = 0; i < numberOfMessages.intValue(); i++)
         {
            ClientMessage msg = cons.receive(60000);
            Assert.assertNotNull(msg);
            msg.acknowledge();
         }

         Assert.assertNull(cons.receiveImmediate());

         cons.close();

         cons = sess.createConsumer(CompactingStressTest.Q2);

         Assert.assertNull(cons.receiveImmediate());

         cons.close();

         cons = sess.createConsumer(CompactingStressTest.Q3);

         for (int i = 0; i < CompactingStressTest.TOT_AD3; i++)
         {
            ClientMessage msg = cons.receive(60000);
            Assert.assertNotNull(msg);
            msg.acknowledge();
         }

         Assert.assertNull(cons.receiveImmediate());

      }
      finally
      {
         try
         {
            sess.close();
         }
         catch (Throwable ignored)
         {
         }
      }
   }

   @Override
   protected void setUp() throws Exception
   {
      super.setUp();

      clearData();
   }

   /**
    * @throws Exception
    * @throws HornetQException
    */
   private void setupServer(final JournalType journalType) throws Exception, HornetQException
   {
      Configuration config = createDefaultConfig();
      config.setJournalSyncNonTransactional(false);
      config.setJournalFileSize(ConfigurationImpl.DEFAULT_JOURNAL_FILE_SIZE);

      config.setJournalType(journalType);

      config.setJournalCompactMinFiles(10);
      config.setJournalCompactPercentage(50);

      server = createServer(true, config);

      server.start();

      sf = createInVMFactory();
      sf.setBlockOnDurableSend(false);
      sf.setBlockOnAcknowledge(false);

      ClientSession sess = sf.createSession();

      try
      {
         sess.createQueue(CompactingStressTest.AD1, CompactingStressTest.Q1, true);
      }
      catch (Exception ignored)
      {
      }

      try
      {
         sess.createQueue(CompactingStressTest.AD2, CompactingStressTest.Q2, true);
      }
      catch (Exception ignored)
      {
      }

      try
      {
         sess.createQueue(CompactingStressTest.AD3, CompactingStressTest.Q3, true);
      }
      catch (Exception ignored)
      {
      }

      sess.close();
   }

   @Override
   protected void tearDown() throws Exception
   {
      try
      {
         if (sf != null)
         {
            sf.close();
         }

         if (server != null)
         {
            server.stop();
         }
      }
      catch (Exception e)
      {
         e.printStackTrace(); // system.out -> junit reports
      }

      server = null;

      sf = null;

      super.tearDown();
   }

   // Package protected ---------------------------------------------

   // Protected -----------------------------------------------------

   // Private -------------------------------------------------------

   // Inner classes -------------------------------------------------

}
TOP

Related Classes of org.hornetq.tests.stress.journal.FastConsumer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.