Package org.infinispan.iteration

Source Code of org.infinispan.iteration.DistributedEntryRetrieverTest

package org.infinispan.iteration;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.CacheEntry;
import org.infinispan.container.entries.ImmortalCacheEntry;
import org.infinispan.distribution.DistributionManager;
import org.infinispan.distribution.MagicKey;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.iteration.impl.EntryRequestCommand;
import org.infinispan.iteration.impl.EntryRetriever;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.rpc.RpcOptions;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.mockito.AdditionalAnswers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;

import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import static org.mockito.Matchers.*;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.withSettings;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;

/**
* Tests that verify entry retrieval on a distributed cache, including recovery when a node leaves during iteration
*
* @author wburns
* @since 7.0
*/
@Test(groups = "functional", testName = "iteration.DistributedEntryRetrieverTest")
public class DistributedEntryRetrieverTest extends BaseClusteredEntryRetrieverTest {
   public DistributedEntryRetrieverTest() {
      super(false, CacheMode.DIST_SYNC);
      // This is needed since we kill nodes
      cleanup = CleanupPhase.AFTER_METHOD;
   }

   protected Object getKeyTiedToCache(Cache<?, ?> cache) {
      return new MagicKey(cache);
   }

   @Test
   public void verifyNodeLeavesBeforeGettingData() throws TimeoutException, InterruptedException, ExecutionException {
      Map<Object, String> values = putValueInEachCache(3);

      Cache<Object, String> cache0 = cache(0, CACHE_NAME);
      Cache<Object, String> cache1 = cache(1, CACHE_NAME);

      CheckPoint checkPoint = new CheckPoint();
      checkPoint.triggerForever("post_send_response_released");
      waitUntilSendingResponse(cache1, checkPoint);

      final EntryRetriever<Object, String> retriever = cache0.getAdvancedCache().getComponentRegistry().getComponent(
            EntryRetriever.class);

      final BlockingQueue<Map.Entry<Object, String>> returnQueue = new ArrayBlockingQueue<Map.Entry<Object, String>>(10);
      Future<Void> future = fork(new Callable<Void>() {
         @Override
         public Void call() throws Exception {
            Iterator<CacheEntry<Object, String>> iter = retriever.retrieveEntries(null, null, null);
            while (iter.hasNext()) {
               Map.Entry<Object, String> entry = iter.next();
               returnQueue.add(entry);
            }
            return null;
         }
      });

      // Make sure the thread is waiting for the response
      checkPoint.awaitStrict("pre_send_response_invoked", 10, TimeUnit.SECONDS);

      // Now kill the cache - we should recover
      killMember(1, CACHE_NAME);

      checkPoint.trigger("pre_send_response_released");

      future.get(10, TimeUnit.SECONDS);

      for (Map.Entry<Object, String> entry : values.entrySet()) {
         assertTrue(returnQueue.contains(entry), "Entry wasn't found:" + entry);
      }
   }

   /**
    * This test is to verify proper behavior when a node dies after sending a batch to the requestor
    */
   @Test
   public void verifyNodeLeavesAfterSendingBackSomeData() throws TimeoutException, InterruptedException, ExecutionException {
      Cache<Object, String> cache0 = cache(0, CACHE_NAME);
      Cache<Object, String> cache1 = cache(1, CACHE_NAME);

      Map<Object, String> values = new HashMap<Object, String>();
      int chunkSize = cache0.getCacheConfiguration().clustering().stateTransfer().chunkSize();
      // Now insert 10 more values than the chunk size into the node we will kill
      for (int i = 0; i < chunkSize + 10; ++i) {
         MagicKey key = new MagicKey(cache1);
         cache1.put(key, key.toString());
         values.put(key, key.toString());
      }

      CheckPoint checkPoint = new CheckPoint();
      // Let the first request come through fine
      checkPoint.trigger("pre_send_response_released");
      waitUntilSendingResponse(cache1, checkPoint);

      final EntryRetriever<Object, String> retriever = cache0.getAdvancedCache().getComponentRegistry().getComponent(
            EntryRetriever.class);

      final BlockingQueue<Map.Entry<Object, String>> returnQueue = new LinkedBlockingQueue<Map.Entry<Object, String>>();
      Future<Void> future = fork(new Callable<Void>() {
         @Override
         public Void call() throws Exception {
            Iterator<CacheEntry<Object, String>> iter = retriever.retrieveEntries(null, null, null);
            while (iter.hasNext()) {
               Map.Entry<Object, String> entry = iter.next();
               returnQueue.add(entry);
            }
            return null;
         }
      });

      // Now wait for them to send back first results
      checkPoint.awaitStrict("post_send_response_invoked", 10, TimeUnit.SECONDS);

      // We should get a value now, note all values are currently residing on cache1 as primary
      Map.Entry<Object, String> value = returnQueue.poll(10, TimeUnit.SECONDS);

      // Now kill the cache - we should recover
      killMember(1, CACHE_NAME);

      future.get(10, TimeUnit.SECONDS);

      for (Map.Entry<Object, String> entry : values.entrySet()) {
         assertTrue(returnQueue.contains(entry) || entry.equals(value), "Entry wasn't found:" + entry);
      }
   }

   @Test
   public void waitUntilProcessingResults() throws TimeoutException, InterruptedException, ExecutionException {
      Cache<Object, String> cache0 = cache(0, CACHE_NAME);
      Cache<Object, String> cache1 = cache(1, CACHE_NAME);

      Map<Object, String> values = new HashMap<Object, String>();
      for (int i = 0; i < 501; ++i) {
         MagicKey key = new MagicKey(cache1);
         cache1.put(key, key.toString());
         values.put(key, key.toString());
      }

      CheckPoint checkPoint = new CheckPoint();
      checkPoint.triggerForever("post_receive_response_released");
      waitUntilStartOfProcessingResult(cache0, checkPoint);

      final EntryRetriever<Object, String> retriever = cache0.getAdvancedCache().getComponentRegistry().getComponent(
            EntryRetriever.class);

      final BlockingQueue<Map.Entry<Object, String>> returnQueue = new LinkedBlockingQueue<Map.Entry<Object, String>>();
      Future<Void> future = fork(new Callable<Void>() {
         @Override
         public Void call() throws Exception {
            Iterator<CacheEntry<Object, String>> iter = retriever.retrieveEntries(null, null, null);
            while (iter.hasNext()) {
               Map.Entry<Object, String> entry = iter.next();
               returnQueue.add(entry);
            }
            return null;
         }
      });

      // Now wait for them to send back first results but don't let them process
      checkPoint.awaitStrict("pre_receive_response_invoked", 10, TimeUnit.SECONDS);

      // Now let them process the results
      checkPoint.triggerForever("pre_receive_response_released");

      // Now kill the cache - we should recover and get appropriate values
      killMember(1, CACHE_NAME);

      future.get(10, TimeUnit.SECONDS);


      ConsistentHash hash = cache0.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class).getReadConsistentHash();
      Map<Integer, Set<Map.Entry<Object, String>>> expected = generateEntriesPerSegment(hash, values.entrySet());
      Map<Integer, Set<Map.Entry<Object, String>>> answer = generateEntriesPerSegment(hash, returnQueue);

      for (Map.Entry<Integer, Set<Map.Entry<Object, String>>> entry : expected.entrySet()) {
         assertEquals(answer.get(entry.getKey()), entry.getValue(), "Segment " + entry.getKey() + " had a mismatch");
      }
   }

   @Test
   public void testNodeLeavesWhileIteratingOverContainerCausingRehashToLoseValues() throws TimeoutException,
                                                                                           InterruptedException,
                                                                                           ExecutionException {
      Cache<Object, String> cache0 = cache(0, CACHE_NAME);
      Cache<Object, String> cache1 = cache(1, CACHE_NAME);
      Cache<Object, String> cache2 = cache(2, CACHE_NAME);

      // Add an extra so that when we remove 1 it means not all the values will be on 1 node
      addClusterEnabledCacheManager(builderUsed);

      // put a lot of entries in cache0, so that when a node goes down it will lose some
      Map<Object, String> values = new HashMap<Object, String>();
      for (int i = 0; i < 501; ++i) {
         MagicKey key = new MagicKey(cache0);
         cache1.put(key, key.toString());
         values.put(key, key.toString());
      }

      CheckPoint checkPoint = new CheckPoint();
      checkPoint.triggerForever("post_iterator_released");
      waitUntilDataContainerWillBeIteratedOn(cache0, checkPoint);

      final EntryRetriever<Object, String> retriever = cache2.getAdvancedCache().getComponentRegistry().getComponent(
            EntryRetriever.class);

      final BlockingQueue<Map.Entry<Object, String>> returnQueue = new LinkedBlockingQueue<Map.Entry<Object, String>>();
      Future<Void> future = fork(new Callable<Void>() {
         @Override
         public Void call() throws Exception {
            Iterator<CacheEntry<Object, String>> iter = retriever.retrieveEntries(null, null, null);
            while (iter.hasNext()) {
               Map.Entry<Object, String> entry = iter.next();
               returnQueue.add(entry);
            }
            return null;
         }
      });

      // Now wait for them to send back first results but don't let them process
      checkPoint.awaitStrict("pre_iterator_invoked", 10, TimeUnit.SECONDS);

      // Now kill the cache - we should recover and get appropriate values
      killMember(1, CACHE_NAME);

      // Now let them process the results
      checkPoint.triggerForever("pre_iterator_released");

      future.get(10, TimeUnit.SECONDS);


      ConsistentHash hash = cache0.getAdvancedCache().getComponentRegistry().getComponent(DistributionManager.class).getReadConsistentHash();
      Map<Integer, Set<Map.Entry<Object, String>>> expected = generateEntriesPerSegment(hash, values.entrySet());
      Map<Integer, Set<Map.Entry<Object, String>>> answer = generateEntriesPerSegment(hash, returnQueue);

      for (Map.Entry<Integer, Set<Map.Entry<Object, String>>> entry : expected.entrySet()) {
         assertEquals(answer.get(entry.getKey()), entry.getValue(), "Segment " + entry.getKey() + " had a mismatch");
      }
   }

   private Map<Integer, Set<Map.Entry<Object, String>>> generateEntriesPerSegment(ConsistentHash hash, Iterable<Map.Entry<Object, String>> entries) {
      Map<Integer, Set<Map.Entry<Object, String>>> returnMap = new HashMap<Integer, Set<Map.Entry<Object, String>>>();

      for (Map.Entry<Object, String> value : entries) {
         int segment = hash.getSegment(value.getKey());
         Set<Map.Entry<Object, String>> set = returnMap.get(segment);
         if (set == null) {
            set = new HashSet<Map.Entry<Object, String>>();
            returnMap.put(segment, set);
         }
         set.add(new ImmortalCacheEntry(value.getKey(), value.getValue()));
      }
      return returnMap;
   }

   protected RpcManager waitUntilSendingResponse(final Cache<?, ?> cache, final CheckPoint checkPoint) {
      RpcManager rpc = TestingUtil.extractComponent(cache, RpcManager.class);
      final Answer<Object> forwardedAnswer = AdditionalAnswers.delegatesTo(rpc);
      RpcManager mockManager = mock(RpcManager.class, withSettings().defaultAnswer(forwardedAnswer));
      doAnswer(new Answer() {
         @Override
         public Object answer(InvocationOnMock invocation) throws Throwable {
            // Wait for main thread to sync up
            checkPoint.trigger("pre_send_response_invoked");
            // Now wait until main thread lets us through
            checkPoint.awaitStrict("pre_send_response_released", 10, TimeUnit.SECONDS);

            try {
               return forwardedAnswer.answer(invocation);
            } finally {
               // Wait for main thread to sync up
               checkPoint.trigger("post_send_response_invoked");
               // Now wait until main thread lets us through
               checkPoint.awaitStrict("post_send_response_released", 10, TimeUnit.SECONDS);
            }
         }
      }).when(mockManager).invokeRemotely(anyCollectionOf(Address.class), any(EntryRequestCommand.class),
                                          any(RpcOptions.class));
      TestingUtil.replaceComponent(cache, RpcManager.class, mockManager, true);
      return rpc;
   }

   protected EntryRetriever waitUntilStartOfProcessingResult(final Cache<?, ?> cache, final CheckPoint checkPoint) {
      EntryRetriever rpc = TestingUtil.extractComponent(cache, EntryRetriever.class);
      final Answer<Object> forwardedAnswer = AdditionalAnswers.delegatesTo(rpc);
      EntryRetriever mockRetriever = mock(EntryRetriever.class, withSettings().defaultAnswer(forwardedAnswer));
      doAnswer(new Answer() {
         @Override
         public Object answer(InvocationOnMock invocation) throws Throwable {
            // Wait for main thread to sync up
            checkPoint.trigger("pre_receive_response_invoked");
            // Now wait until main thread lets us through
            checkPoint.awaitStrict("pre_receive_response_released", 10, TimeUnit.SECONDS);

            try {
               return forwardedAnswer.answer(invocation);
            } finally {
               // Wait for main thread to sync up
               checkPoint.trigger("post_receive_response_invoked");
               // Now wait until main thread lets us through
               checkPoint.awaitStrict("post_receive_response_released", 10, TimeUnit.SECONDS);
            }
         }
      }).when(mockRetriever).receiveResponse(any(UUID.class), any(Address.class), anySetOf(Integer.class),
                                             anySetOf(Integer.class), anyCollection());
      TestingUtil.replaceComponent(cache, EntryRetriever.class, mockRetriever, true);
      return rpc;
   }

   protected DataContainer waitUntilDataContainerWillBeIteratedOn(final Cache<?, ?> cache, final CheckPoint checkPoint) {
      DataContainer rpc = TestingUtil.extractComponent(cache, DataContainer.class);
      final Answer<Object> forwardedAnswer = AdditionalAnswers.delegatesTo(rpc);
      DataContainer mocaContainer = mock(DataContainer.class, withSettings().defaultAnswer(forwardedAnswer));
      final AtomicInteger invocationCount = new AtomicInteger();
      doAnswer(new Answer() {
         @Override
         public Object answer(InvocationOnMock invocation) throws Throwable {
            boolean waiting = false;
            if (invocationCount.getAndIncrement() == 0) {
               waiting = true;
               // Wait for main thread to sync up
               checkPoint.trigger("pre_iterator_invoked");
               // Now wait until main thread lets us through
               checkPoint.awaitStrict("pre_iterator_released", 10, TimeUnit.SECONDS);
            }

            try {
               return forwardedAnswer.answer(invocation);
            } finally {
               invocationCount.getAndDecrement();
               if (waiting) {
                  // Wait for main thread to sync up
                  checkPoint.trigger("post_iterator_invoked");
                  // Now wait until main thread lets us through
                  checkPoint.awaitStrict("post_iterator_released", 10, TimeUnit.SECONDS);
               }
            }
         }
      }).when(mocaContainer).iterator();
      TestingUtil.replaceComponent(cache, DataContainer.class, mocaContainer, true);
      return rpc;
   }
}
TOP

Related Classes of org.infinispan.iteration.DistributedEntryRetrieverTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.