Package org.infinispan.statetransfer

Source Code of org.infinispan.statetransfer.WriteSkewDuringStateTransferTest$NewNode

package org.infinispan.statetransfer;

import org.infinispan.Cache;
import org.infinispan.commands.VisitableCommand;
import org.infinispan.commands.tx.PrepareCommand;
import org.infinispan.commands.tx.VersionedPrepareCommand;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.configuration.cache.VersioningScheme;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.context.InvocationContext;
import org.infinispan.interceptors.base.BaseCustomInterceptor;
import org.infinispan.manager.CacheContainer;
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.tx.dld.ControlledRpcManager;
import org.infinispan.util.BlockingLocalTopologyManager;
import org.infinispan.util.SingleSegmentConsistentHashFactory;
import org.infinispan.util.concurrent.IsolationLevel;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

import static org.infinispan.distribution.DistributionTestHelper.hasOwners;
import static org.testng.AssertJUnit.*;

/**
* Tests if the entry version is lost during the state transfer in which the primary owner changes.
*
* @author Pedro Ruivo
* @since 6.0
*/
@Test(groups = "functional", testName = "statetransfer.WriteSkewDuringStateTransferTest", singleThreaded = true)
public class WriteSkewDuringStateTransferTest extends MultipleCacheManagersTest {

   // Blocking topology managers registered through replaceTopologyManager(CacheContainer).
   // unblockAll() calls stopBlockingAll() on each of them and empties the list after every test method.
   private final List<BlockingLocalTopologyManager> topologyManagerList =
         Collections.synchronizedList(new ArrayList<BlockingLocalTopologyManager>(4));

   @AfterMethod(alwaysRun = true)
   public final void unblockAll() {
      //keep track of all controlled components. In case of failure, we need to unblock all otherwise we have to wait
      //long time until the test is able to stop all cache managers.
      for (BlockingLocalTopologyManager topologyManager : topologyManagerList) {
         topologyManager.stopBlockingAll();
      }
      topologyManagerList.clear();
   }

   /*
   Replicated TX cache with WSC, A, B are in cluster, C is joining
   0. The current CH already contains A and B as owners, C is joining (is not primary owner of anything yet).
B is primary owner of K=V.
   1. A sends PrepareCommand to B and C with put(K, V) (V is null on all nodes); //A has already received the
rebalance_start
   2. C receives PrepareCommand and responds with no versions (it is not primary owner); //C has already received the
rebalance_start
   3. topology changes on B - primary ownership of K is transferred to C; //B has already received the ch_update
   4. B receives PrepareCommand, responds without K's version (it is not primary)
   5. B forwards the Prepare to C as it sees that the command has lower topology ID
   6. C responds to B's prepare with version of K; //at this point, C has received the ch_update
   7. K version is not added to B's response, B responds to A
   8. A finds out that topology has changed, forwards prepare to C; //A has received the ch_update
   9. C responds to A's prepare with version of K
   10. A receives C's response, but the versions are not added to transaction
   11. A sends out CommitCommand missing version of K
   12. all nodes record K=V without version as usual ImmortalCacheEntry
   */

   /**
    * See description above or ISPN-3738
    */
   public void testVersionsAfterStateTransfer() throws Exception {
      assertClusterSize("Wrong cluster size", 2);
      final Object key = "key1";
      assertKeyOwnership(key, cache(1), cache(0));
      final int currentTopologyId = currentTopologyId(cache(0));

      final ControlledRpcManager nodeARpcManager = replaceRpcManager(cache(0));
      final NodeController nodeAController = setNodeControllerIn(cache(0));
      setInitialPhaseForNodeA(nodeAController, currentTopologyId);
      final NodeController nodeBController = setNodeControllerIn(cache(1));
      setInitialPhaseForNodeB(nodeBController, currentTopologyId);
      final NewNode nodeC = addNode(currentTopologyId);

      //node A thinks that node B is the primary owner. Node B is blocking the prepare command until it thinks that
      //node C is the primary owner
      nodeAController.topologyManager.waitToBlock(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
      nodeARpcManager.blockAfter(VersionedPrepareCommand.class);
      //node C thinks that node B is the primary owner.
      //nodeC.controller.topologyManager.waitToBlock(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);

      //after this waiting phase, node A thinks that node B is the primary owner, node B thinks that node C is the
      // primary owner and node C thinks that node B is the primary owner
      //lets execute the transaction...

      Future<Object> tx = executeTransaction(cache(0), key);

      //it waits until all nodes has replied. then, we change the topology ID and let it collect the responses.
      nodeARpcManager.waitForCommandToBlock();
      nodeAController.topologyManager.stopBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
      awaitForTopology(currentTopologyId + 2, cache(0));

      nodeARpcManager.stopBlocking();
      assertNull("Wrong put() return value.", tx.get());

      nodeAController.topologyManager.stopBlockingAll();
      nodeBController.topologyManager.stopBlockingAll();
      nodeC.controller.topologyManager.stopBlockingAll();

      nodeC.joinerFuture.get();

      awaitForTopology(currentTopologyId + 2, cache(0));
      awaitForTopology(currentTopologyId + 2, cache(1));
      awaitForTopology(currentTopologyId + 2, cache(2));

      assertKeyVersionInDataContainer(key, cache(1), cache(2));
      cache(0).put(key, "v2");
   }

   @Override
   protected void createCacheManagers() throws Throwable {
      createClusteredCaches(2, configuration());
   }

   private void assertKeyVersionInDataContainer(Object key, Cache... owners) {
      for (Cache cache : owners) {
         DataContainer dataContainer = TestingUtil.extractComponent(cache, DataContainer.class);
         InternalCacheEntry entry = dataContainer.get(key);
         assertNotNull("Entry cannot be null in " + address(cache) + ".", entry);
         assertNotNull("Version cannot be null.", entry.getMetadata().version());
      }
   }

   private ControlledRpcManager replaceRpcManager(Cache cache) {
      RpcManager manager = TestingUtil.extractComponent(cache, RpcManager.class);
      ControlledRpcManager controlledRpcManager = new ControlledRpcManager(manager);
      TestingUtil.replaceComponent(cache, RpcManager.class, controlledRpcManager, true);
      //rpcManagerList.add(controlledRpcManager);
      return controlledRpcManager;
   }

   private void awaitForTopology(final int expectedTopologyId, final Cache cache) {
      eventually(new Condition() {
         @Override
         public boolean isSatisfied() throws Exception {
            return expectedTopologyId == currentTopologyId(cache);
         }
      });
   }

   private int currentTopologyId(Cache cache) {
      return TestingUtil.extractComponent(cache, StateTransferManager.class).getCacheTopology().getTopologyId();
   }

   private Future<Object> executeTransaction(final Cache<Object, Object> cache, final Object key) {
      return fork(new Callable<Object>() {
         @Override
         public Object call() throws Exception {
            return TestingUtil.withTx(cache.getAdvancedCache().getTransactionManager(), new Callable<Object>() {
               @Override
               public Object call() throws Exception {
                  return cache.put(key, "value");
               }
            });
         }
      });
   }

   /**
    * Adds a third cache manager (node C) to the cluster and puts it under test control.
    * <p/>
    * The new node gets a {@link ControlledCommandInterceptor} at index 0 of its custom interceptors and a blocking
    * topology manager that is tracked for {@link #unblockAll()}. The action registered on the interceptor applies
    * to remote {@link PrepareCommand}s only:
    * <ul>
    *    <li>before a prepare whose origin is {@code cache(1)}, it waits up to 10 seconds for topology
    *    {@code currentTopologyId + 2};</li>
    *    <li>after a prepare whose origin is {@code cache(0)}, it stops blocking the consistent hash update on the
    *    new node.</li>
    * </ul>
    * The consistent hash update latch is armed before the join is forked.
    *
    * @param currentTopologyId the topology id observed before the new node joins
    * @return the holder with the new node's controller and the future of the forked join
    */
   private NewNode addNode(final int currentTopologyId) {
      final NewNode newNode = new NewNode();
      ConfigurationBuilder builder = configuration();
      newNode.controller = new NodeController();
      newNode.controller.interceptor = new ControlledCommandInterceptor();
      //the interceptor is part of the configuration, so it must be registered before the cache manager is created
      builder.customInterceptors().addInterceptor().index(0).interceptor(newNode.controller.interceptor);
      EmbeddedCacheManager embeddedCacheManager = addClusterEnabledCacheManager(builder);
      newNode.controller.topologyManager = replaceTopologyManager(embeddedCacheManager);
      newNode.controller.interceptor.addAction(new Action() {
         @Override
         public boolean isApplicable(InvocationContext context, VisitableCommand command) {
            //only prepares received from other nodes
            return !context.isOriginLocal() && command instanceof PrepareCommand;
         }

         @Override
         public void before(InvocationContext context, VisitableCommand command, Cache cache) {
            log.tracef("Before: command=%s. origin=%s", command, context.getOrigin());
            if (context.getOrigin().equals(address(cache(1)))) {
               //from node B, i.e., it is forwarded. It needs to wait until the topology changes
               try {
                  cache.getAdvancedCache().getComponentRegistry().getStateTransferLock().waitForTopology(currentTopologyId + 2,
                                                                                                         10, TimeUnit.SECONDS);
               } catch (InterruptedException e) {
                  //restore the interrupt flag and let the command proceed
                  Thread.currentThread().interrupt();
               }
            }
         }

         @Override
         public void after(InvocationContext context, VisitableCommand command, Cache cache) {
            log.tracef("After: command=%s. origin=%s", command, context.getOrigin());
            if (context.getOrigin().equals(address(cache(0)))) {
               //the prepare sent by node A has been handled: let the consistent hash update go through on this node
               newNode.controller.topologyManager.stopBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
            }
         }
      });

      //arm the latch before the join starts
      newNode.controller.topologyManager.startBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
      //the join runs in a forked task; the caller waits on joinerFuture
      newNode.joinerFuture = fork(new Callable<Void>() {
         @Override
         public Void call() throws Exception {
            waitForClusterToForm();
            return null;
         }
      });
      return newNode;
   }

   private ConfigurationBuilder configuration() {
      ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.REPL_SYNC, true);
      builder.clustering()
            .stateTransfer().fetchInMemoryState(true)
            .hash().numSegments(1).numOwners(3).consistentHashFactory(new ConsistentHashFactoryImpl());
      builder.locking()
            .isolationLevel(IsolationLevel.REPEATABLE_READ)
            .writeSkewCheck(true);
      builder.versioning()
            .enable()
            .scheme(VersioningScheme.SIMPLE);
      return builder;
   }

   private void assertKeyOwnership(Object key, Cache primaryOwner, Cache... backupOwners) {
      assertTrue("Wrong ownership for " + key + ".", hasOwners(key, primaryOwner, backupOwners));
   }

   private BlockingLocalTopologyManager replaceTopologyManager(CacheContainer cacheContainer) {
      BlockingLocalTopologyManager localTopologyManager = BlockingLocalTopologyManager.replaceTopologyManager(cacheContainer);
      topologyManagerList.add(localTopologyManager);
      return localTopologyManager;
   }

   private static NodeController setNodeControllerIn(Cache<Object, Object> cache) {
      NodeController nodeController = new NodeController();
      nodeController.interceptor = new ControlledCommandInterceptor(cache);
      nodeController.topologyManager = BlockingLocalTopologyManager.replaceTopologyManager(cache.getCacheManager());
      return nodeController;
   }

   private static void setInitialPhaseForNodeA(NodeController nodeA, final int currentTopology) {
      //node A initial phase:
      //* Node A sends the prepare for B and C. So, node A will send the prepare after the topologyId+1 is installed.
      nodeA.interceptor.addAction(new Action() {
         @Override
         public boolean isApplicable(InvocationContext context, VisitableCommand command) {
            return context.isOriginLocal() && command instanceof PrepareCommand;
         }

         @Override
         public void before(InvocationContext context, VisitableCommand command, Cache cache) {
            try {
               cache.getAdvancedCache().getComponentRegistry().getStateTransferLock().waitForTopology(currentTopology + 1,
                                                                                                      10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
            }
         }

         @Override
         public void after(InvocationContext context, VisitableCommand command, Cache cache) {
            //no-op
         }
      });
      nodeA.topologyManager.startBlocking(BlockingLocalTopologyManager.LatchType.CONSISTENT_HASH_UPDATE);
   }

   private static void setInitialPhaseForNodeB(NodeController nodeB, final int currentTopology) {
      //node B initial phase:
      //* Node B receives the prepare after it looses the primary owner to node C
      nodeB.interceptor.addAction(new Action() {
         @Override
         public boolean isApplicable(InvocationContext context, VisitableCommand command) {
            return !context.isOriginLocal() && command instanceof PrepareCommand;
         }

         @Override
         public void before(InvocationContext context, VisitableCommand command, Cache cache) {
            try {
               cache.getAdvancedCache().getComponentRegistry().getStateTransferLock().waitForTopology(currentTopology + 2,
                                                                                                      10, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
               Thread.currentThread().interrupt();
            }
         }

         @Override
         public void after(InvocationContext context, VisitableCommand command, Cache cache) {
            //no-op
         }
      });
   }

   public interface Action {

      public boolean isApplicable(InvocationContext context, VisitableCommand command);

      public void before(InvocationContext context, VisitableCommand command, Cache cache);

      public void after(InvocationContext context, VisitableCommand command, Cache cache);

   }

   public static class ConsistentHashFactoryImpl extends SingleSegmentConsistentHashFactory {

      @Override
      protected final List<Address> createOwnersCollection(List<Address> members, int numberOfOwners) {
         assertEquals("Wrong number of owners", 3, numberOfOwners);
         //the primary owner is the last member.
         final List<Address> owners = new ArrayList<Address>(3);
         owners.add(members.get(members.size() - 1));
         for (int i = 0; i < members.size() - 1; ++i) {
            owners.add(members.get(i));
         }
         return owners;
      }
   }

   public static class ControlledCommandInterceptor extends BaseCustomInterceptor {

      private final List<Action> actionList;

      public ControlledCommandInterceptor(Cache<Object, Object> cache) {
         actionList = new ArrayList<Action>(3);
         this.cache = cache;
         this.cacheConfiguration = cache.getCacheConfiguration();
         this.embeddedCacheManager = cache.getCacheManager();
         cache.getAdvancedCache().addInterceptor(this, 0);
      }

      public ControlledCommandInterceptor() {
         actionList = new ArrayList<Action>(3);
      }

      public void addAction(Action action) {
         actionList.add(action);
      }

      @Override
      protected Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
         List<Action> actions = extractActions(ctx, command);
         if (actions.isEmpty()) {
            return invokeNextInterceptor(ctx, command);
         }
         for (Action action : actions) {
            action.before(ctx, command, cache);
         }
         Object retVal = invokeNextInterceptor(ctx, command);
         for (Action action : actions) {
            action.after(ctx, command, cache);
         }
         return retVal;
      }

      private List<Action> extractActions(InvocationContext context, VisitableCommand command) {
         if (actionList.isEmpty()) {
            return Collections.emptyList();
         }
         List<Action> actions = new ArrayList<Action>(actionList.size());
         for (Action action : actionList) {
            if (action.isApplicable(context, command)) {
               actions.add(action);
            }
         }
         return actions;
      }
   }

   /**
    * Groups the two test hooks installed on a node.
    */
   private static class NodeController {
      // interceptor on which the per-node actions are registered
      ControlledCommandInterceptor interceptor;
      // blocking replacement of the node's topology manager
      BlockingLocalTopologyManager topologyManager;
   }

   /**
    * Handle for the node added by {@code addNode(int)}.
    */
   private static class NewNode {
      // completes when the forked waitForClusterToForm() call returns
      Future<Void> joinerFuture;
      // test hooks installed on the new node
      NodeController controller;
   }
}
TOP

Related Classes of org.infinispan.statetransfer.WriteSkewDuringStateTransferTest$NewNode

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.