Package org.infinispan.xsite.statetransfer.failures

Source Code of org.infinispan.xsite.statetransfer.failures.StateTransferLinkFailuresTest$ControllerTransport

package org.infinispan.xsite.statetransfer.failures;

import org.infinispan.Cache;
import org.infinispan.configuration.cache.BackupConfigurationBuilder;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.configuration.cache.ConfigurationBuilder;
import org.infinispan.context.Flag;
import org.infinispan.manager.CacheContainer;
import org.infinispan.remoting.transport.AbstractDelegatingTransport;
import org.infinispan.remoting.transport.BackupResponse;
import org.infinispan.remoting.transport.Transport;
import org.infinispan.statetransfer.CommitManager;
import org.infinispan.test.TestingUtil;
import org.infinispan.util.concurrent.TimeoutException;
import org.infinispan.xsite.AbstractTwoSitesTest;
import org.infinispan.xsite.XSiteAdminOperations;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.XSiteReplicateCommand;
import org.infinispan.xsite.statetransfer.XSiteStatePushCommand;
import org.infinispan.xsite.statetransfer.XSiteStateTransferManager;
import org.testng.AssertJUnit;
import org.testng.annotations.Test;

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.infinispan.test.TestingUtil.extractComponent;
import static org.infinispan.test.TestingUtil.wrapGlobalComponent;
import static org.infinispan.xsite.XSiteAdminOperations.SUCCESS;
import static org.testng.AssertJUnit.*;

/**
* Tests the Cross-Site replication state transfer with a broken connection between sites.
*
* @author Pedro Ruivo
* @since 7.0
*/
@Test(groups = "xsite", testName = "xsite.statetransfer.failures.StateTransferLinkFailuresTest")
public class StateTransferLinkFailuresTest extends AbstractTwoSitesTest {

   protected static final int NR_KEYS = 20; //10 * chunk size

   public StateTransferLinkFailuresTest() {
      super();
      this.cleanup = CleanupPhase.AFTER_METHOD;
      this.implicitBackupCache = true;
   }

   /*
   tests:
   * request the state transfer without link to other site.
   * lose link while transferring data
    */

   public void testStartStateTransferWithoutLink() throws Exception {
      initBeforeTest();
      List<ControllerTransport> transports = replaceTransportInSite(LON);
      for (ControllerTransport transport : transports) {
         transport.fail = true;
      }

      assertTrue(!SUCCESS.equals(extractComponent(cache(LON, 0), XSiteAdminOperations.class).pushState(NYC)));
      assertDataInSite(LON);
      assertInSite(NYC, new AssertCondition<Object, Object>() {
         @Override
         public void assertInCache(Cache<Object, Object> cache) {
            AssertJUnit.assertTrue(cache.isEmpty());
         }
      });
      assertTrue(getStatus(LON).isEmpty());

   }

   public void testLinkBrokenDuringStateTransfer() {
      initBeforeTest();
      initBeforeTest();
      List<ControllerTransport> transports = replaceTransportInSite(LON);
      for (ControllerTransport transport : transports) {
         transport.failAfterFirstChunk = true;
      }

      startStateTransfer(cache(LON, 0), NYC);
      assertOnline(LON, NYC);

      assertEventuallyInSite(LON, new EventuallyAssertCondition<Object, Object>() {
         @Override
         public boolean assertInCache(Cache<Object, Object> cache) {
            return extractComponent(cache, XSiteStateTransferManager.class).getRunningStateTransfers().isEmpty();
         }
      }, 1, TimeUnit.MINUTES);

      assertEquals(1, getStatus(LON).size());
      assertEquals(XSiteStateTransferManager.STATUS_ERROR, getStatus(LON).get(NYC));

      assertInSite(NYC, new AssertCondition<Object, Object>() {
         @Override
         public void assertInCache(Cache<Object, Object> cache) {
            //link is broken. NYC is still expecting state.
            assertTrue(extractComponent(cache, CommitManager.class).isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER));
            assertEquals(LON, extractComponent(cache, XSiteAdminOperations.class).getSendingSiteName());
         }
      });

      assertEquals(SUCCESS, extractComponent(cache(NYC, 0), XSiteAdminOperations.class).cancelReceiveState(LON));

      assertInSite(NYC, new AssertCondition<Object, Object>() {
         @Override
         public void assertInCache(Cache<Object, Object> cache) {
            assertFalse(extractComponent(cache, CommitManager.class).isTracking(Flag.PUT_FOR_X_SITE_STATE_TRANSFER));
            assertNull(extractComponent(cache, XSiteAdminOperations.class).getSendingSiteName());
         }
      });

      assertEquals(SUCCESS, extractComponent(cache(LON, 0), XSiteAdminOperations.class).clearPushStateStatus());
      assertTrue(getStatus(LON).isEmpty());
   }

   @Override
   protected ConfigurationBuilder getNycActiveConfig() {
      return createConfiguration();
   }

   @Override
   protected ConfigurationBuilder getLonActiveConfig() {
      return createConfiguration();
   }

   protected static ConfigurationBuilder createConfiguration() {
      ConfigurationBuilder builder = getDefaultClusteredCacheConfig(CacheMode.DIST_SYNC, false);
      builder.clustering().hash().numOwners(2);
      return builder;
   }

   @Override
   protected void adaptLONConfiguration(BackupConfigurationBuilder builder) {
      builder.stateTransfer().chunkSize(2).timeout(2000);
   }


   private void putData() {
      for (int i = 0; i < NR_KEYS; ++i) {
         cache(LON, 0).put(key(Integer.toString(i)), val(Integer.toString(i)));
      }
   }

   private void initBeforeTest() {
      takeSiteOffline(LON, NYC);
      assertOffline(LON, NYC);
      putData();
      assertDataInSite(LON);
      assertInSite(NYC, new AssertCondition<Object, Object>() {
         @Override
         public void assertInCache(Cache<Object, Object> cache) {
            assertTrue(cache.isEmpty());
         }
      });
   }

   private void startStateTransfer(Cache<?, ?> coordinator, String toSite) {
      XSiteAdminOperations operations = extractComponent(coordinator, XSiteAdminOperations.class);
      assertEquals(SUCCESS, operations.pushState(toSite));
   }

   private void takeSiteOffline(String localSite, String remoteSite) {
      XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class);
      assertEquals(SUCCESS, operations.takeSiteOffline(remoteSite));
   }

   private void assertOffline(String localSite, String remoteSite) {
      XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class);
      assertEquals(XSiteAdminOperations.OFFLINE, operations.siteStatus(remoteSite));
   }

   private void assertOnline(String localSite, String remoteSite) {
      XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class);
      assertEquals(XSiteAdminOperations.ONLINE, operations.siteStatus(remoteSite));
   }

   private Map<String, String> getStatus(String localSite) {
      XSiteAdminOperations operations = extractComponent(cache(localSite, 0), XSiteAdminOperations.class);
      return operations.getPushStateStatus();
   }

   private void assertDataInSite(String siteName) {
      assertInSite(siteName, new AssertCondition<Object, Object>() {
         @Override
         public void assertInCache(Cache<Object, Object> cache) {
            for (int i = 0; i < NR_KEYS; ++i) {
               assertEquals(val(Integer.toString(i)), cache.get(key(Integer.toString(i))));
            }
         }
      });
   }

   private List<ControllerTransport> replaceTransportInSite(String site) {
      List<ControllerTransport> transports = new ArrayList<>(site(site).cacheManagers().size());
      for (CacheContainer cacheContainer : site(site).cacheManagers()) {
         transports.add(wrapGlobalComponent(cacheContainer,
                                            Transport.class,
                                            new TestingUtil.WrapFactory<Transport, ControllerTransport, CacheContainer>() {
                                               @Override
                                               public ControllerTransport wrap(CacheContainer wrapOn, Transport current) {
                                                  return new ControllerTransport(current);
                                               }
                                            }, true));
      }
      return transports;
   }

   private static class ControllerTransport extends AbstractDelegatingTransport {

      private volatile boolean fail;
      private volatile boolean failAfterFirstChunk;

      public ControllerTransport(Transport actual) {
         super(actual);
      }

      @Override
      public void start() {
         //no-op; avoid re-start the transport again...
      }

      @Override
      public BackupResponse backupRemotely(Collection<XSiteBackup> backups, XSiteReplicateCommand rpcCommand) throws Exception {
         if (fail) {
            throw new TimeoutException("induced timeout!");
         } else if (failAfterFirstChunk && rpcCommand instanceof XSiteStatePushCommand) {
            fail = true;
         }
         return super.backupRemotely(backups, rpcCommand);
      }
   }
}
TOP

Related Classes of org.infinispan.xsite.statetransfer.failures.StateTransferLinkFailuresTest$ControllerTransport

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.