Package org.apache.geronimo.concurrent.executor

Source Code of org.apache.geronimo.concurrent.executor.AbstractManagedScheduledExecutorService

/**
*  Licensed to the Apache Software Foundation (ASF) under one or more
*  contributor license agreements.  See the NOTICE file distributed with
*  this work for additional information regarding copyright ownership.
*  The ASF licenses this file to You under the Apache License, Version 2.0
*  (the "License"); you may not use this file except in compliance with
*  the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/

/*
* This class is based on and borrows code from java.util.concurrent.ScheduledThreadPoolExecutor
* class in Apache Harmony.
*/
package org.apache.geronimo.concurrent.executor;

import java.util.AbstractCollection;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

import javax.util.concurrent.ManagedScheduledExecutorService;
import javax.util.concurrent.ManagedTaskListener;
import javax.util.concurrent.Trigger;

import org.apache.geronimo.concurrent.ManagedContext;
import org.apache.geronimo.concurrent.ManagedTaskListenerSupport;

public abstract class AbstractManagedScheduledExecutorService
    extends AbstractManagedExecutorService
    implements ManagedScheduledExecutorService {

    /**
     * False if should cancel/suppress periodic tasks on shutdown.
     */
    private volatile boolean continueExistingPeriodicTasksAfterShutdown;

    /**
     * False if should cancel non-periodic tasks on shutdown.
     */
    private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

    /**
     * Sequence number to break scheduling ties, and in turn to
     * guarantee FIFO order among tied entries.
     */
    private static final AtomicLong sequencer = new AtomicLong(0);

    private class ScheduledFutureTask<V>
            extends ManagedFutureTask<V> implements ScheduledFuture<V> {
                       
        /** Sequence number to break ties FIFO */
        private final long sequenceNumber;
       
        private final Trigger trigger;
       
        private Date lastActualRunTime;
        private Date lastScheduledRunTime;
        private Date lastCompleteTime;
        private Date submitTime;
       
        ScheduledFutureTask(Runnable r,
                            V result,
                            ManagedContext managedContext,
                            ManagedTaskListenerSupport listener,
                            Date triggerTime) {
            super(r, result, managedContext, listener);
            this.lastScheduledRunTime = triggerTime;
            this.trigger = null;
            this.submitTime = new Date();
            this.sequenceNumber = sequencer.getAndIncrement();
        }
       
        ScheduledFutureTask(Runnable r,
                            V result,
                            ManagedContext managedContext,
                            ManagedTaskListenerSupport listener,
                            Trigger trigger) {
            super(r, result, managedContext, listener);
            this.trigger = trigger;
            this.submitTime = new Date();
            this.sequenceNumber = sequencer.getAndIncrement();
            getNextRunTime();
        }

        ScheduledFutureTask(Callable<V> callable,
                            ManagedContext managedContext,
                            ManagedTaskListenerSupport listener,
                            Date triggerTime) {
            super(callable, managedContext, listener);
            this.lastScheduledRunTime = triggerTime;
            this.trigger = null;
            this.submitTime = new Date();
            this.sequenceNumber = sequencer.getAndIncrement();
        }
       
        ScheduledFutureTask(Callable<V> callable,
                            ManagedContext managedContext,
                            ManagedTaskListenerSupport listener,
                            Trigger trigger) {
            super(callable, managedContext, listener);
            this.trigger = trigger;
            this.submitTime = new Date();
            this.sequenceNumber = sequencer.getAndIncrement();
            getNextRunTime();
        }
       
        private void getNextRunTime() {  
            /* XXX: The spec does not define (yet) what should happen if trigger.getNextRunTime()
             * returns null on the verify first time. We throw RejectedExecutionException
             * in such case since the task will not run.             
             */
            this.lastScheduledRunTime =
                trigger.getNextRunTime(this, this.submitTime, null, null, null)
            if (this.lastScheduledRunTime == null) {
                throw new RejectedExecutionException("Null nextRunTime returned by Trigger");
            }           
        }
       
        public long getDelay(TimeUnit unit) {
            long d = unit.convert(this.lastScheduledRunTime.getTime() - System.currentTimeMillis(), TimeUnit.MILLISECONDS);        
            return d;
        }

        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
            long diff = this.lastScheduledRunTime.getTime() - x.lastScheduledRunTime.getTime();
            if (diff < 0)
                return -1;
            else if (diff > 0)
                return 1;
            else if (sequenceNumber < x.sequenceNumber)
                return -1;
            else
                return 1;
        }

        /**
         * Returns true if this is a periodic (not a one-shot) action.
         * @return true if periodic
         */
        boolean isPeriodic() {
            return this.trigger != null;
        }

        /**
         * Run a periodic task
         */
        private void runPeriodicSub() {
            boolean ok = false;
            if (isCancelled()) {
                ok = false;
                if (this.listenerSupport != null) {
                    this.listenerSupport.taskDone(this, null);
                }
            } else {
                if (this.trigger.skipRun(this, this.lastScheduledRunTime)) {
                    // skip run, task is rescheduled only if not cancelled
                    ok = super.setSkipped();
                    if (this.listenerSupport != null) {
                        this.listenerSupport.taskDone(this, null);
                    }
                } else {
                    // run now
                    this.lastActualRunTime = new Date();
                    ok = super.runAndReset();
                    this.lastCompleteTime = new Date();
                }
            }
            boolean down = isShutdown();
            // Reschedule if not cancelled and not shutdown or policy allows
            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isTerminating()))) {
                this.lastScheduledRunTime =
                    this.trigger.getNextRunTime(this,
                                                this.submitTime,
                                                this.lastActualRunTime,
                                                this.lastScheduledRunTime,
                                                this.lastCompleteTime);
                if (this.lastScheduledRunTime != null) {
                    // reschedule the task
                    if (this.listenerSupport != null) {
                        this.listenerSupport.taskSubmitted(this);
                    }
                    AbstractManagedScheduledExecutorService.super.getQueue().add(this);
                }
            } else if (down) {
                // This might have been the final executed delayed
                // task.  Wake up threads to check.          
                interruptIdleWorkers();
            }
        }
       
        private void runPeriodic() {
            if (this.setContextOnRun && this.managedContext != null) {
                Map<String, Object> threadContext = null;
                try {
                    threadContext = this.managedContext.set();
                } catch (RuntimeException e) {
                    LOG.warn("Failed to apply context to thread for task " + this + ": " +
                             e.getMessage() + ". Cancelling task");       
                    cancelFutureTask(false);
                    return;
                }
                try {
                    runPeriodicSub();
                } finally {
                    this.managedContext.unset(threadContext);
                }
            } else {
                runPeriodicSub();
            }
        }

        /**
         * Overrides FutureTask version so as to reset/requeue if periodic.
         */
        public void run() {
            if (isPeriodic()) {
                runPeriodic();
            } else {
                super.run();
            }
        }
    }

    public AbstractManagedScheduledExecutorService(int corePoolSize,
                                                   ThreadFactory threadFactory) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue(), threadFactory);
    }
       
    /**
     * Specialized variant of ThreadPoolExecutor.execute for delayed tasks.
     */
    protected void delayedExecute(ScheduledFutureTask<?> task) {
        preExecute(task);
        super.getQueue().add(task);
    }
   
    protected void delayedExecute(ManagedFutureTask<?> task) {
        preExecute(task);
        // wrap ManagedFutureTask into ScheduledFutureTask
        Date triggerTime = new Date();
        ScheduledFutureTask<?> t =
            new ScheduledFutureTask<Object>(task, null, null, null, triggerTime);   
        super.getQueue().add(t);
    }
   
    protected void preExecute(ManagedFutureTask<?> task) {
        ManagedTaskListenerSupport listenerSupport = task.getManagedTaskListenerSupport();
       
        if (listenerSupport != null) {
            listenerSupport.taskSubmitted(task);
        }
       
        if (isShutdown()) {
            try {
                reject(task);
            } catch (RejectedExecutionException exception) {
                if (listenerSupport != null) {
                    listenerSupport.taskDone(task, exception);
                }
                throw exception;
            }
            return;
        }
       
        // Prestart a thread if necessary. We cannot prestart it
        // running the task because the task (probably) shouldn't be
        // run yet, so thread will just idle until delay elapses.
        if (getPoolSize() < getCorePoolSize()) {
            prestartCoreThread();
        }
    }   

    /**
     * Cancel and clear the queue of all tasks that should not be run
     * due to shutdown policy.
     */
    private void cancelUnwantedTasks() {
        boolean keepDelayed = getExecuteExistingDelayedTasksAfterShutdownPolicy();
        boolean keepPeriodic = getContinueExistingPeriodicTasksAfterShutdownPolicy();
        if (!keepDelayed && !keepPeriodic)
            super.getQueue().clear();
        else if (keepDelayed || keepPeriodic) {
            Object[] entries = super.getQueue().toArray();
            for (int i = 0; i < entries.length; ++i) {
                Object e = entries[i];
                if (e instanceof ScheduledFutureTask) {
                    ScheduledFutureTask<?> t = (ScheduledFutureTask<?>)e;
                    if (t.isPeriodic()? !keepPeriodic : !keepDelayed)
                        t.cancel(false);
                }
            }
            entries = null;
            purge();
        }
    }

    public boolean remove(Runnable task) {
        if (!(task instanceof ScheduledFutureTask))
            return false;
        return getQueue().remove(task);
    }
 
    // *** schedule commands ***
   
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        return schedule(command, null, delay, unit, null);
    }

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit,
                                       ManagedTaskListener listener) {
        return schedule(command, null, delay, unit, listener);
    }
   
    protected <T> ScheduledFuture<T> schedule(Runnable command,
                                              T result,
                                              long delay,
                                              TimeUnit unit,
                                              ManagedTaskListener listener) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        Date triggerTime = new Date(System.currentTimeMillis() + unit.toMillis(delay));
        ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
        ManagedContext managedContext = getManagedContext();
        ScheduledFutureTask<T> t = new ScheduledFutureTask<T>(command,
                                                              result,
                                                              managedContext,
                                                              listenerSupport,
                                                              triggerTime);
        delayedExecute(t);
        return t;
    }
   
    public ScheduledFuture<?> schedule(Runnable command,
                                       Trigger trigger,
                                       ManagedTaskListener listener) {
        if (command == null || trigger == null) {
            throw new NullPointerException();
        }
        ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
        ManagedContext managedContext = getManagedContext();
        ScheduledFutureTask<?> t = new ScheduledFutureTask<Boolean>(command,
                                                                    null,
                                                                    managedContext,
                                                                    listenerSupport,
                                                                    trigger);
        delayedExecute(t);
        return t;
    }
   
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit) {
        return schedule(callable, delay, unit, null);
    }
   
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit,
                                           ManagedTaskListener listener) {
        if (callable == null || unit == null) {
            throw new NullPointerException();
        }
        if (delay < 0) delay = 0;
        Date triggerTime = new Date(System.currentTimeMillis() + unit.toMillis(delay));
        ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
        ManagedContext managedContext = getManagedContext();
        ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable,
                                                              managedContext,
                                                              listenerSupport,
                                                              triggerTime);
        delayedExecute(t);
        return t;
    }
   
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           Trigger trigger,
                                           ManagedTaskListener listener) {
        if (callable == null || trigger == null) {
            throw new NullPointerException();
        }
        ManagedTaskListenerSupport listenerSupport = getManagedTaskListenerSupport(listener);
        ManagedContext managedContext = getManagedContext();
        ScheduledFutureTask<V> t = new ScheduledFutureTask<V>(callable,
                                                              managedContext,
                                                              listenerSupport,
                                                              trigger);
        delayedExecute(t);
        return t;
    }
   
    // *** scheduleAt* commands *****
        
    private static class PeriodicTrigger implements Trigger {

        private long period;
        private long initialDelay;
        private boolean rate;

        public PeriodicTrigger(long initialDelay, long period, boolean rate) {
            this.initialDelay = initialDelay;
            this.period = period;
            this.rate = rate;
        }
       
        public Date getNextRunTime(Future<?> future,
                                   Date submitTime,
                                   Date lastActualRunTime,
                                   Date lastScheduledRunTime,
                                   Date lastCompleteTime) {
            long nextRunTime;
            if (lastScheduledRunTime == null) {
                nextRunTime = System.currentTimeMillis() + this.initialDelay;
            } else {
                if (this.rate) {
                    nextRunTime = lastScheduledRunTime.getTime() + this.period;
                } else {
                    nextRunTime = System.currentTimeMillis() + this.period;
                }
            }
            return new Date(nextRunTime);
        }

        public boolean skipRun(Future<?> arg0, Date arg1) {
            return false;
        }

    }
   
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay, 
                                                  long period,
                                                  TimeUnit unit) {
        return scheduleAtFixedRate(command, initialDelay, period, unit, null);
    }
   
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit,
                                                  ManagedTaskListener listener) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        if (period <= 0) {
            throw new IllegalArgumentException();
        }
        if (initialDelay < 0) initialDelay = 0;
        PeriodicTrigger trigger =
            new PeriodicTrigger(unit.toMillis(initialDelay), unit.toMillis(period), true);
        return schedule(command, trigger, listener);
    }
   
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay, 
                                                     long delay,
                                                     TimeUnit unit) {
        return scheduleWithFixedDelay(command, initialDelay, delay, unit, null);
    }
   
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit,
                                                     ManagedTaskListener listener) {
        if (command == null || unit == null) {
            throw new NullPointerException();
        }
        if (delay <= 0) {
            throw new IllegalArgumentException();
        }
        if (initialDelay < 0) initialDelay = 0;
        PeriodicTrigger trigger =
            new PeriodicTrigger(unit.toMillis(initialDelay), unit.toMillis(delay), false);
        return schedule(command, trigger, listener);
    }


    // Override ExecutorService methods

    public void execute(Runnable command) {
        schedule(command, null, 0, TimeUnit.NANOSECONDS, null);
    }
  
    public Future<?> submit(Runnable task) {
        return schedule(task, null, 0, TimeUnit.NANOSECONDS, null);
    }

    public <T> Future<T> submit(Runnable task, T result) {
        return schedule(task, result, 0, TimeUnit.NANOSECONDS, null);
    }

    public <T> Future<T> submit(Callable<T> task) {
        return schedule(task, 0, TimeUnit.NANOSECONDS, null);
    }
     
    public Future<?> submit(Runnable task, ManagedTaskListener listener) {
        return schedule(task, null, 0, TimeUnit.NANOSECONDS, listener);
    }

    public <T> Future<T> submit(Runnable task, T result, ManagedTaskListener listener) {
        return schedule(task, result, 0, TimeUnit.NANOSECONDS, listener);
    }
       
    public <T> Future<T> submit(Callable<T> task, ManagedTaskListener listener) {
        return schedule(task, 0, TimeUnit.NANOSECONDS, listener);
    }
   
    /*
     * This is called by invokeAll/invokeAny functions.    
     */
    protected void executeTask(ManagedFutureTask<?> task) { 
        // ManagedFutureTask will get wrapped in ScheduledFutureTask
        delayedExecute(task);
    }
   
    // Policy methods
   
    /**
     * Set policy on whether to continue executing existing periodic
     * tasks even when this executor has been <tt>shutdown</tt>. In
     * this case, these tasks will only terminate upon
     * <tt>shutdownNow</tt>, or after setting the policy to
     * <tt>false</tt> when already shutdown. This value is by default
     * false.
     * @param value if true, continue after shutdown, else don't.
     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
     */
    public void setContinueExistingPeriodicTasksAfterShutdownPolicy(boolean value) {
        continueExistingPeriodicTasksAfterShutdown = value;
        if (!value && isShutdown())
            cancelUnwantedTasks();
    }

    /**
     * Get the policy on whether to continue executing existing
     * periodic tasks even when this executor has been
     * <tt>shutdown</tt>. In this case, these tasks will only
     * terminate upon <tt>shutdownNow</tt> or after setting the policy
     * to <tt>false</tt> when already shutdown. This value is by
     * default false.
     * @return true if will continue after shutdown.
     * @see #setContinueExistingPeriodicTasksAfterShutdownPolicy
     */
    public boolean getContinueExistingPeriodicTasksAfterShutdownPolicy() {
        return continueExistingPeriodicTasksAfterShutdown;
    }

    /**
     * Set policy on whether to execute existing delayed
     * tasks even when this executor has been <tt>shutdown</tt>. In
     * this case, these tasks will only terminate upon
     * <tt>shutdownNow</tt>, or after setting the policy to
     * <tt>false</tt> when already shutdown. This value is by default
     * true.
     * @param value if true, execute after shutdown, else don't.
     * @see #getExecuteExistingDelayedTasksAfterShutdownPolicy
     */
    public void setExecuteExistingDelayedTasksAfterShutdownPolicy(boolean value) {
        executeExistingDelayedTasksAfterShutdown = value;
        if (!value && isShutdown())
            cancelUnwantedTasks();
    }

    /**
     * Get policy on whether to execute existing delayed
     * tasks even when this executor has been <tt>shutdown</tt>. In
     * this case, these tasks will only terminate upon
     * <tt>shutdownNow</tt>, or after setting the policy to
     * <tt>false</tt> when already shutdown. This value is by default
     * true.
     * @return true if will execute after shutdown.
     * @see #setExecuteExistingDelayedTasksAfterShutdownPolicy
     */
    public boolean getExecuteExistingDelayedTasksAfterShutdownPolicy() {
        return executeExistingDelayedTasksAfterShutdown;
    }


    /**
     * Initiates an orderly shutdown in which previously submitted
     * tasks are executed, but no new tasks will be accepted. If the
     * <tt>ExecuteExistingDelayedTasksAfterShutdownPolicy</tt> has
     * been set <tt>false</tt>, existing delayed tasks whose delays
     * have not yet elapsed are cancelled. And unless the
     * <tt>ContinueExistingPeriodicTasksAfterShutdownPolicy</tt> has
     * been set <tt>true</tt>, future executions of existing periodic
     * tasks will be cancelled.
     */
    public void shutdown() {
        cancelUnwantedTasks();
        super.shutdown();
    }

    /**
     * Attempts to stop all actively executing tasks, halts the
     * processing of waiting tasks, and returns a list of the tasks that were
     * awaiting execution.
     * 
     * <p>There are no guarantees beyond best-effort attempts to stop
     * processing actively executing tasks.  This implementation
     * cancels tasks via {@link Thread#interrupt}, so if any tasks mask or
     * fail to respond to interrupts, they may never terminate.
     *
     * @return list of tasks that never commenced execution.  Each
     * element of this list is a {@link ScheduledFuture},
     * including those tasks submitted using <tt>execute</tt>, which
     * are for scheduling purposes used as the basis of a zero-delay
     * <tt>ScheduledFuture</tt>.
     */
    public List<Runnable> shutdownNow() {
        return super.shutdownNow();
    }

    /**
     * Returns the task queue used by this executor.  Each element of
     * this queue is a {@link ScheduledFuture}, including those
     * tasks submitted using <tt>execute</tt> which are for scheduling
     * purposes used as the basis of a zero-delay
     * <tt>ScheduledFuture</tt>. Iteration over this queue is
     * <em>not</em> guaranteed to traverse tasks in the order in
     * which they will execute.
     *
     * @return the task queue
     */
    public BlockingQueue<Runnable> getQueue() {
        return super.getQueue();
    }

    /**
     * An annoying wrapper class to convince generics compiler to
     * use a DelayQueue<ScheduledFutureTask> as a BlockingQueue<Runnable>
     */
    private static class DelayedWorkQueue
        extends AbstractCollection<Runnable>
        implements BlockingQueue<Runnable> {
       
        private final DelayQueue<ScheduledFutureTask> dq = new DelayQueue<ScheduledFutureTask>();
        public Runnable poll() { return dq.poll(); }
        public Runnable peek() { return dq.peek(); }
        public Runnable take() throws InterruptedException { return dq.take(); }
        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
            return dq.poll(timeout, unit);
        }

        public boolean add(Runnable x) { return dq.add((ScheduledFutureTask)x); }
        public boolean offer(Runnable x) { return dq.offer((ScheduledFutureTask)x); }
        public void put(Runnable x)  {
            dq.put((ScheduledFutureTask)x);
        }
        public boolean offer(Runnable x, long timeout, TimeUnit unit) {
            return dq.offer((ScheduledFutureTask)x, timeout, unit);
        }

        public Runnable remove() { return dq.remove(); }
        public Runnable element() { return dq.element(); }
        public void clear() { dq.clear(); }
        public int drainTo(Collection<? super Runnable> c) { return dq.drainTo(c); }
        public int drainTo(Collection<? super Runnable> c, int maxElements) {
            return dq.drainTo(c, maxElements);
        }

        public int remainingCapacity() { return dq.remainingCapacity(); }
        public boolean remove(Object x) { return dq.remove(x); }
        public boolean contains(Object x) { return dq.contains(x); }
        public int size() { return dq.size(); }
        public boolean isEmpty() { return dq.isEmpty(); }
        public Object[] toArray() { return dq.toArray(); }
        public <T> T[] toArray(T[] array) { return dq.toArray(array); }
        public Iterator<Runnable> iterator() {
            return new Iterator<Runnable>() {
                private Iterator<ScheduledFutureTask> it = dq.iterator();
                public boolean hasNext() { return it.hasNext(); }
                public Runnable next() { return it.next(); }
                public void remove() {  it.remove(); }
            };
        }
    }
  
}
TOP

Related Classes of org.apache.geronimo.concurrent.executor.AbstractManagedScheduledExecutorService

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.