Package com.nokia.dempsy.executor

Source Code of com.nokia.dempsy.executor.DefaultDempsyExecutor$ProxyFuture

package com.nokia.dempsy.executor;

import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;

public class DefaultDempsyExecutor implements DempsyExecutor
{
   private ScheduledExecutorService schedule = null;
   private ThreadPoolExecutor executor = null;
   private AtomicLong numLimited = null;
   private long maxNumWaitingLimitedTasks = -1;
   private int threadPoolSize = -1;
   private static final int minNumThreads = 4;
  
   private double m = 1.25;
   private int additionalThreads = 2;
  
   public DefaultDempsyExecutor() { }
  
   /**
    * Create a DefaultDempsyExecutor with a fixed number of threads while setting the
    * maximum number of limited tasks.
    */
   public DefaultDempsyExecutor(int threadPoolSize, int maxNumWaitingLimitedTasks)
   {
      this.threadPoolSize = threadPoolSize;
      this.maxNumWaitingLimitedTasks = maxNumWaitingLimitedTasks;
   }
  
   /**
    * <p>Prior to calling start you can set the cores factor and additional
    * cores. Ultimately the number of threads in the pool will be given by:</p>
    *
    * <p>num threads = m * num cores + b</p>
    *
    * <p>Where 'm' is set by setCoresFactor and 'b' is set by setAdditionalThreads</p>
    */
   public void setCoresFactor(double m){ this.m = m; }
  
   /**
    * <p>Prior to calling start you can set the cores factor and additional
    * cores. Ultimately the number of threads in the pool will be given by:</p>
    *
    * <p>num threads = m * num cores + b</p>
    *
    * <p>Where 'm' is set by setCoresFactor and 'b' is set by setAdditionalThreads</p>
    */
   public void setAdditionalThreads(int additionalThreads){ this.additionalThreads = additionalThreads; }
  
   @Override
   public void start()
   {
      if (threadPoolSize == -1)
      {
         // figure out the number of cores.
         int cores = Runtime.getRuntime().availableProcessors();
         int cpuBasedThreadCount = (int)Math.ceil((double)cores * m) + additionalThreads; // why? I don't know. If you don't like it
                                                                                          //   then use the other constructor
         threadPoolSize = Math.max(cpuBasedThreadCount, minNumThreads);
      }
      executor = (ThreadPoolExecutor)Executors.newFixedThreadPool(threadPoolSize);
      schedule = Executors.newSingleThreadScheduledExecutor();
      numLimited = new AtomicLong(0);
     
      if (maxNumWaitingLimitedTasks < 0)
         maxNumWaitingLimitedTasks = 20 * threadPoolSize;
   }
  
   public int getMaxNumberOfQueuedLimitedTasks() { return (int)maxNumWaitingLimitedTasks; }
  
   public void setMaxNumberOfQueuedLimitedTasks(int maxNumWaitingLimitedTasks) { this.maxNumWaitingLimitedTasks = maxNumWaitingLimitedTasks; }
  
   @Override
   public int getNumThreads() { return threadPoolSize; }
  
   @Override
   public void shutdown()
   {
      if (executor != null)
         executor.shutdown();
     
      if (schedule != null)
         schedule.shutdown();
   }

   @Override
   public int getNumberPending()
   {
      return executor.getQueue().size();
   }
  
   @Override
   public int getNumberLimitedPending()
   {
      return numLimited.intValue();
   }

  
   public boolean isRunning() { return (schedule != null && executor != null) &&
         !(schedule.isShutdown() || schedule.isTerminated()) &&
         !(executor.isShutdown() || executor.isTerminated()); }
  
   @Override
   public <V> Future<V> submit(Callable<V> r) { return executor.submit(r); }

   @Override
   public <V> Future<V> submitLimited(final Rejectable<V> r)
   {
      Callable<V> task = new Callable<V>()
      {
         Rejectable<V> o = r;

         @Override
         public V call() throws Exception
         {
            long num = numLimited.decrementAndGet();
            if (num <= maxNumWaitingLimitedTasks)
               return o.call();
            o.rejected();
            return null;
         }
      };
     
      numLimited.incrementAndGet();

      Future<V> ret = executor.submit(task);
      return ret;
   }
  
   @Override
   public <V> Future<V> schedule(final Callable<V> r, long delay, TimeUnit unit)
   {
      final ProxyFuture<V> ret = new ProxyFuture<V>();
     
      // here we are going to wrap the Callable and the Future to change the
      // submission to one of the other queues.
      ret.schedFuture = schedule.schedule(new Runnable(){
         Callable<V> callable = r;
         ProxyFuture<V> rret = ret;
         @Override
         public void run()
         {
            // now resubmit the callable we're proxying
            rret.set(submit(callable));
         }
      }, delay, unit);
     
      // proxy the return future.
      return ret;
   }

   private static class ProxyFuture<V> implements Future<V>
   {
      private volatile Future<V> ret;
      private volatile ScheduledFuture<?> schedFuture;
     
      private synchronized void set(Future<V> f)
      {
         ret = f;
         if (schedFuture.isCancelled())
            ret.cancel(true);
         this.notifyAll();
      }
     
      // called only from synchronized methods
      private Future<?> getCurrent() { return ret == null ? schedFuture : ret; }
     
      @Override
      public synchronized boolean cancel(boolean mayInterruptIfRunning){ return getCurrent().cancel(mayInterruptIfRunning); }

      @Override
      public synchronized boolean isCancelled() { return getCurrent().isCancelled(); }

      @Override
      public synchronized boolean isDone() { return ret == null ? false : ret.isDone(); }

      @Override
      public synchronized V get() throws InterruptedException, ExecutionException
      {
         while (ret == null)
            this.wait();
         return ret.get();
      }

      @Override
      public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
      {
         long cur = System.currentTimeMillis();
         while (ret == null)
            this.wait(unit.toMillis(timeout));
         return ret.get(System.currentTimeMillis() - cur,TimeUnit.MILLISECONDS);
      }
   }
  
}
TOP

Related Classes of com.nokia.dempsy.executor.DefaultDempsyExecutor$ProxyFuture

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc., and is owned by Oracle Inc. Contact coftware#gmail.com.