Package org.hornetq.tests.integration.journal

Source Code of org.hornetq.tests.integration.journal.JournalPerfTuneTest$TestCallback

/*
* Copyright 2010 Red Hat, Inc.
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*    http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied.  See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.hornetq.tests.integration.journal;

import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicLong;

import junit.framework.TestSuite;

import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.journal.IOCompletion;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.journal.LoaderCallback;
import org.hornetq.core.journal.PreparedTransactionInfo;
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.SequentialFileFactory;
import org.hornetq.core.journal.impl.AIOSequentialFileFactory;
import org.hornetq.core.journal.impl.JournalImpl;
import org.hornetq.core.logging.Logger;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.DataConstants;

/**
* A JournalPerfTuneTest
*
* @author tim
*
*
*/
public class JournalPerfTuneTest extends UnitTestCase
{
   private static final Logger log = Logger.getLogger(JournalPerfTuneTest.class);

   private Journal journal;

   public static TestSuite suite()
   {
      TestSuite suite = new TestSuite();
      return suite;
   }

   protected void setUp() throws Exception
   {
      super.setUp();

      final int fileSize = 1024 * 1024 * 10;
      final int minFiles = 10;
      final int compactMinFiles = 20;
      final int compactPercentage = 30;
      final String filePrefix = "data";
      final String extension = "hq";
      final int maxIO = 500;

      final String journalDir = getTestDir();
      final int bufferSize = 490 * 1024;
      final int bufferTimeout = (int)(1000000000d / 2000);
      final boolean logRates = true;

      super.recreateDirectory(journalDir);

      SequentialFileFactory fileFactory = new AIOSequentialFileFactory(journalDir, bufferSize, bufferTimeout, logRates);

      journal = new JournalImpl(fileSize,
                                minFiles,
                                compactMinFiles,
                                compactPercentage,
                                fileFactory,
                                filePrefix,
                                extension,
                                maxIO);

      journal.start();

      class LoaderCB implements LoaderCallback
      {

         public void addPreparedTransaction(PreparedTransactionInfo preparedTransaction)
         {
            // TODO Auto-generated method stub

         }

         public void addRecord(RecordInfo info)
         {
            // TODO Auto-generated method stub

         }

         public void deleteRecord(long id)
         {
            // TODO Auto-generated method stub

         }

         public void updateRecord(RecordInfo info)
         {
            // TODO Auto-generated method stub

         }

         public void failedTransaction(long transactionID, List<RecordInfo> records, List<RecordInfo> recordsToDelete)
         {
            // TODO Auto-generated method stub

         }

      }

      journal.load(new LoaderCB());
   }

   class TestCallback implements IOCompletion
   {
      private CountDownLatch latch;

      TestCallback(final int counts)
      {
         this.latch = new CountDownLatch(counts);
      }

      public void await() throws Exception
      {
         latch.await();
      }

      public void storeLineUp()
      {
      }

      public void done()
      {
         latch.countDown();

         log.info(latch.getCount());
      }

      public void onError(int errorCode, String errorMessage)
      {
      }

   }

   protected void tearDown() throws Exception
   {
      journal.stop();

      super.tearDown();

   }

   public void test1() throws Exception
   {
      final int itersPerThread = 10000000;

      final int numThreads = 1;

      this.callback = new TestCallback(2 * itersPerThread * numThreads);

      Worker[] workers = new Worker[numThreads];

      for (int i = 0; i < numThreads; i++)
      {
         workers[i] = new Worker(itersPerThread);

         workers[i].start();
      }

      for (int i = 0; i < numThreads; i++)
      {
         workers[i].join();
      }

      callback.await();
   }

   private AtomicLong idGen = new AtomicLong(0);

   private TestCallback callback;

   class Worker extends Thread
   {
      final int iters;

      Worker(final int iters)
      {
         this.iters = iters;
      }

      public void run()
      {
         try
         {
            Record record = new Record(new byte[1024]);

            for (int i = 0; i < iters; i++)
            {
               long id = idGen.getAndIncrement();

               journal.appendAddRecord(id, (byte)0, record, true, callback);

               journal.appendDeleteRecord(id, true, callback);

               // log.info("did " + i);
            }
         }
         catch (Exception e)
         {
            log.error("Failed", e);
         }
      }
   }

   static class Record implements EncodingSupport
   {
      private byte[] bytes;

      Record(byte[] bytes)
      {
         this.bytes = bytes;
      }

      public void decode(HornetQBuffer buffer)
      {
         int length = buffer.readInt();

         bytes = new byte[length];

         buffer.readBytes(bytes);
      }

      public void encode(HornetQBuffer buffer)
      {
         buffer.writeInt(bytes.length);

         buffer.writeBytes(bytes);
      }

      public int getEncodeSize()
      {
         return DataConstants.SIZE_INT + bytes.length;
      }

   }
}
TOP

Related Classes of org.hornetq.tests.integration.journal.JournalPerfTuneTest$TestCallback

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.