Package org.infinispan.notifications.cachelistener

Source Code of org.infinispan.notifications.cachelistener.CacheNotifierImplInitialTransferDistTest

package org.infinispan.notifications.cachelistener;

import org.infinispan.Cache;
import org.infinispan.commons.util.CloseableIterator;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.distribution.ch.ConsistentHash;
import org.infinispan.filter.Converter;
import org.infinispan.filter.KeyValueFilter;
import org.infinispan.iteration.impl.EntryRetriever;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryCreated;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryModified;
import org.infinispan.notifications.cachelistener.annotation.CacheEntryRemoved;
import org.infinispan.notifications.cachelistener.cluster.ClusterCacheNotifier;
import org.infinispan.notifications.cachelistener.event.CacheEntryEvent;
import org.infinispan.notifications.cachelistener.event.Event;
import org.infinispan.remoting.transport.Address;
import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.CheckPoint;
import org.mockito.AdditionalAnswers;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testng.annotations.Test;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.testng.Assert.*;


@Test(groups = "unit", testName = "notifications.cachelistener.CacheNotifierImplInitialTransferDistTest")
public class CacheNotifierImplInitialTransferDistTest extends MultipleCacheManagersTest {
   private final String CACHE_NAME = "DistInitialTransferListener";
   @Override
   protected void createCacheManagers() throws Throwable {
      createClusteredCaches(3, CACHE_NAME, getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC));
   }

   private static enum Operation {
      PUT(Event.Type.CACHE_ENTRY_MODIFIED), CREATE(Event.Type.CACHE_ENTRY_CREATED), REMOVE(Event.Type.CACHE_ENTRY_REMOVED) {
         @Override
         public <K, V> Object perform(Cache<K, V> cache, K key, V value) {
            return cache.remove(key);
         }
      };

      private final Event.Type type;

      private Operation(Event.Type type) {
         this.type = type;
      }

      public Event.Type getType() {
         return type;
      }

      public <K, V> Object perform(Cache<K, V> cache, K key, V value) {
         return cache.put(key, value);
      }
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testSimpleCacheStartingNonClusterListener() {
//      testSimpleCacheStarting(new StateListenerNotClustered());
//   }

   public void testSimpleCacheStartingClusterListener() {
      testSimpleCacheStarting(new StateListenerClustered());
   }

   private void testSimpleCacheStarting(final StateListener<String, String> listener) {
      final Map<String, String> expectedValues = new HashMap<String, String>(10);
      Cache<String, String> cache = cache(0, CACHE_NAME);
      for (int i = 0; i < 10; i++) {
         String key = "key-" + i;
         String value = "value-" + i;
         expectedValues.put(key, value);
         cache.put(key, value);
      }

      cache.addListener(listener);
      try {
         verifyEvents(isClustered(listener), listener, expectedValues);
      } finally {
         cache.removeListener(listener);
      }
   }

   private void verifyEvents(boolean isClustered, StateListener<String, String> listener,
                            Map<String, String> expected) {
      assertEquals(listener.events.size(), isClustered ? expected.size() : expected.size() * 2);
      boolean isPost = true;
      for (CacheEntryEvent<String, String> event : listener.events) {
         // Even checks means it will be post and have a value - note we force every check to be
         // even for clustered since those should always be post
         if (!isClustered) {
            isPost = !isPost;
         }

         assertEquals(event.getType(), Event.Type.CACHE_ENTRY_CREATED);
         assertTrue(expected.containsKey(event.getKey()));
         assertEquals(event.isPre(), !isPost);
         if (isPost) {
            assertEquals(event.getValue(), expected.get(event.getKey()));
         } else {
            assertNull(event.getValue());
         }
      }
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testCreateAfterIterationBeganButNotIteratedValueYetNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
//      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerNotClustered(), Operation.CREATE, false);
//   }

   public void testCreateAfterIterationBeganButNotIteratedValueYetNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.CREATE, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testCreateAfterIterationBeganButNotIteratedValueYetOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
//      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerNotClustered(), Operation.CREATE, true);
//   }

   public void testCreateAfterIterationBeganButNotIteratedValueYetOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.CREATE, true);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testModificationAfterIterationBeganButNotIteratedValueYetNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
//      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerNotClustered(), Operation.PUT, false);
//   }

   public void testModificationAfterIterationBeganButNotIteratedValueYetNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.PUT, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testModificationAfterIterationBeganButNotIteratedValueYetOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
//      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerNotClustered(), Operation.PUT, true);
//   }

   public void testModificationAfterIterationBeganButNotIteratedValueYetOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.PUT, true);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testRemoveAfterIterationBeganButNotIteratedValueYetNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
//      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerNotClustered(), Operation.REMOVE, false);
//   }

   public void testRemoveAfterIterationBeganButNotIteratedValueYetNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.REMOVE, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testRemoveAfterIterationBeganButNotIteratedValueYetOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
//      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerNotClustered(), Operation.REMOVE, true);
//   }

   public void testRemoveAfterIterationBeganButNotIteratedValueYetOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException {
      testModificationAfterIterationBeganButNotIteratedValueYet(new StateListenerClustered(), Operation.REMOVE, true);
   }

   /**
    * This test is to verify that the modification event replaces the current value for the key
    */
   private void testModificationAfterIterationBeganButNotIteratedValueYet(final StateListener<String, String> listener,
                                                                          Operation operation, boolean shouldBePrimaryOwner)
         throws InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
      final Map<String, String> expectedValues = new HashMap<>(10);
      final Cache<String, String> cache = cache(0, CACHE_NAME);
      for (int i = 0; i < 10; i++) {
         String key = "key-" + i;
         String value = "value-" + i;
         expectedValues.put(key, value);
         cache.put(key, value);
      }

      final CheckPoint checkPoint = new CheckPoint();

      EntryRetriever retriever = waitUntilRetrievingIterator(cache, checkPoint);
      try {
         Future<Void> future = fork(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
               cache.addListener(listener);
               return null;
            }
         });

         checkPoint.awaitStrict("pre_retrieve_entry_invoked", 10, TimeUnit.SECONDS);

         String value;
         String keyToChange = findKeyBasedOnOwnership(expectedValues.keySet(),
               cache.getAdvancedCache().getDistributionManager().getConsistentHash(),
               shouldBePrimaryOwner, cache.getCacheManager().getAddress());

         switch (operation) {
            case CREATE:
               keyToChange = "new-key";
               value = "new-value";
               expectedValues.put(keyToChange, value);
               break;
            case PUT:
               value =  cache.get(keyToChange) + "-changed";
               // Now remove the old value and put in the new one
               expectedValues.put(keyToChange, value);
               break;
            case REMOVE:
               value = null;
               expectedValues.remove(keyToChange);
               break;
            default:
               throw new IllegalArgumentException("Unsupported Operation provided " + operation);
         }

         operation.perform(cache, keyToChange, value);

         // Now let the iteration complete
         checkPoint.triggerForever("pre_retrieve_entry_released");

         future.get(10, TimeUnit.SECONDS);

         verifyEvents(isClustered(listener), listener, expectedValues);
      } finally {
         TestingUtil.replaceComponent(cache, EntryRetriever.class, retriever, true);
         cache.removeListener(listener);
      }
   }

   /**
    * This test is to verify that the modification event is sent after the creation event is done
    */
   private void testModificationAfterIterationBeganAndCompletedSegmentValueOwner(final StateListener<String, String> listener,
                                                                         Operation operation,
                                                                         boolean shouldBePrimaryOwner)
         throws IOException, InterruptedException, TimeoutException, BrokenBarrierException, ExecutionException {
      final Map<String, String> expectedValues = new HashMap<String, String>(10);
      final Cache<String, String> cache = cache(0, CACHE_NAME);
      for (int i = 0; i < 10; i++) {
         String key = "key-" + i;
         String value = "value-" + i;
         expectedValues.put(key, value);
         cache.put(key, value);
      }

      CheckPoint checkPoint = new CheckPoint();

      EntryRetriever retriever = waitUntilClosingIterator(cache, checkPoint);

      try {
         Future<Void> future = fork(new Callable<Void>() {

            @Override
            public Void call() throws Exception {
               cache.addListener(listener);
               return null;
            }
         });

         checkPoint.awaitStrict("pre_close_iter_invoked", 10, TimeUnit.SECONDS);

         String value;
         String keyToChange = findKeyBasedOnOwnership(expectedValues.keySet(),
                                                      cache.getAdvancedCache().getDistributionManager().getConsistentHash(),
                                                      shouldBePrimaryOwner, cache.getCacheManager().getAddress());

         switch (operation) {
            case CREATE:
               keyToChange = "new-key";
               value = "new-value";
               break;
            case PUT:
               value =  cache.get(keyToChange) + "-changed";
               break;
            case REMOVE:
               value = null;
               break;
            default:
               throw new IllegalArgumentException("Unsupported Operation provided " + operation);
         }

         Object oldValue = operation.perform(cache, keyToChange, value);

         // Now let the iteration complete
         checkPoint.triggerForever("pre_close_iter_released");

         future.get(10, TimeUnit.SECONDS);

         boolean isClustered = isClustered(listener);

         // We should have 1 or 2 (local) events due to the modification coming after we iterated on it.  Note the value
         // isn't brought up until the iteration is done
         assertEquals(listener.events.size(), isClustered ? expectedValues.size() + 1 : (expectedValues.size() + 1) * 2);


         // Assert the first 10/20 since they should all be from iteration - this may not work since segments complete earlier..
         boolean isPost = true;
         int position = 0;
         for (; position < (isClustered ? expectedValues.size() : expectedValues.size() * 2); ++position) {
            // Even checks means it will be post and have a value - note we force every check to be
            // even for clustered since those should always be post
            if (!isClustered) {
               isPost = !isPost;
            }

            CacheEntryEvent event = listener.events.get(position);

            assertEquals(event.getType(), Event.Type.CACHE_ENTRY_CREATED);
            assertTrue(expectedValues.containsKey(event.getKey()));
            assertEquals(event.isPre(), !isPost);
            if (isPost) {
               assertEquals(event.getValue(), expectedValues.get(event.getKey()));
            } else {
               assertNull(event.getValue());
            }
         }


         // We should have 2 extra events at the end which are our modifications
         if (isClustered) {
            CacheEntryEvent<String, String> event = listener.events.get(position);
            assertEquals(event.getType(), operation.getType());
            assertEquals(event.isPre(), false);
            assertEquals(event.getKey(), keyToChange);
            assertEquals(event.getValue(), value);
         } else {
            CacheEntryEvent<String, String> event = listener.events.get(position);
            assertEquals(event.getType(), operation.getType());
            assertEquals(event.isPre(), true);
            assertEquals(event.getKey(), keyToChange);
            assertEquals(event.getValue(), oldValue);

            event = listener.events.get(position + 1);
            assertEquals(event.getType(), operation.getType());
            assertEquals(event.isPre(), false);
            assertEquals(event.getKey(), keyToChange);
            assertEquals(event.getValue(), value);
         }
      } finally {
         TestingUtil.replaceComponent(cache, EntryRetriever.class, retriever, true);
         cache.removeListener(listener);
      }
   }

   private <K> K findKeyBasedOnOwnership(Iterable<? extends K> keys, ConsistentHash hash, boolean shouldBePrimaryOwner,
                                         Address address) {
      for (K key : keys) {
         boolean isPrimaryOwner = hash.locatePrimaryOwner(key).equals(address);
         if (isPrimaryOwner == shouldBePrimaryOwner) {
            return key;
         }
      }
      throw new RuntimeException("No key could be found for owner, this may be a bug in test or really bad luck!");
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testCreateAfterIterationBeganAndCompletedSegmentValueNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerNotClustered(), Operation.CREATE, false);
//   }

   public void testCreateAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.CREATE, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testCreateAfterIterationBeganAndCompletedSegmentValueOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerNotClustered(), Operation.CREATE, true);
//   }

   public void testCreateAfterIterationBeganAndCompletedSegmentValueOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.CREATE, true);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testModificationAfterIterationBeganAndCompletedSegmentValueNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerNotClustered(), Operation.PUT, false);
//   }

   public void testModificationAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.PUT, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testModificationAfterIterationBeganAndCompletedSegmentValueOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerNotClustered(), Operation.PUT, true);
//   }

   public void testModificationAfterIterationBeganAndCompletedSegmentValueOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.PUT, true);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testRemoveAfterIterationBeganAndCompletedSegmentValueNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerNotClustered(), Operation.REMOVE, false);
//   }

   public void testRemoveAfterIterationBeganAndCompletedSegmentValueNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.REMOVE, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testRemoveAfterIterationBeganAndCompletedSegmentValueOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerNotClustered(), Operation.REMOVE, true);
//   }

   public void testRemoveAfterIterationBeganAndCompletedSegmentValueOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testModificationAfterIterationBeganAndCompletedSegmentValueOwner(new StateListenerClustered(), Operation.REMOVE, true);
   }

   protected void testIterationBeganAndSegmentNotComplete(final StateListener<String, String> listener,
                                                          Operation operation, boolean shouldBePrimaryOwner)
         throws TimeoutException, InterruptedException, ExecutionException {
      final Map<String, String> expectedValues = new HashMap<String, String>(10);
      final Cache<String, String> cache = cache(0, CACHE_NAME);
      for (int i = 0; i < 10; i++) {
         String key = "key-" + i;
         String value = "value-" + i;
         expectedValues.put(key, value);
         cache.put(key, value);
      }

      String value;
      String keyToChange = findKeyBasedOnOwnership(expectedValues.keySet(),
                                                   cache.getAdvancedCache().getDistributionManager().getConsistentHash(),
                                                   shouldBePrimaryOwner, cache.getCacheManager().getAddress());

      switch (operation) {
         case CREATE:
            keyToChange = "new-key";
            value = "new-value";
            break;
         case PUT:
            value = cache.get(keyToChange) + "-changed";
            break;
         case REMOVE:
            value = null;
            break;
         default:
            throw new IllegalArgumentException("Unsupported Operation provided " + operation);
      }

      CheckPoint checkPoint = new CheckPoint();
      int segmentToUse = cache.getAdvancedCache().getDistributionManager().getConsistentHash().getSegment(keyToChange);

      // do the operation, which should put it in the queue.
      ClusterCacheNotifier notifier = waitUntilClosingSegment(cache, segmentToUse, checkPoint);

      Future<Void> future = fork(new Callable<Void>() {

         @Override
         public Void call() throws Exception {
            cache.addListener(listener);
            return null;
         }
      });

      try {
         checkPoint.awaitStrict("pre_complete_segment_invoked", 10, TimeUnit.SECONDS);
         Object oldValue = operation.perform(cache, keyToChange, value);

         // Now let the iteration complete
         checkPoint.triggerForever("pre_complete_segment_released");

         future.get(10, TimeUnit.SECONDS);

         boolean isClustered = isClustered(listener);

         // We should have 1 or 2 (local) events due to the modification coming after we iterated on it.  Note the value
         // isn't brought up until the iteration is done
         assertEquals(listener.events.size(), isClustered ? expectedValues.size() + 1 : (expectedValues.size() + 1) * 2);

         // If it is clustered, then the modify can occur in the middle.  In non clustered we have to block all events
         // just in case of tx event listeners (ie. tx start/tx end would have to wrap all events) and such so we can't
         // release them early.  The cluster listeners aren't affected by transaction since it those are not currently
         // supported
         if (isClustered) {

            CacheEntryEvent event = null;
            boolean foundEarlierCreate = false;
            // We iterate backwards so we only have to do it once
            for (int i = listener.events.size() - 1; i >= 0; --i) {
               CacheEntryEvent currentEvent = listener.events.get(i);
               if (currentEvent.getKey().equals(keyToChange) && operation.getType() == currentEvent.getType()) {
                  if (event == null) {
                     event = currentEvent;
                     // We can remove safely since we are doing backwards counter as well
                     listener.events.remove(i);

                     // If it is a create there is no previous create
                     if (operation.getType() == Event.Type.CACHE_ENTRY_CREATED) {
                        foundEarlierCreate = true;
                        break;
                     }
                  } else {
                     fail("There should only be a single event in the event queue!");
                  }
               } else if (event != null && (foundEarlierCreate = event.getKey().equals(currentEvent.getKey()))) {
                  break;
               }
            }
            // This should have been set
            assertTrue(foundEarlierCreate, "There was no matching create event for key " + event.getKey());

            assertEquals(event.getType(), operation.getType());
            assertEquals(event.isPre(), false);
            assertEquals(event.getValue(), value);
         }

         // Assert the first 10/20 since they should all be from iteration - this may not work since segments complete earlier..
         boolean isPost = true;
         int position = 0;
         for (; position < (isClustered ? expectedValues.size() : expectedValues.size() * 2); ++position) {
            // Even checks means it will be post and have a value - note we force every check to be
            // even for clustered since those should always be post
            if (!isClustered) {
               isPost = !isPost;
            }

            CacheEntryEvent event = listener.events.get(position);

            assertEquals(event.getType(), Event.Type.CACHE_ENTRY_CREATED);
            assertTrue(expectedValues.containsKey(event.getKey()));
            assertEquals(event.isPre(), !isPost);
            if (isPost) {
               assertEquals(event.getValue(), expectedValues.get(event.getKey()));
            } else {
               assertNull(event.getValue());
            }
         }


         // We should have 2 extra events at the end which are our modifications
         if (!isClustered) {
            CacheEntryEvent<String, String> event = listener.events.get(position);
            assertEquals(event.getType(), operation.getType());
            assertEquals(event.isPre(), true);
            assertEquals(event.getKey(), keyToChange);
            assertEquals(event.getValue(), oldValue);

            event = listener.events.get(position + 1);
            assertEquals(event.getType(), operation.getType());
            assertEquals(event.isPre(), false);
            assertEquals(event.getKey(), keyToChange);
            assertEquals(event.getValue(), value);
         }
      } finally {
         TestingUtil.replaceComponent(cache, CacheNotifier.class, notifier, true);
         TestingUtil.replaceComponent(cache, ClusterCacheNotifier.class, notifier, true);
         cache.removeListener(listener);
      }
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testCreateAfterIterationBeganAndSegmentNotCompleteValueNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testIterationBeganAndSegmentNotComplete(new StateListenerNotClustered(), Operation.CREATE, false);
//   }

   public void testCreateAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.CREATE, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testCreateAfterIterationBeganAndSegmentNotCompleteValueOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testIterationBeganAndSegmentNotComplete(new StateListenerNotClustered(), Operation.CREATE, true);
//   }

   public void testCreateAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.CREATE, true);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testModificationAfterIterationBeganAndSegmentNotCompleteValueNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testIterationBeganAndSegmentNotComplete(new StateListenerNotClustered(), Operation.PUT, false);
//   }

   public void testModificationAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.PUT, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testModificationAfterIterationBeganAndSegmentNotCompleteValueOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testIterationBeganAndSegmentNotComplete(new StateListenerNotClustered(), Operation.PUT, true);
//   }

   public void testModificationAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.PUT, true);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testRemoveAfterIterationBeganAndSegmentNotCompleteValueNonOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testIterationBeganAndSegmentNotComplete(new StateListenerNotClustered(), Operation.REMOVE, false);
//   }

   public void testRemoveAfterIterationBeganAndSegmentNotCompleteValueNonOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.REMOVE, false);
   }

//   TODO: commented out until local listners support includeCurrentState
//   public void testRemoveAfterIterationBeganAndSegmentNotCompleteValueOwnerNotClustered()
//         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
//      testIterationBeganAndSegmentNotComplete(new StateListenerNotClustered(), Operation.REMOVE, true);
//   }

   public void testRemoveAfterIterationBeganAndSegmentNotCompleteValueOwnerClustered()
         throws InterruptedException, BrokenBarrierException, TimeoutException, ExecutionException, IOException {
      testIterationBeganAndSegmentNotComplete(new StateListenerClustered(), Operation.REMOVE, true);
   }

   private boolean isClustered(StateListener listener) {
      return listener.getClass().getAnnotation(Listener.class).clustered();
   }

   protected static abstract class StateListener<K, V> {
      final List<CacheEntryEvent<K, V>> events = new ArrayList<>();

      @CacheEntryCreated
      @CacheEntryModified
      @CacheEntryRemoved
      public synchronized void onCacheNotification(CacheEntryEvent<K, V> event) {
         events.add(event);
      }
   }
   @Listener(includeCurrentState = true, clustered = false)
   private static class StateListenerNotClustered extends StateListener {

   }

   @Listener(includeCurrentState = true, clustered = true)
   private static class StateListenerClustered extends StateListener {

   }


   protected ClusterCacheNotifier waitUntilClosingSegment(final Cache<?, ?> cache, final int segment, final CheckPoint checkPoint) {
      ClusterCacheNotifier realNotifier = TestingUtil.extractComponent(cache, ClusterCacheNotifier.class);
      ConcurrentMap<UUID, QueueingSegmentListener> listeningMap = new ConcurrentHashMap() {
         @Override
         public Object putIfAbsent(Object key, Object value) {
            final Answer<Object> listenerAnswer = AdditionalAnswers.delegatesTo(value);

            final AtomicBoolean wasLastSegment = new AtomicBoolean(false);
            QueueingSegmentListener mockListener = mock(QueueingSegmentListener.class,
                                                       withSettings().defaultAnswer(listenerAnswer));

            doAnswer(new Answer() {
               @Override
               public Object answer(InvocationOnMock invocation) throws Throwable {
                  wasLastSegment.set(true);
                  return listenerAnswer.answer(invocation);
               }
            }).when(mockListener).segmentTransferred(Mockito.eq(segment), Mockito.eq(true));

            doAnswer(new Answer() {
               @Override
               public Object answer(InvocationOnMock invocation) throws Throwable {
                  // Wait for main thread to sync up
                  checkPoint.trigger("pre_complete_segment_invoked");
                  // Now wait until main thread lets us through
                  checkPoint.awaitStrict("pre_complete_segment_released", 10, TimeUnit.SECONDS);
                  return listenerAnswer.answer(invocation);
               }
               // If this was false means we won't have the notifiedKey callback
            }).when(mockListener).segmentTransferred(Mockito.eq(segment), Mockito.eq(false));

            doAnswer(new Answer() {
               @Override
               public Object answer(InvocationOnMock invocation) throws Throwable {
                  if (wasLastSegment.compareAndSet(true, false)) {
                     // Wait for main thread to sync up
                     checkPoint.trigger("pre_complete_segment_invoked");
                     // Now wait until main thread lets us through
                     checkPoint.awaitStrict("pre_complete_segment_released", 10, TimeUnit.SECONDS);
                  }
                  return listenerAnswer.answer(invocation);
               }
            }).when(mockListener).notifiedKey(Mockito.any());
            return super.putIfAbsent(key, mockListener);
         }
      };
      CacheNotifierImpl notifier = new CacheNotifierImpl(listeningMap);
      TestingUtil.replaceComponent(cache, CacheNotifier.class, notifier, true);
      TestingUtil.replaceComponent(cache, ClusterCacheNotifier.class, notifier, true);
      return realNotifier;
   }

   protected EntryRetriever waitUntilRetrievingIterator(final Cache<?, ?> cache, final CheckPoint checkPoint) {
      EntryRetriever retriever = TestingUtil.extractComponent(cache, EntryRetriever.class);
      final Answer<Object> forwardedAnswer = AdditionalAnswers.delegatesTo(retriever);
      EntryRetriever mockRetriever = mock(EntryRetriever.class, withSettings().defaultAnswer(forwardedAnswer));
      doAnswer(new Answer() {
         @Override
         public Object answer(InvocationOnMock invocation) throws Throwable {
            // Wait for main thread to sync up
            checkPoint.trigger("pre_retrieve_entry_invoked");
            // Now wait until main thread lets us through
            checkPoint.awaitStrict("pre_retrieve_entry_released", 10, TimeUnit.SECONDS);

            return forwardedAnswer.answer(invocation);
         }
      }).when(mockRetriever).retrieveEntries(any(KeyValueFilter.class), any(Converter.class),
                                             any(EntryRetriever.SegmentListener.class));
      TestingUtil.replaceComponent(cache, EntryRetriever.class, mockRetriever, true);
      return retriever;
   }

   protected EntryRetriever waitUntilClosingIterator(final Cache<?, ?> cache, final CheckPoint checkPoint) {
      EntryRetriever retriever = TestingUtil.extractComponent(cache, EntryRetriever.class);
      final Answer<Object> forwardedAnswer = AdditionalAnswers.delegatesTo(retriever);
      EntryRetriever mockRetriever = mock(EntryRetriever.class, withSettings().defaultAnswer(forwardedAnswer));

      doAnswer(new Answer() {
         @Override
         public Object answer(InvocationOnMock invocation) throws Throwable {
            CloseableIterator realIter = (CloseableIterator)forwardedAnswer.answer(invocation);

            final Answer<Object> forwardedIterAnswer = AdditionalAnswers.delegatesTo(realIter);

            CloseableIterator iter = mock(CloseableIterator.class, withSettings().defaultAnswer(forwardedIterAnswer));

            doAnswer(new Answer() {

               @Override
               public Object answer(InvocationOnMock invocationOnMock) throws Throwable {
                  // Wait for main thread to sync up
                  checkPoint.trigger("pre_close_iter_invoked");
                  // Now wait until main thread lets us through
                  checkPoint.awaitStrict("pre_close_iter_released", 10, TimeUnit.SECONDS);
                  return forwardedIterAnswer.answer(invocationOnMock);
               }
            }).when(iter).close();

            return iter;
         }
      }).when(mockRetriever).retrieveEntries(any(KeyValueFilter.class), any(Converter.class),
                                             any(EntryRetriever.SegmentListener.class));
      TestingUtil.replaceComponent(cache, EntryRetriever.class, mockRetriever, true);
      return retriever;
   }
}
TOP

Related Classes of org.infinispan.notifications.cachelistener.CacheNotifierImplInitialTransferDistTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.