Package org.jboss.cache.replicated

Source Code of org.jboss.cache.replicated.SyncReplTxTest

/*
*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.replicated;

import EDU.oswego.cs.dl.util.concurrent.FIFOSemaphore;
import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.AbstractTreeCacheListener;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.TreeCache;
import org.jboss.cache.TreeCacheMBean;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.transaction.DummyTransactionManager;

import javax.naming.Context;
import javax.transaction.NotSupportedException;
import javax.transaction.RollbackException;
import javax.transaction.Status;
import javax.transaction.Synchronization;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.util.ArrayList;
import java.util.List;

/**
* Replicated unit test for sync transactional TreeCache
* Note: we use DummyTransactionManager for Tx purpose instead of relying on
* jta.
*
* @version $Revision: 3137 $
*/
public class SyncReplTxTest extends TestCase {
    private static Log log = LogFactory.getLog(SyncReplTxTest.class);
   TreeCache cache1, cache2;
   int caching_mode=TreeCache.REPL_SYNC;
   final String group_name="TreeCacheTestGroup";
   String props=
         "UDP(ip_mcast=true;ip_ttl=64;loopback=false;mcast_addr=228.1.2.3;" +
         "mcast_port=45566;mcast_recv_buf_size=80000;mcast_send_buf_size=150000;" +
         "ucast_recv_buf_size=80000;ucast_send_buf_size=150000):" +
         "PING(down_thread=true;num_initial_members=2;timeout=500;up_thread=true):" +
         "MERGE2(max_interval=20000;min_interval=10000):" +
         "FD(down_thread=true;shun=true;up_thread=true):" +
         "VERIFY_SUSPECT(down_thread=true;timeout=1500;up_thread=true):" +
         "pbcast.NAKACK(down_thread=true;gc_lag=50;retransmit_timeout=600,1200,2400,4800;" +
         "up_thread=true):" +
         "pbcast.STABLE(desired_avg_gossip=20000;down_thread=true;up_thread=true):" +
         "UNICAST(down_thread=true;min_threshold=10;timeout=600,1200,2400;window_size=100):" +
         "FRAG(down_thread=true;frag_size=8192;up_thread=true):" +
         "pbcast.GMS(join_retry_timeout=2000;join_timeout=5000;print_local_addr=true;shun=true):" +
         "pbcast.STATE_TRANSFER(down_thread=true;up_thread=true)";

   final static Log log_=LogFactory.getLog(SyncReplTxTest.class);
   String old_factory=null;
   final String FACTORY="org.jboss.cache.transaction.DummyContextFactory";
   FIFOSemaphore lock=new FIFOSemaphore(1);
   DummyTransactionManager tx_mgr;
   Throwable t1_ex, t2_ex, ex=null;



   public SyncReplTxTest(String name) {
      super(name);
   }

   public void setUp() throws Exception {
      super.setUp();
      old_factory=System.getProperty(Context.INITIAL_CONTEXT_FACTORY);
      System.setProperty(Context.INITIAL_CONTEXT_FACTORY, FACTORY);
      tx_mgr=DummyTransactionManager.getInstance();
      t1_ex=t2_ex=ex=null;
   }

   public void tearDown() throws Exception {
      super.tearDown();
      DummyTransactionManager.destroy();
      destroyCaches();
      if(old_factory != null) {
         System.setProperty(Context.INITIAL_CONTEXT_FACTORY, old_factory);
         old_factory=null;
      }
   }

   Transaction beginTransaction() throws SystemException, NotSupportedException {
      DummyTransactionManager mgr=DummyTransactionManager.getInstance();
      mgr.begin();
      Transaction tx=mgr.getTransaction();
      return tx;
   }

   void initCaches(int caching_mode) throws Exception {
      this.caching_mode=caching_mode;
      cache1=new TreeCache();
      cache2=new TreeCache();
      cache1.setCacheMode(caching_mode);
      cache2.setCacheMode(caching_mode);
      cache1.setIsolationLevel(IsolationLevel.SERIALIZABLE);
      cache2.setIsolationLevel(IsolationLevel.SERIALIZABLE);

      cache1.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
      cache2.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
      /*
      cache1.setTransactionManagerLookupClass("org.jboss.cache.JBossTransactionManagerLookup");
      cache2.setTransactionManagerLookupClass("org.jboss.cache.JBossTransactionManagerLookup");
*/
      cache1.setLockAcquisitionTimeout(5000);
      cache2.setLockAcquisitionTimeout(5000);
     
      configureMultiplexer(cache1);     
      configureMultiplexer(cache2);
     
      cache1.start();
      cache2.start();
     
      validateMultiplexer(cache1);
      validateMultiplexer(cache2);
   }
  
   /**
    * Provides a hook for multiplexer integration. This default implementation
    * is a no-op; subclasses that test mux integration would override
    * to integrate the given cache with a multiplexer.
    *
    * param cache a cache that has been configured but not yet created.
    */
   protected void configureMultiplexer(TreeCacheMBean cache) throws Exception
   {
      // default does nothing
   }
  
   /**
    * Provides a hook to check that the cache's channel came from the
    * multiplexer, or not, as expected.  This default impl asserts that
    * the channel did not come from the multiplexer.
    *
    * @param cache a cache that has already been started
    */
   protected void validateMultiplexer(TreeCacheMBean cache)
   {
      assertFalse("Cache is not using multiplexer", cache.isUsingMultiplexer());
   }

   void destroyCaches() throws Exception {
      if(cache1 != null)
         cache1.stop();
      if(cache2 != null)
         cache2.stop();
      cache1=null;
      cache2=null;
   }

   public void testLockRemoval() throws Exception {
      initCaches(TreeCache.REPL_SYNC);
      cache1.setSyncCommitPhase(true);
      cache1.releaseAllLocks("/");
      Transaction tx=beginTransaction();
      cache1.put("/bela/ban", "name", "Bela Ban");
      assertEquals(3, cache1.getNumberOfLocksHeld());
      assertEquals(0, cache2.getNumberOfLocksHeld());
      tx.commit();
      assertEquals(0, cache1.getNumberOfLocksHeld());
      assertEquals(0, cache2.getNumberOfLocksHeld());
    }



   public void testSyncRepl() throws Exception {
      Integer age;
      Transaction tx;

      try {
         initCaches(TreeCache.REPL_SYNC);
         cache1.setSyncCommitPhase(true);
         cache2.setSyncCommitPhase(true);

         // assertEquals(2, cache1.getMembers().size());

         tx=beginTransaction();
         cache1.put("/a/b/c", "age", new Integer(38));
         TransactionManager mgr = cache1.getTransactionManager();
         tx = mgr.suspend();
         assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
          log.debug("cache1: locks held before commit: " + cache1.printLockInfo());
          log.debug("cache2: locks held before commit: " + cache2.printLockInfo());
         mgr.resume(tx);
         tx.commit();
          log.debug("cache1: locks held after commit: " + cache1.printLockInfo());
          log.debug("cache2: locks held after commit: " + cache2.printLockInfo());

          // value on cache2 must be 38
         age=(Integer)cache2.get("/a/b/c", "age");
         assertNotNull("\"age\" obtained from cache2 must be non-null ", age);
         assertTrue("\"age\" must be 38", age.intValue() == 38);
      }
      catch(Exception e) {
         fail(e.toString());
      }
   }

   /**
    *
    * @throws Exception
    */
   public void testSimplePut() throws Exception {
      Integer age;

      try {
         initCaches(TreeCache.REPL_SYNC);

         cache1.put("/JSESSION/localhost/192.168.1.10:32882/Courses/0", "Instructor", "Ben Wang");

         cache1.put("/JSESSION/localhost/192.168.1.10:32882/1", "Number", new Integer(10));
      }
      catch(Exception e) {
         fail(e.toString());
      }
   }


   public void testSimpleTxPut() throws Exception {
      Transaction tx;
      final Fqn NODE1=Fqn.fromString("/one/two/three");
      final Fqn NODE2=Fqn.fromString("/one/two/three/four");
      final Fqn NODE3=Fqn.fromString("/one/two/three/fourI");

    try {
       initCaches(TreeCache.REPL_SYNC);

       tx=beginTransaction();
       cache1.put(NODE1, "age", new Integer(38));
       System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
       tx.commit();

       /*
       tx=beginTransaction();
       cache1.put(NODE1, "age", new Integer(38));
       cache1.put(NODE2, "name", "Ben of The Far East");
       cache1.put(NODE3, "key", "UnknowKey");
       System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));

       tx.commit();
       */

       /*
       tx=beginTransaction();
       cache1.put(NODE1, "age", new Integer(38));
       cache1.put(NODE1, "AOPInstance", new AOPInstance());
       cache1.put(NODE2, "AOPInstance", new AOPInstance());
       cache1.put(NODE1, "AOPInstance", new AOPInstance());
       tx.commit();
       System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));
       */
    }
    catch(Exception e) {
       fail(e.toString());
    }
}


     public void testSyncReplWithModficationsOnBothCaches() throws Exception {
        Integer age;
        Transaction tx;
        final Fqn NODE1=Fqn.fromString("/one/two/three");
        final Fqn NODE2=Fqn.fromString("/eins/zwei/drei");

         initCaches(TreeCache.REPL_SYNC);

         // create roots first
        cache1.put("/one/two", null);
        cache2.put("/eins/zwei", null);

         cache1.setSyncCommitPhase(true);
         cache2.setSyncCommitPhase(true);

         tx=beginTransaction();
         cache1.put(NODE1, "age", new Integer(38));
         System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));

         cache2.put(NODE2, "age", new Integer(39));
         System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));

         System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
         System.out.println("cache2 before commit:\n" + cache2.printLockInfo());

        try
        {
            tx.commit();
           fail("Should not succeed with SERIALIZABLE semantics");
        }
        catch (Exception e)
        {
           //should be a classic deadlock here.
        }

         System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
         System.out.println("cache2 after commit:\n" + cache2.printLockInfo());

        /*
         assertTrue(cache1.exists(NODE1));
         assertTrue(cache1.exists(NODE2));
         assertTrue(cache1.exists(NODE1));
         assertTrue(cache2.exists(NODE2));

         age=(Integer)cache1.get(NODE1, "age");
         assertNotNull("\"age\" obtained from cache1 for " + NODE1 + " must be non-null ", age);
         assertTrue("\"age\" must be 38", age.intValue() == 38);

         age=(Integer)cache2.get(NODE1, "age");
         assertNotNull("\"age\" obtained from cache2 for " + NODE1 + " must be non-null ", age);
         assertTrue("\"age\" must be 38", age.intValue() == 38);

         age=(Integer)cache1.get(NODE2, "age");
         assertNotNull("\"age\" obtained from cache1 for " + NODE2 + " must be non-null ", age);
         assertTrue("\"age\" must be 39", age.intValue() == 39);

         age=(Integer)cache2.get(NODE2, "age");
         assertNotNull("\"age\" obtained from cache2 for " + NODE2 + " must be non-null ", age);
         assertTrue("\"age\" must be 39", age.intValue() == 39);
         */

         assertEquals(0, cache1.getNumberOfLocksHeld());
         assertEquals(0, cache2.getNumberOfLocksHeld());
         System.out.println("TransactionTable for cache1:\n" + cache1.getTransactionTable().toString(true));
         System.out.println("TransactionTable for cache2:\n" + cache2.getTransactionTable().toString(true));
   }

   public void testSyncReplWithModficationsOnBothCachesSameData() throws Exception {
      Transaction tx;
      final Fqn NODE=Fqn.fromString("/one/two/three");

      try {
         initCaches(TreeCache.REPL_SYNC);

         // sync commit + rollback?
         cache1.setSyncCommitPhase(true);
         cache1.setSyncRollbackPhase(true);
         cache2.setSyncCommitPhase(true);
         cache2.setSyncRollbackPhase(true);

         tx=beginTransaction();
         cache1.put(NODE, "age", new Integer(38));
         System.out.println("TransactionTable for cache1 after cache1.put():\n" + cache1.getTransactionTable().toString(true));

         cache2.put(NODE, "age", new Integer(39));
         System.out.println("TransactionTable for cache2 after cache2.put():\n" + cache2.getTransactionTable().toString(true));

         System.out.println("cache1 before commit:\n" + cache1.printLockInfo());
         System.out.println("cache2 before commit:\n" + cache2.printLockInfo());

         try {
            tx.commit();
            fail("commit should throw a RollbackException, we should not get here");
         }
         catch(RollbackException rollback) {
            System.out.println("Transaction was rolled back, this is correct");
         }

         System.out.println("cache1 after commit:\n" + cache1.printLockInfo());
         System.out.println("cache2 after commit:\n" + cache2.printLockInfo());

         assertEquals(0, cache1.getNumberOfLocksHeld());
         assertEquals(0, cache2.getNumberOfLocksHeld());

         assertEquals(0, cache1.getNumberOfNodes());
         assertEquals(0, cache2.getNumberOfNodes());
      }
      catch(Exception e) {
         fail(e.toString());
      }
   }


   public void testSyncReplWithModficationsOnBothCachesWithRollback() throws Exception {
         Transaction tx;
         final Fqn NODE1=Fqn.fromString("/one/two/three");
         final Fqn NODE2=Fqn.fromString("/eins/zwei/drei");

       try {
          initCaches(TreeCache.REPL_SYNC);

          cache1.setSyncRollbackPhase(true);
          cache2.setSyncRollbackPhase(true);

          tx=beginTransaction();
          cache1.put(NODE1, "age", new Integer(38));
          cache2.put(NODE2, "age", new Integer(39));

          System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
          System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());

          // this will rollback the transaction
          tx.registerSynchronization(new TransactionAborter(tx));

          try {
             tx.commit();
             fail("commit should throw a RollbackException, we should not get here");
          }
          catch(RollbackException rollback) {
             System.out.println("Transaction was rolled back, this is correct");
          }

          System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
          System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());

          assertEquals(0, cache1.getNumberOfLocksHeld());
          assertEquals(0, cache2.getNumberOfLocksHeld());

          assertEquals(0, cache1.getNumberOfNodes());
          assertEquals(0, cache2.getNumberOfNodes());
       }
       catch(Exception e) {
          fail(e.toString());
       }
    }


   /**
    * Test for JBCACHE-359 -- does a callback into cache from a listener
    * interfere with transaction rollback.
    *
    * @throws Exception
    */
   public void testSyncReplWithRollbackAndListener() throws Exception {
         Transaction tx;
         final Fqn NODE1=Fqn.fromString("/one/two/three");

       try {
          initCaches(TreeCache.REPL_SYNC);

          cache1.setSyncRollbackPhase(true);
          cache2.setSyncRollbackPhase(true);

          // Test with a rollback on the sending side

          CallbackListener cbl1 = new CallbackListener(cache1, "age");
          CallbackListener cbl2 = new CallbackListener(cache2, "age");

          tx=beginTransaction();
          cache1.put(NODE1, "age", new Integer(38));

          System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
          System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());

          // this will rollback the transaction
          tx.registerSynchronization(new TransactionAborter(tx));

          try {
             tx.commit();
             fail("commit should throw a RollbackException, we should not get here");
          }
          catch(RollbackException rollback) {
             rollback.printStackTrace();
             System.out.println("Transaction was rolled back, this is correct");
          }

          // Sleep, as the rollback call to cache2 is async
          _pause(1000);

          System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
          System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());

          assertNull(cbl1.getCallbackException());
          assertNull(cbl2.getCallbackException());

          assertEquals(0, cache1.getNumberOfLocksHeld());
          assertEquals(0, cache2.getNumberOfLocksHeld());

          assertEquals(0, cache1.getNumberOfNodes());
          assertEquals(0, cache2.getNumberOfNodes());

          // Test with a rollback on the receiving side

          cache2.removeTreeCacheListener(cbl2);
          // listener aborts any active tx
          cbl2 = new TransactionAborterCallbackListener(cache2, "age");

          tx=beginTransaction();
          cache1.put(NODE1, "age", new Integer(38));

          System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
          System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());

          tx.commit();

          // Sleep, as the commit call to cache2 is async
          _pause(1000);

          System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
          System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());

          assertNull(cbl1.getCallbackException());
          assertNull(cbl2.getCallbackException());

          assertEquals(0, cache1.getNumberOfLocksHeld());
          assertEquals(0, cache2.getNumberOfLocksHeld());

          // cache1 didn't fail, so should have 3 nodes
          assertEquals(3, cache1.getNumberOfNodes());
          assertEquals(0, cache2.getNumberOfNodes());

       }
       catch(Exception e) {
          e.printStackTrace();
          fail(e.toString());
       }
    }


   /**
    * Test for JBCACHE-361 -- does marking a tx on the remote side
    * rollback-only cause a rollback on the originating side?
    *
    * @throws Exception
    */
   public void testSyncReplWithRemoteRollback() throws Exception {
         Transaction tx;
         final Fqn NODE1=Fqn.fromString("/one/two/three");

       try {
          initCaches(TreeCache.REPL_SYNC);

          cache1.setSyncRollbackPhase(true);
          cache2.setSyncRollbackPhase(true);

          // Test with a rollback on the remote side

          // listener aborts any active tx
          TransactionAborterListener tal = new TransactionAborterListener(cache2);

          tx=beginTransaction();
          cache1.put(NODE1, "age", new Integer(38));

          System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
          System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());

          try {
             tx.commit();
             fail("commit should throw a RollbackException, we should not get here");
          }
          catch(RollbackException rollback) {
             System.out.println("Transaction was rolled back, this is correct");
          }

          // Sleep, as the commit call to cache2 is async
          _pause(1000);

          System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
          System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());

          assertNull(tal.getCallbackException());

          assertEquals(0, cache1.getNumberOfLocksHeld());
          assertEquals(0, cache2.getNumberOfLocksHeld());

          assertEquals(0, cache1.getNumberOfNodes());
          assertEquals(0, cache2.getNumberOfNodes());

       }
       catch(Exception e) {
          e.printStackTrace();
          fail(e.toString());
       }
    }


    public void testSyncReplWithRemoteRollbackWithExistingData() throws Exception {
             Transaction tx;
             final Fqn NODE1=Fqn.fromString("/one/two/three");

           try {
              initCaches(TreeCache.REPL_SYNC);

              cache1.setSyncRollbackPhase(true);
              cache2.setSyncRollbackPhase(true);

              // Test with a rollback on the remote side

              // listener aborts any active tx
              TransactionAborterListener tal = new TransactionAborterListener(cache2);

              cache1.put(NODE1, "age", new Integer(20));
              assertEquals(new Integer(20), cache1.get(NODE1, "age"));
              assertEquals(new Integer(20), cache2.get(NODE1, "age"));

              tx=beginTransaction();
              cache1.put(NODE1, "age", new Integer(38));

              System.out.println("cache1 (before commit):\n" + cache1.printLockInfo());
              System.out.println("cache2 (before commit):\n" + cache2.printLockInfo());

              try {
                 tx.commit();
                 fail("commit should throw a RollbackException, we should not get here");
              }
              catch(RollbackException rollback) {
                 System.out.println("Transaction was rolled back, this is correct");
              }

              // Sleep, as the commit call to cache2 is async
              _pause(1000);

              System.out.println("cache1 (after rollback):\n" + cache1.printLockInfo());
              System.out.println("cache2 (after rollback):\n" + cache2.printLockInfo());

              assertNull(tal.getCallbackException());

              assertEquals(0, cache1.getNumberOfLocksHeld());
              assertEquals(0, cache2.getNumberOfLocksHeld());

              assertEquals(3, cache1.getNumberOfNodes());
              assertEquals(3, cache2.getNumberOfNodes());
              assertEquals(new Integer(20), cache1.get(NODE1, "age"));
              assertEquals(new Integer(20), cache2.get(NODE1, "age"));

           }
           catch(Exception e) {
              e.printStackTrace();
              fail(e.toString());
           }
        }
   


   public void testASyncRepl() throws Exception {
      Integer age;
      Transaction tx;

      initCaches(TreeCache.REPL_ASYNC);

      try {
         tx=beginTransaction();
         cache1.put("/a/b/c", "age", new Integer(38));
         Thread.sleep(1000);
         assertNull("age on cache2 must be null as the TX has not yet been committed", cache2.get("/a/b/c", "age"));
         tx.commit();
         Thread.sleep(1000);

         // value on cache2 must be 38
         age=(Integer)cache2.get("/a/b/c", "age");
         assertNotNull("\"age\" obtained from cache2 is null ", age);
         assertTrue("\"age\" must be 38", age.intValue() == 38);
      }
      catch(Exception e) {
         fail(e.toString());
      }
   }

   /**
    * Tests concurrent modifications: thread1 succeeds and thread2 is blocked until thread1 is done, and then succeeds
    * too. However, this is flawed with the introduction of interceptors, here's why.<br/>
    * <ul>
    * <li>Thread1 acquires the lock for /bela/ban on cache1
    * <li>Thread2 blocks on Thread1 to release the lock
    * <li>Thread1 commits: this means the TransactionInterceptor and the ReplicationInterceptor are called in
    * the sequence in which they registered. Unfortunately, the TransactionInterceptor registered first. In the
    * PREPARE phase, the ReplicationInterceptor calls prepare() in cache2 synchronously. The TxInterceptor
    * does nothing. The the COMMIT phase, the TxInterceptor commits the data by releasing the locks locally and
    * then the ReplicationInterceptor sends an asynchronous COMMIT to cache2.
    * <li>Because the TxInterceptor for Thread1 releases the locks locally <em>before</em> sending the async COMMIT,
    * Thread2 is able to acquire the lock for /bela/ban in cache1 and then starts the PREPARE phase by sending a
    * synchronous PREPARE to cache2. If this PREPARE arrives at cache2 <em>before</em> the COMMIT from Thread1,
    * the PREPARE will block because it attempts to acquire a lock on /bela/ban on cache2 still held by Thread1
    * (which would be released by Thread1's COMMIT). This results in deadlock, which is resolved by Thread2 running
    * into a timeout with subsequent rollback and Thread1 succeeding.<br/>
    * </ul>
    * There are 3 solutions to this:
    * <ol>
    * <li>Do nothing. This is standard behavior for concurrent access to the same data. Same thing if the 2 threads
    * operated on the same data in <em>separate</em> caches, e.g. Thread1 on /bela/ban in cache1 and Thread2 on
    * /bela/ban in cache2. The semantics of Tx commit as handled by the interceptors is: after tx1.commit() returns
    * the locks held by tx1 are release and a COMMIT message is on the way (if sent asynchronously).
    * <li>Force an order over TxInterceptor and ReplicationInterceptor. This would require ReplicationInterceptor
    * to always be fired first on TX commit. Downside: the interceptors have an implicit dependency, which is not
    * nice.
    * <li>Priority-order requests at the receiver; e.g. a COMMIT could release a blocked PREPARE. This is bad because
    * it violates JGroups' FIFO ordering guarantees.
    * </ol>
    * I'm currently investigating solution #2, ie. creating an OrderedSynchronizationHandler, which allows other
    * SynchronizationHandlers to register (atHead, atTail), and the OrderedSynchronizationHandler would call the
    * SynchronizationHandler in the order in which they are defined.
    * @throws Exception
    */
   public void testConcurrentPuts() throws Exception {
      initCaches(TreeCache.REPL_SYNC);
      cache1.setSyncCommitPhase(true);

      Thread t1=new Thread("Thread1") {
         Transaction tx;

         public void run() {
            try {
               tx=beginTransaction();
               cache1.put("/bela/ban", "name", "Bela Ban");
               _pause(2000); // Thread2 will be blocked until we commit
               tx.commit();
               System.out.println("[Thread1] ** LOCK INFO cache1: " + cache1.printLockInfo());
               System.out.println("[Thread1] ** LOCK INFO cache2: " + cache2.printLockInfo());
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t1_ex=ex;
            }
         }
      };

      Thread t2=new Thread("Thread2") {
         Transaction tx;

         public void run() {
            try {
               _pause(1000); // give Thread1 time to acquire the lock
               tx=beginTransaction();
               System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
               System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
               cache1.put("/bela/ban", "name", "Michelle Ban");
               System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
               System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
               tx.commit();
               System.out.println("[Thread2] ** LOCK INFO cache1: " + cache1.printLockInfo());
               System.out.println("[Thread2] ** LOCK INFO cache2: " + cache2.printLockInfo());
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t2_ex=ex;
            }
         }
      };

      // Let the game start
      t1.start();
      t2.start();

      // Wait for threads to die
      t1.join();
      t2.join();

      if(t1_ex != null)
         fail("Thread1 failed: " + t1_ex);
      if(t2_ex != null)
         fail("Thread2 failed: " + t2_ex);

      assertEquals("Michelle Ban", cache1.get("/bela/ban", "name"));
   }


   /**
    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
    */
   public void testConcurrentCommitsWith1Thread() throws Exception {
      _testConcurrentCommits(1);
   }

    /**
    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
    */
   public void testConcurrentCommitsWith5Threads() throws Exception {
      _testConcurrentCommits(5);
   }

   /**
    * Should reproduce JBCACHE-32 problem (http://jira.jboss.com/jira/browse/JBCACHE-32)
    */
   private void _testConcurrentCommits(int num_threads) throws Exception {
      Object myMutex=new Object();

      final TreeCache c1=new TreeCache();
      final TreeCache c2=new TreeCache();
      c1.setClusterName("TempCluster");
      c2.setClusterName("TempCluster");
      c1.setCacheMode(TreeCache.REPL_SYNC);
      c2.setCacheMode(TreeCache.REPL_SYNC);
      c1.setSyncCommitPhase(true);
      c2.setSyncCommitPhase(true);
      c1.setSyncRollbackPhase(true);
      c2.setSyncRollbackPhase(true);
      c1.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
      c2.setIsolationLevel(IsolationLevel.REPEATABLE_READ);
      c1.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
      c2.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
      c1.setLockAcquisitionTimeout(5000);
      c2.setLockAcquisitionTimeout(5000);
      c1.start();
      c2.start();
      final List exceptions = new ArrayList();

      class MyThread extends Thread {
         Object mutex;

         public MyThread(String name, Object mutex) {
            super(name);
            this.mutex=mutex;
         }

         public void run() {
            Transaction tx = null;

            try {
               tx=beginTransaction();
               c1.put("/thread/" + getName(), null);
               System.out.println("Thread " + getName() + " after put(): " + c1.toString());
               System.out.println("Thread " + getName() + " waiting on mutex");
               synchronized(mutex) {
                  mutex.wait();
               }
               System.out.println("Thread " + getName() + " committing");
               tx.commit();
               System.out.println("Thread " + getName() + " committed successfully");
            }
            catch(Exception e) {
               exceptions.add(e);
            }
            finally {
               try
               {
                  if (tx != null) tx.rollback();
               }
               catch (Exception e)
               {

               }
            }
         }
      }

      MyThread[] threads=new MyThread[num_threads];
      for(int i=0; i < threads.length; i++) {
         threads[i]=new MyThread("#" + i, myMutex);
      }
      for(int i=0; i < threads.length; i++) {
         MyThread thread=threads[i];
         System.out.println("starting thread #" + i);
         thread.start();
      }

      _pause(6000);
      synchronized(myMutex) {
         System.out.println("cache is " + c1.printLockInfo());
         System.out.println("******************* SIGNALLING THREADS ********************");
         myMutex.notifyAll();
      }

      for(int i=0; i < threads.length; i++) {
         MyThread thread=threads[i];
         try {
            thread.join();
            System.out.println("Joined thread " + thread.getName());
         }
         catch(InterruptedException e) {
            e.printStackTrace();
         }
      }

      System.out.println("FINAL c1:\n" + c1.printDetails() + "\nlocks:\n" + c1.printLockInfo());

      assertEquals(0, c1.getNumberOfLocksHeld());
      assertEquals(0, c2.getNumberOfLocksHeld());

      c1.stop();
      c2.stop();

//      if(ex != null)
//      {
//         ex.printStackTrace();
//         fail("Thread failed: " + ex);
//      }

      // we can only expect 1 thread to succeed.  The others will fail.  So, threads.length -1 exceptions.
      // this is a timing issue - 2 threads still may succeed on a multi cpu system
      // assertEquals(threads.length - 1, exceptions.size());
     
      for (int i=0; i<exceptions.size(); i++) assertEquals(TimeoutException.class, exceptions.get(i).getClass());
   }


   /**
    * Conncurrent put on 2 different instances.
    */
   public void testConcurrentPutsOnTwoInstances() throws Exception {
      initCaches(TreeCache.REPL_SYNC);
      final TreeCache c1=this.cache1;
      final TreeCache c2=this.cache2;

      Thread t1=new Thread() {
         Transaction tx;

         public void run() {
            try {
               tx=beginTransaction();
               c1.put("/ben/wang", "name", "Ben Wang");
               _pause(8000);
               tx.commit(); // This should go thru
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t1_ex=ex;
            }
         }
      };

      Thread t2=new Thread() {
         Transaction tx;

         public void run() {
            try {
               _pause(1000); // give Thread1 time to acquire the lock
               tx=beginTransaction();
               c2.put("/ben/wang", "name", "Ben Jr.");
               tx.commit(); // This will time out and rollback first because Thread1 has a tx going as well.
            }
            catch(RollbackException rollback_ex) {
               System.out.println("received rollback exception as expected");
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t2_ex=ex;
            }
         }
      };

      // Let the game start
      t1.start();
      t2.start();

      // Wait for thread to die but put an insurance of 5 seconds on it.
      t1.join();
      t2.join();

      if(t1_ex != null)
         fail("Thread1 failed: " + t1_ex);
      if(t2_ex != null)
         fail("Thread2 failed: " + t2_ex);
      assertEquals("Ben Wang", c1.get("/ben/wang", "name"));
   }


   public void testPut() throws Exception {
      initCaches(TreeCache.REPL_SYNC);
      final TreeCache c1=this.cache1;


      Thread t1=new Thread() {
         public void run() {
            try {
               lock.acquire();
               System.out.println("-- t1 has lock");
               c1.put("/a/b/c", "age", new Integer(38));
               System.out.println("[Thread1] set value to 38");

               System.out.println("-- t1 releases lock");
               lock.release();
               _pause(300);
               Thread.yield();

               lock.acquire();
               System.out.println("-- t1 has lock");
               c1.put("/a/b/c", "age", new Integer(39));
               System.out.println("[Thread1] set value to 39");

               System.out.println("-- t1 releases lock");
               lock.release();
               assertEquals(new Integer(39), c1.get("/a/b/c", "age"));
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t1_ex=ex;
            }
            finally {
               lock.release();
            }
         }
      };

      Thread t2=new Thread() {
         public void run() {
            try {
               _pause(100);
               Thread.yield();
               lock.acquire();
               System.out.println("-- t2 has lock");
               // Should replicate the value right away.
               Integer val=(Integer)cache2.get("/a/b/c", "age");
               System.out.println("[Thread2] value is " + val);
               assertEquals(new Integer(38), val);
               System.out.println("-- t2 releases lock");
               lock.release();
               _pause(300);
               Thread.yield();
               _pause(500);
               lock.acquire();
               System.out.println("-- t2 has lock");
               val=(Integer)cache2.get("/a/b/c", "age");
               System.out.println("-- t2 releases lock");
               lock.release();
               assertEquals(new Integer(39), val);
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t2_ex=ex;
            }
            finally {
               lock.release();
            }
         }
      };

      // Let the game start
      t1.start();
      t2.start();

      // Wait for thread to die but put an insurance of 5 seconds on it.
      t1.join();
      t2.join();
      if(t1_ex != null)
         fail("Thread1 failed: " + t1_ex);
      if(t2_ex != null)
         fail("Thread2 failed: " + t2_ex);
   }

   /**
    * Test replicated cache with transaction. Idea is to have two threads running
    * a local cache each that is replicating. Depending on whether cache1 commit/rollback or not,
    * the cache2.get will get different values.
    * Note that we have used sleep to interpose thread execution sequence.
    * Although it's not fool proof, it is rather simple and intuitive.
    *
    * @throws Exception
    */
   public void testPutTx() throws Exception {
      Transaction tx=null;

      try {
         initCaches(TreeCache.REPL_SYNC);
         cache1.setSyncCommitPhase(true);
         cache2.setSyncCommitPhase(true);
         tx=beginTransaction();
         cache1.put("/a/b/c", "age", new Integer(38));
         cache1.put("/a/b/c", "age", new Integer(39));
         Object val=cache2.get("/a/b/c", "age"); // must be null as not yet committed
         assertNull(val);
         tx.commit();

         tx=beginTransaction();
         assertEquals(new Integer(39), cache2.get("/a/b/c", "age")); // must not be null
         tx.commit();
      }
      catch(Throwable t) {
         t.printStackTrace();
         t1_ex=t;
      }
      finally {
         lock.release();
      }
   }


   /**
    * Have both cache1 and cache2 do add and commit. cache1 commit should time out
    * since it can't obtain the lock when trying to replicate cache2. On the other hand,
    * cache2 commit will succeed since now that cache1 is rollbacked and lock is
    * released.
    */
   public void testPutTx1() throws Exception {
      initCaches(TreeCache.REPL_SYNC);
      final TreeCache c1=this.cache1;
      Thread t1=new Thread() {
         public void run() {
            Transaction tx=null;

            try {
               lock.acquire();
               tx=beginTransaction();
               c1.put("/a/b/c", "age", new Integer(38));
               c1.put("/a/b/c", "age", new Integer(39));
               lock.release();

               _pause(300);
               lock.acquire();
               try {
                  tx.commit();
               }
               catch(RollbackException ex) {
                  System.out.println("[Thread1] received RollbackException, as expected. Rolling back changes");
                  return;
               }
               finally {
                  lock.release();
               }
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t1_ex=ex;
            }
            finally {
               lock.release();
            }
         }
      };

      Thread t2=new Thread() {
         public void run() {
            Transaction tx=null;

            try {
               sleep(200);
               Thread.yield();
               lock.acquire();
               tx=beginTransaction();
               assertNull(cache2.get("/a/b/c", "age")); // must be null as not yet committed
               cache2.put("/a/b/c", "age", new Integer(40));
               lock.release();

               _pause(300);
               lock.acquire();
               assertEquals(new Integer(40), cache2.get("/a/b/c", "age")); // must not be null
               tx.commit();
               lock.release();

               _pause(1000);
               tx=beginTransaction();
               assertEquals("After cache2 commit", new Integer(40), cache2.get("/a/b/c", "age"));
               tx.commit();
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t2_ex=ex;
            }
            finally {
               lock.release();
            }
         }
      };

      // Let the game start
      t1.start();
      t2.start();

      t1.join();
      t2.join();

      if(t1_ex != null)
         fail("Thread1 failed: " + t1_ex);
      if(t2_ex != null)
         fail("Thread2 failed: " + t2_ex);
   }



   public void testPutTxWithRollback() throws Exception {
      initCaches(TreeCache.REPL_SYNC);
      final TreeCache c2=this.cache1;
      Thread t1=new Thread() {
         public void run() {
            Transaction tx=null;

            try {
               lock.acquire();
               tx=beginTransaction();
               c2.put("/a/b/c", "age", new Integer(38));
               c2.put("/a/b/c", "age", new Integer(39));
               lock.release();

               _pause(100);
               lock.acquire();
               tx.rollback();
               lock.release();
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t1_ex=ex;
            }
            finally {
               lock.release();
            }
         }
      };

      Thread t2=new Thread() {
         public void run() {
            Transaction tx=null;

            try {
               sleep(200);
               Thread.yield();
               lock.acquire();
               tx=beginTransaction();
               assertNull(cache2.get("/a/b/c", "age")); // must be null as not yet committed
               lock.release();

               _pause(100);
               lock.acquire();
               assertNull(cache2.get("/a/b/c", "age")); // must be null as rolledback
               tx.commit();
               lock.release();
            }
            catch(Throwable ex) {
               ex.printStackTrace();
               t2_ex=ex;
            }
            finally {
               lock.release();
            }
         }
      };

      // Let the game start
      t1.start();
      t2.start();

      // Wait for thread to die but put an insurance of 5 seconds on it.
      t1.join();
      t2.join();
      if(t1_ex != null)
         fail("Thread1 failed: " + t1_ex);
      if(t2_ex != null)
         fail("Thread2 failed: " + t2_ex);
   }


   static class TransactionAborter implements Synchronization {
      Transaction ltx=null;

      public TransactionAborter(Transaction ltx) {
         this.ltx=ltx;
      }

      public void beforeCompletion() {
         try {
            ltx.setRollbackOnly();
         }
         catch(SystemException e) {
         }
      }

      public void afterCompletion(int status) {
      }
   }

   static class CallbackListener extends AbstractTreeCacheListener {

      TreeCache callbackCache;
      Object    callbackKey;
      Exception ex;
      Object    mutex = new Object();

      CallbackListener(TreeCache cache, Object callbackKey) {
         this.callbackCache = cache;
         this.callbackKey   = callbackKey;
         cache.addTreeCacheListener(this);
      }

      public void nodeModified(Fqn fqn)
      {
         // Lock on a mutex so test can't check for an exception
         // until the get call completes
         synchronized(mutex) {
            try
            {
               callbackCache.get(fqn, callbackKey);
            }
            catch (CacheException e)
            {
               e.printStackTrace();
               ex = e;
            }
         }
      }

      Exception getCallbackException() {
         synchronized(mutex) {
            return ex;
         }
      }

   }

   static class TransactionAborterCallbackListener extends CallbackListener {

      TransactionManager callbackTM;

      TransactionAborterCallbackListener(TreeCache cache, Object callbackKey) {
         super(cache, callbackKey);
         callbackTM = callbackCache.getTransactionManager();
      }

      public void nodeModified(Fqn fqn)
      {
         try
         {
            Transaction tx = callbackTM.getTransaction();
            if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE) {
               // this will rollback the transaction
               tx.registerSynchronization(new TransactionAborter(tx));
            }
            else {
               super.nodeModified(fqn);
            }

         }
         catch (Exception e)
         {
            e.printStackTrace();
            if (ex == null)
               ex = e;
         }
      }

   }

   static class TransactionAborterListener extends AbstractTreeCacheListener {

      TransactionManager callbackTM;
      Object mutex = new Object();
      Exception ex;

      TransactionAborterListener(TreeCache cache) {
         callbackTM = cache.getTransactionManager();
         cache.addTreeCacheListener(this);
      }

      public void nodeModified(Fqn fqn)
      {
         synchronized(mutex) {
            try
            {
               Transaction tx = callbackTM.getTransaction();
               if (tx != null && tx.getStatus() == Status.STATUS_ACTIVE) {
                  // this will rollback the transaction
                  tx.setRollbackOnly();
               }
            }
            catch (Exception e)
            {
               e.printStackTrace();
               if (ex == null)
                  ex = e;
            }
         }
      }

      Exception getCallbackException() {
         synchronized(mutex) {
            return ex;
         }
      }

   }


   static void _pause(long millis) {
      try {
         Thread.sleep(millis);
      }
      catch(Exception t) {
      }
   }

   public static Test suite() throws Exception {
//        return getDeploySetup(SyncTxUnitTestCase.class, "cachetest.jar");
      return new TestSuite(SyncReplTxTest.class);
   }


}
TOP

Related Classes of org.jboss.cache.replicated.SyncReplTxTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.