Package org.infinispan.cli.upgrade

Source Code of org.infinispan.cli.upgrade.CLInterfaceSourceMigrator$GlobalKeysetTask

package org.infinispan.cli.upgrade;

import org.infinispan.Cache;
import org.infinispan.CacheException;
import org.infinispan.configuration.cache.CacheMode;
import org.infinispan.distexec.DefaultExecutorService;
import org.infinispan.distexec.DistributedCallable;
import org.infinispan.distexec.DistributedExecutorService;
import org.infinispan.upgrade.SourceMigrator;

import java.io.Serializable;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
* // TODO: Document this
*
* @author Galder Zamarreño
* @since // TODO
*/
public class CLInterfaceSourceMigrator implements SourceMigrator {

   private static final String KNOWN_KEY = "___MigrationManager_CLI_KnownKeys___";

   private final Cache<String, Set<Object>> cache;

   public CLInterfaceSourceMigrator(Cache<?, ?> cache) {
      this.cache = (Cache<String, Set<Object>>) cache;
   }

   @Override
   public void recordKnownGlobalKeyset() {
      try {
         CacheMode cm = cache.getCacheConfiguration().clustering().cacheMode();
         Set<Object> keys;
         if (cm.isReplicated() || !cm.isClustered()) {
            // If cache mode is LOCAL or REPL, dump local keyset.
            // Defensive copy to serialize and transmit across a network
            keys = new HashSet<Object>(cache.keySet());
         } else {
            // If cache mode is DIST, use a map/reduce task
            DistributedExecutorService des = new DefaultExecutorService(cache);
            List<Future<Set<Object>>> keysets = des.submitEverywhere(new GlobalKeysetTask(cache));
            Set<Object> combinedKeyset = new HashSet<Object>();

            for (Future<Set<Object>> keyset : keysets)
               combinedKeyset.addAll(keyset.get());

            keys = combinedKeyset;
         }

         // Remove KNOWN_KEY from the key set - just in case it is there from a previous run.
         keys.remove(KNOWN_KEY);

         cache.put(KNOWN_KEY, keys);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt(); // reset
      } catch (ExecutionException e) {
         throw new CacheException("Unable to record all known keys", e);
      }
   }

   @Override
   public String getCacheName() {
      return cache.getName();
   }

   static class GlobalKeysetTask implements DistributedCallable<Object, Object, Set<Object>> {

      final Cache<?, ?> cache;

      GlobalKeysetTask(Cache<?, ?> cache) {
         this.cache = cache;
      }

      @Override
      public void setEnvironment(Cache<Object, Object> cache, Set<Object> inputKeys) {
         // TODO: Customise this generated block
      }

      @Override
      public Set<Object> call() throws Exception {
         // Defensive copy to serialize and transmit across a network
         return new HashSet<Object>(cache.keySet());
      }
   }

}
TOP

Related Classes of org.infinispan.cli.upgrade.CLInterfaceSourceMigrator$GlobalKeysetTask

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.