Package org.infinispan.distribution

Source Code of org.infinispan.distribution.DistSyncL1FuncTest

package org.infinispan.distribution;

import org.infinispan.Cache;
import org.infinispan.commands.read.GetKeyValueCommand;
import org.infinispan.commands.write.PutKeyValueCommand;
import org.infinispan.commands.write.ReplaceCommand;
import org.infinispan.interceptors.base.CommandInterceptor;
import org.infinispan.interceptors.distribution.L1NonTxInterceptor;
import org.infinispan.interceptors.distribution.NonTxDistributionInterceptor;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.test.TestingUtil;
import org.infinispan.tx.dld.ControlledRpcManager;
import org.testng.annotations.Test;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;

@Test(groups = "functional", testName = "distribution.DistSyncL1FuncTest")
public class DistSyncL1FuncTest extends BaseDistSyncL1Test {

   public DistSyncL1FuncTest() {
      sync = true;
      tx = false;
      testRetVals = true;
   }

   @Override
   protected Class<? extends CommandInterceptor> getDistributionInterceptorClass() {
      return NonTxDistributionInterceptor.class;
   }

   @Override
   protected Class<? extends CommandInterceptor> getL1InterceptorClass() {
      return L1NonTxInterceptor.class;
   }

   protected void assertL1PutWithConcurrentUpdate(final Cache<Object, String> nonOwnerCache, Cache<Object, String> ownerCache,
                                                  final boolean replace, final Object key, final String originalValue,
                                                  final String nonOwnerValue, String updateValue) throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      CyclicBarrier barrier = new CyclicBarrier(2);
      addBlockingInterceptorBeforeTx(nonOwnerCache, barrier, replace ? ReplaceCommand.class : PutKeyValueCommand.class);

      try {
         Future<String> future = fork(new Callable<String>() {
            @Override
            public String call() throws Exception {
               if (replace) {
                  // This should always be true
                  if (nonOwnerCache.replace(key, originalValue, nonOwnerValue)) {
                     return originalValue;
                  }
                  return nonOwnerCache.get(key);
               }
               else {
                  return nonOwnerCache.put(key, nonOwnerValue);
               }
            }
         });

         // Now wait for the put/replace to return and block it for now
         barrier.await(5, TimeUnit.SECONDS);

         // Owner should have the new value
         assertEquals(nonOwnerValue, ownerCache.put(key, updateValue));

         // Now let owner key->updateValue go through
         barrier.await(5, TimeUnit.SECONDS);

         // This should be originalValue still as we did the get
         assertEquals(originalValue, future.get(5, TimeUnit.SECONDS));

         // Remove the interceptor now since we don't want to block ourselves - if using phaser this isn't required
         removeAllBlockingInterceptorsFromCache(nonOwnerCache);

         assertL1StateOnLocalWrite(nonOwnerCache, ownerCache, key, updateValue);
         // The nonOwnerCache should retrieve new value as it isn't in L1
         assertEquals(updateValue, nonOwnerCache.get(key));
         assertIsInL1(nonOwnerCache, key);
      }
      finally {
         removeAllBlockingInterceptorsFromCache(nonOwnerCache);
      }
   }

   public void testNoEntryInL1PutWithConcurrentInvalidation() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in the owner, so the L1 is empty
      ownerCache.put(key, firstValue);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, ownerCache, false, key, firstValue, "intermediate-put", secondValue);
   }

   public void testEntryInL1PutWithConcurrentInvalidation() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in a non owner, so the L1 has the key
      ownerCache.put(key, firstValue);
      nonOwnerCache.get(key);

      assertIsInL1(nonOwnerCache, key);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, ownerCache, false, key, firstValue, "intermediate-put", secondValue);
   }

   public void testNoEntryInL1PutWithConcurrentPut() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in the owner, so the L1 is empty
      ownerCache.put(key, firstValue);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, nonOwnerCache, false, key, firstValue, "intermediate-put", secondValue);
   }

   public void testEntryInL1PutWithConcurrentPut() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in a non owner, so the L1 has the key
      ownerCache.put(key, firstValue);
      nonOwnerCache.get(key);

      assertIsInL1(nonOwnerCache, key);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, nonOwnerCache, false, key, firstValue, "intermediate-put", secondValue);
   }

   public void testNoEntryInL1ReplaceWithConcurrentInvalidation() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in the owner, so the L1 is empty
      ownerCache.put(key, firstValue);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, ownerCache, true, key, firstValue, "intermediate-put", secondValue);
   }

   public void testEntryInL1ReplaceWithConcurrentInvalidation() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in a non owner, so the L1 has the key
      ownerCache.put(key, firstValue);
      nonOwnerCache.get(key);

      assertIsInL1(nonOwnerCache, key);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, ownerCache, true, key, firstValue, "intermediate-put", secondValue);
   }

   public void testNoEntryInL1ReplaceWithConcurrentPut() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in the owner, so the L1 is empty
      ownerCache.put(key, firstValue);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, nonOwnerCache, true, key, firstValue, "intermediate-put", secondValue);
   }

   public void testEntryInL1ReplaceWithConcurrentPut() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in a non owner, so the L1 has the key
      ownerCache.put(key, firstValue);
      nonOwnerCache.get(key);

      assertIsInL1(nonOwnerCache, key);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, nonOwnerCache, true, key, firstValue, "intermediate-put", secondValue);
   }

   public void testNoEntryInL1GetWithConcurrentReplace() throws InterruptedException, ExecutionException, TimeoutException, BrokenBarrierException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      // Put the first value in a non owner, so the L1 has the key
      ownerCache.put(key, firstValue);
      nonOwnerCache.get(key);

      assertIsInL1(nonOwnerCache, key);

      assertL1PutWithConcurrentUpdate(nonOwnerCache, nonOwnerCache, true, key, firstValue, "intermediate-put", secondValue);
   }

   public void testNoEntryInL1PutReplacedNullValueConcurrently() throws InterruptedException, ExecutionException, TimeoutException {
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);
      final Cache<Object, String> ownerCache = getFirstOwner(key);

      RpcManager rm = TestingUtil.extractComponent(nonOwnerCache, RpcManager.class);
      ControlledRpcManager crm = new ControlledRpcManager(rm);
      // Make our node block and not return the get yet
      crm.blockAfter(PutKeyValueCommand.class);
      TestingUtil.replaceComponent(nonOwnerCache, RpcManager.class, crm, true);

      try {
         Future<String> future = nonOwnerCache.putIfAbsentAsync(key, firstValue);

         // Now wait for the get to return and block it for now
         crm.waitForCommandToBlock(5, TimeUnit.SECONDS);

         // Owner should have the new value
         assertEquals(firstValue, ownerCache.remove(key));

         // Now let owner key->updateValue go through
         crm.stopBlocking();

         // This should be originalValue still as we did the get
         assertNull(future.get(5, TimeUnit.SECONDS));

         // Remove the interceptor now since we don't want to block ourselves - if using phaser this isn't required
         removeAllBlockingInterceptorsFromCache(nonOwnerCache);

         assertIsNotInL1(nonOwnerCache, key);
         // The nonOwnerCache should retrieve new value as it isn't in L1
         assertNull(nonOwnerCache.get(key));
         assertIsNotInL1(nonOwnerCache, key);
      } finally {
         TestingUtil.replaceComponent(nonOwnerCache, RpcManager.class, rm, true);
      }
   }

   @Test(groups = "unstable", description = "ISPN-4133")
   public void testNonOwnerRetrievesValueFromBackupOwnerWhileWrite() throws Exception {
      final Cache<Object, String>[] owners = getOwners(key, 2);

      final Cache<Object, String> ownerCache = owners[0];
      final Cache<Object, String> backupOwnerCache = owners[1];
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);

      ownerCache.put(key, firstValue);

      assertEquals(firstValue, nonOwnerCache.get(key));

      assertIsInL1(nonOwnerCache, key);

      // Add a barrier to block the owner from receiving the get command from the non owner
      CyclicBarrier ownerGetBarrier = new CyclicBarrier(2);
      addBlockingInterceptor(ownerCache, ownerGetBarrier, GetKeyValueCommand.class, L1NonTxInterceptor.class, false);

      // Add a barrier to block the backup owner from committing the write to memory
      CyclicBarrier backupOwnerWriteBarrier = new CyclicBarrier(2);
      addBlockingInterceptor(backupOwnerCache, backupOwnerWriteBarrier, PutKeyValueCommand.class, L1NonTxInterceptor.class, true);

      try {
         Future<String> future = fork(new Callable<String>() {
            @Override
            public String call() throws Exception {

               return ownerCache.put(key, secondValue);
            }
         });

         // Wait until the put is trying to replicate
         backupOwnerWriteBarrier.await(5, TimeUnit.SECONDS);

         // Wait until the L1 is cleared out from the owners L1 invalidation
         eventually(new Condition() {

            @Override
            public boolean isSatisfied() throws Exception {
               return !isInL1(nonOwnerCache, key);
            }
         }, 5000, 250);

         // This should come back from the backup owner, since the primary owner is blocked
         assertEquals(firstValue, nonOwnerCache.get(key));

         assertIsInL1(nonOwnerCache, key);

         // Now let the backup owner put complete and send response
         backupOwnerWriteBarrier.await(5, TimeUnit.SECONDS);

         // Wait for the put to complete
         future.get(5, TimeUnit.SECONDS);

         // The Last chance interceptor is async so wait to make sure it was invalidated
         eventually(new Condition() {
            @Override
            public boolean isSatisfied() throws Exception {
               return !isInL1(nonOwnerCache, key);
            }
         }, 5000, 250);
         // The L1 value shouldn't be present
         assertIsNotInL1(nonOwnerCache, key);

         // Now finally let the get from the non owner to the primary owner go, which at this point will finally
         // register the requestor
         ownerGetBarrier.await(5, TimeUnit.SECONDS);
         ownerGetBarrier.await(5, TimeUnit.SECONDS);

         // The L1 value shouldn't be present
         assertIsNotInL1(nonOwnerCache, key);
      } finally {
         removeAllBlockingInterceptorsFromCache(ownerCache);
         removeAllBlockingInterceptorsFromCache(backupOwnerCache);
      }
   }

   /**
    * See ISPN-3617
    */
   public void testNonOwnerRemovesValueFromL1ProperlyOnWrite() throws InterruptedException, TimeoutException,
                                                                      BrokenBarrierException, ExecutionException {

      final Cache<Object, String>[] owners = getOwners(key, 2);

      final Cache<Object, String> ownerCache = owners[0];
      final Cache<Object, String> backupOwnerCache = owners[1];
      final Cache<Object, String> nonOwnerCache = getFirstNonOwner(key);

      ownerCache.put(key, firstValue);

      assertEquals(firstValue, nonOwnerCache.get(key));

      assertIsInL1(nonOwnerCache, key);

      // Add a barrier to block the owner from actually updating it's own local value
      CyclicBarrier ownerPutBarrier = new CyclicBarrier(2);
      addBlockingInterceptor(ownerCache, ownerPutBarrier, PutKeyValueCommand.class, L1NonTxInterceptor.class, true);

      // Add a barrier to block the get from being retrieved on the backup owner
      CyclicBarrier backupGetBarrier = new CyclicBarrier(2);
      addBlockingInterceptor(backupOwnerCache, backupGetBarrier, GetKeyValueCommand.class, L1NonTxInterceptor.class,
                             false);

      try {
         Future<String> future = fork(new Callable<String>() {

            @Override
            public String call() throws Exception {
               return nonOwnerCache.put(key, secondValue);
            }
         });

         // Wait until owner has already replicated to backup owner, but hasn't updated local value
         ownerPutBarrier.await(10, TimeUnit.SECONDS);

         assertEquals(firstValue, ownerCache.getAdvancedCache().getDataContainer().get(key).getValue());
         assertEquals(secondValue, backupOwnerCache.getAdvancedCache().getDataContainer().get(key).getValue());

         assertEquals(firstValue, nonOwnerCache.get(key));

         assertIsInL1(nonOwnerCache, key);

         // Just let the backup get return now
         backupGetBarrier.await(10, TimeUnit.SECONDS);
         backupGetBarrier.await(10, TimeUnit.SECONDS);

         // Finally let the put complete
         ownerPutBarrier.await(10, TimeUnit.SECONDS);

         assertEquals(firstValue, future.get(10, TimeUnit.SECONDS));

         assertIsNotInL1(nonOwnerCache, key);

         assertEquals(secondValue, ownerCache.getAdvancedCache().getDataContainer().get(key).getValue());
      } finally {
         removeAllBlockingInterceptorsFromCache(ownerCache);
         removeAllBlockingInterceptorsFromCache(backupOwnerCache);
      }
   }
}
TOP

Related Classes of org.infinispan.distribution.DistSyncL1FuncTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.