Package org.infinispan.xsite.statetransfer

Source Code of org.infinispan.xsite.statetransfer.XSiteStateTransferManagerImpl

package org.infinispan.xsite.statetransfer;

import org.infinispan.commands.CommandsFactory;
import org.infinispan.commands.remote.CacheRpcCommand;
import org.infinispan.commons.util.CollectionFactory;
import org.infinispan.commons.util.concurrent.NotifyingFutureImpl;
import org.infinispan.commons.util.concurrent.NotifyingNotifiableFuture;
import org.infinispan.configuration.cache.BackupConfiguration;
import org.infinispan.configuration.cache.Configuration;
import org.infinispan.configuration.cache.SitesConfiguration;
import org.infinispan.factories.annotations.ComponentName;
import org.infinispan.factories.annotations.Inject;
import org.infinispan.remoting.LocalInvocation;
import org.infinispan.remoting.responses.ExceptionResponse;
import org.infinispan.remoting.responses.Response;
import org.infinispan.remoting.responses.ResponseGenerator;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.BackupResponse;
import org.infinispan.xsite.XSiteBackup;
import org.infinispan.xsite.XSiteReplicateCommand;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

import static java.lang.String.format;
import static org.infinispan.factories.KnownComponentNames.ASYNC_TRANSPORT_EXECUTOR;
import static org.infinispan.xsite.statetransfer.XSiteStateTransferControlCommand.StateTransferControl;

/**
* {@link org.infinispan.xsite.statetransfer.XSiteStateTransferManager} implementation.
*
* @author Pedro Ruivo
* @since 7.0
*/
public class XSiteStateTransferManagerImpl implements XSiteStateTransferManager {

   private final ConcurrentMap<String, XSiteStateTransferCollector> siteCollector;
   private RpcManager rpcManager;
   private Configuration configuration;
   private CommandsFactory commandsFactory;
   private ResponseGenerator responseGenerator;
   private ExecutorService asyncExecutor;

   public XSiteStateTransferManagerImpl() {
      siteCollector = CollectionFactory.makeConcurrentMap();
   }

   @Inject
   public void inject(RpcManager rpcManager, Configuration configuration, CommandsFactory commandsFactory,
                      ResponseGenerator responseGenerator,
                      @ComponentName(value = ASYNC_TRANSPORT_EXECUTOR) ExecutorService asyncExecutor) {
      this.rpcManager = rpcManager;
      this.configuration = configuration;
      this.commandsFactory = commandsFactory;
      this.responseGenerator = responseGenerator;
      this.asyncExecutor = asyncExecutor;
   }

   @Override
   public void notifyStatePushFinished(String siteName, Address node) throws Throwable {
      XSiteStateTransferCollector collector = siteCollector.get(siteName);
      if (collector == null) {
         return;
      }
      final XSiteBackup xSiteBackup = findSite(siteName);
      if (collector.confirmStateTransfer(node)) {
         siteCollector.remove(siteName);
         controlStateTransferOnRemoteSite(xSiteBackup, StateTransferControl.FINISH_RECEIVE);
      }
   }

   @Override
   public final void startPushState(String siteName) throws Throwable {
      //check site name first
      if (siteName == null) {
         throw new NullPointerException("Site name cannot be null!");
      }
      final XSiteBackup xSiteBackup = findSite(siteName);
      if (xSiteBackup == null) {
         throw new IllegalArgumentException("Site " + siteName + " not found!");
      }

      if (siteCollector.putIfAbsent(siteName, new XSiteStateTransferCollector(rpcManager.getMembers())) != null) {
         throw new Exception(format("X-Site state transfer to '%s' already started!", siteName));
      }

      try {
         controlStateTransferOnRemoteSite(xSiteBackup, StateTransferControl.START_RECEIVE);
         controlStateTransferOnLocalSite(StateTransferControl.START_SEND, siteName);
      } catch (Throwable throwable) {
         handleFailure(xSiteBackup);
         throw new Exception(throwable);
      }
   }

   @Override
   public List<String> getRunningStateTransfers() {
      return siteCollector.isEmpty() ? Collections.<String>emptyList() : new ArrayList<String>(siteCollector.keySet());
   }

   private void handleFailure(XSiteBackup xSiteBackup) {
      try {
         controlStateTransferOnLocalSite(StateTransferControl.CANCEL_SEND, xSiteBackup.getSiteName());
      } catch (Exception e) {
         //ignored
      }
      try {
         controlStateTransferOnRemoteSite(xSiteBackup, StateTransferControl.FINISH_RECEIVE);
      } catch (Throwable throwable) {
         //ignored
      }
   }

   private void controlStateTransferOnRemoteSite(XSiteBackup xSiteBackup, StateTransferControl control) throws Throwable {
      XSiteStateTransferControlCommand command = commandsFactory.buildXSiteStateTransferControlCommand(control, null);
      BackupResponse response = invokeRemotelyInRemoteSite(command, xSiteBackup);
      response.waitForBackupToFinish();
      if (!response.getFailedBackups().values().isEmpty()) {
         throw response.getFailedBackups().values().iterator().next();
      }
   }

   private void controlStateTransferOnLocalSite(StateTransferControl control, String siteName) throws Exception {
      XSiteStateTransferControlCommand command = commandsFactory.buildXSiteStateTransferControlCommand(control, siteName);
      for (Map.Entry<Address, Response> entry : invokeRemotelyInLocalSite(command).entrySet()) {
         if (entry.getValue() instanceof ExceptionResponse) {
            throw ((ExceptionResponse) entry.getValue()).getException();
         }
      }
   }

   private XSiteBackup findSite(String siteName) {
      SitesConfiguration sites = configuration.sites();
      for (BackupConfiguration bc : sites.allBackups()) {
         if (bc.site().equals(siteName)) {
            return new XSiteBackup(bc.site(), true, bc.stateTransfer().timeout());
         }
      }
      return null;
   }

   private Map<Address, Response> invokeRemotelyInLocalSite(CacheRpcCommand command) throws Exception {
      commandsFactory.initializeReplicableCommand(command, false);
      final NotifyingNotifiableFuture<Object> remoteFuture = new NotifyingFutureImpl<Object>();
      rpcManager.invokeRemotelyInFuture(null, command, rpcManager.getDefaultRpcOptions(true, false), remoteFuture);
      final Future<Response> localFuture = asyncExecutor.submit(
            LocalInvocation.newInstance(responseGenerator, command, commandsFactory, rpcManager.getAddress()));
      final Map<Address, Response> responseMap = new HashMap<Address, Response>();
      responseMap.put(rpcManager.getAddress(), localFuture.get());
      //noinspection unchecked
      responseMap.putAll((Map<? extends Address, ? extends Response>) remoteFuture.get());
      return responseMap;
   }

   private BackupResponse invokeRemotelyInRemoteSite(XSiteReplicateCommand command, XSiteBackup xSiteBackup) throws Exception {
      return rpcManager.getTransport().backupRemotely(Collections.singletonList(xSiteBackup), command);
   }

}
TOP

Related Classes of org.infinispan.xsite.statetransfer.XSiteStateTransferManagerImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.