Package org.apache.maven.surefire.junitcore.pc

Source Code of org.apache.maven.surefire.junitcore.pc.Scheduler$Controller

package org.apache.maven.surefire.junitcore.pc;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import org.junit.runner.Description;
import org.junit.runners.model.RunnerScheduler;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
* Schedules tests, controls thread resources, awaiting tests and other schedulers finished, and
* a master scheduler can shutdown slaves.
* <p/>
* The scheduler objects should be first created (and wired) and set in runners
* {@link org.junit.runners.ParentRunner#setScheduler(org.junit.runners.model.RunnerScheduler)}.
* <p/>
* A new instance of scheduling strategy should be passed to the constructor of this scheduler.
*
* @author Tibor Digana (tibor17)
* @since 2.16
*/
public class Scheduler
    implements RunnerScheduler
{
    private final Balancer balancer;

    private final SchedulingStrategy strategy;

    private final Set<Controller> slaves = new CopyOnWriteArraySet<Controller>();

    private final Description description;

    private volatile boolean shutdown = false;

    private volatile boolean started = false;

    private volatile boolean finished = false;

    private volatile Controller masterController;

    /**
     * Use e.g. parallel classes have own non-shared thread pool, and methods another pool.
     * <p/>
     * You can use it with one infinite thread pool shared in strategies across all
     * suites, class runners, etc.
     */
    public Scheduler( Description description, SchedulingStrategy strategy )
    {
        this( description, strategy, -1 );
    }

    /**
     * Should be used if schedulers in parallel children and parent use one instance of bounded thread pool.
     * <p/>
     * Set this scheduler in a e.g. one suite of classes, then every individual class runner should reference
     * {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy)}
     * or {@link #Scheduler(org.junit.runner.Description, Scheduler, SchedulingStrategy, int)}.
     *
     * @param description description of current runner
     * @param strategy    scheduling strategy with a shared thread pool
     * @param concurrency determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
     * @throws NullPointerException if null <tt>strategy</tt>
     */
    public Scheduler( Description description, SchedulingStrategy strategy, int concurrency )
    {
        this( description, strategy, BalancerFactory.createBalancer( concurrency ) );
    }

    /**
     * New instances should be used by schedulers with limited concurrency by <tt>balancer</tt>
     * against other groups of schedulers. The schedulers share one pool.
     * <p/>
     * Unlike in {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)} which was limiting
     * the <tt>concurrency</tt> of children of a runner where this scheduler was set, <em>this</em> <tt>balancer</tt>
     * is limiting the concurrency of all children in runners having schedulers created by this constructor.
     *
     * @param description description of current runner
     * @param strategy    scheduling strategy which may share threads with other strategy
     * @param balancer    determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
     * @throws NullPointerException if null <tt>strategy</tt> or <tt>balancer</tt>
     */
    public Scheduler( Description description, SchedulingStrategy strategy, Balancer balancer )
    {
        strategy.setDefaultShutdownHandler( newShutdownHandler() );
        this.description = description;
        this.strategy = strategy;
        this.balancer = balancer;
        masterController = null;
    }

    /**
     * Can be used by e.g. a runner having parallel classes in use case with parallel
     * suites, classes and methods sharing the same thread pool.
     *
     * @param description     description of current runner
     * @param masterScheduler scheduler sharing own threads with this slave
     * @param strategy        scheduling strategy for this scheduler
     * @param balancer        determines maximum concurrent children scheduled a time via {@link #schedule(Runnable)}
     * @throws NullPointerException if null <tt>masterScheduler</tt>, <tt>strategy</tt> or <tt>balancer</tt>
     */
    public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy,
                      Balancer balancer )
    {
        this( description, strategy, balancer );
        strategy.setDefaultShutdownHandler( newShutdownHandler() );
        masterScheduler.register( this );
    }

    /**
     * @param masterScheduler a reference to {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)}
     *                        or {@link #Scheduler(org.junit.runner.Description, SchedulingStrategy)}
     * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy)
     * @see #Scheduler(org.junit.runner.Description, SchedulingStrategy, int)
     */
    public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy, int concurrency )
    {
        this( description, strategy, concurrency );
        strategy.setDefaultShutdownHandler( newShutdownHandler() );
        masterScheduler.register( this );
    }

    /**
     * Should be used with individual pools on suites, classes and methods, see
     * {@link org.apache.maven.surefire.junitcore.pc.ParallelComputerBuilder#useSeparatePools()}.
     * <p/>
     * Cached thread pool is infinite and can be always shared.
     */
    public Scheduler( Description description, Scheduler masterScheduler, SchedulingStrategy strategy )
    {
        this( description, masterScheduler, strategy, 0 );
    }

    private void setController( Controller masterController )
    {
        if ( masterController == null )
        {
            throw new NullPointerException( "null ExecutionController" );
        }
        this.masterController = masterController;
    }

    /**
     * @param slave a slave scheduler to register
     * @return <tt>true</tt> if successfully registered the <tt>slave</tt>.
     */
    private boolean register( Scheduler slave )
    {
        boolean canRegister = slave != null && slave != this;
        if ( canRegister )
        {
            Controller controller = new Controller( slave );
            canRegister = !slaves.contains( controller );
            if ( canRegister )
            {
                slaves.add( controller );
                slave.setController( controller );
            }
        }
        return canRegister;
    }

    /**
     * @return <tt>true</tt> if new tasks can be scheduled.
     */
    private boolean canSchedule()
    {
        return !shutdown && ( masterController == null || masterController.canSchedule() );
    }

    protected void logQuietly( Throwable t )
    {
        t.printStackTrace( System.out );
    }

    protected void logQuietly( String msg )
    {
        System.out.println( msg );
    }

    /**
     * Attempts to stop all actively executing tasks and immediately returns a collection
     * of descriptions of those tasks which have started prior to this call.
     * <p/>
     * This scheduler and other registered schedulers will stop, see {@link #register(Scheduler)}.
     * If <tt>shutdownNow</tt> is set, waiting methods will be interrupted via {@link Thread#interrupt}.
     *
     * @param stopNow if <tt>true</tt> interrupts waiting test methods
     * @return collection of descriptions started before shutting down
     */
    protected ShutdownResult describeStopped( boolean stopNow )
    {
        Collection<Description> executedTests = new ConcurrentLinkedQueue<Description>();
        Collection<Description> incompleteTests = new ConcurrentLinkedQueue<Description>();
        stop( executedTests, incompleteTests, false, stopNow );
        return new ShutdownResult( executedTests, incompleteTests );
    }

    /**
     * Stop/Shutdown/Interrupt scheduler and its children (if any).
     *
     * @param executedTests       Started tests which have finished normally or abruptly till called this method.
     * @param incompleteTests     Started tests which have finished incomplete due to shutdown.
     * @param tryCancelFutures    Useful to set to {@code false} if a timeout is specified in plugin config.
     *                            When the runner of
     *                            {@link ParallelComputer#getSuite(org.junit.runners.model.RunnerBuilder, Class[])}
     *                            is finished in
     *                            {@link org.junit.runners.Suite#run(org.junit.runner.notification.RunNotifier)}
     *                            all the thread-pools created by {@link ParallelComputerBuilder.PC} are already dead.
     *                            See the unit test <em>ParallelComputerBuilder#timeoutAndForcedShutdown()</em>.
     * @param stopNow             Interrupting tests by {@link java.util.concurrent.ExecutorService#shutdownNow()} or
     *                            {@link java.util.concurrent.Future#cancel(boolean) Future#cancel(true)} or
     *                            {@link Thread#interrupt()}.
     */
    private void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
                       boolean tryCancelFutures, boolean stopNow )
    {
        shutdown = true;
        try
        {
            if ( started && !ParallelComputerUtil.isUnusedDescription( description ) )
            {
                if ( executedTests != null )
                {
                    executedTests.add( description );
                }

                if ( incompleteTests != null && !finished )
                {
                    incompleteTests.add( description );
                }
            }

            for ( Controller slave : slaves )
            {
                slave.stop( executedTests, incompleteTests, tryCancelFutures, stopNow );
            }
        }
        finally
        {
            try
            {
                balancer.releaseAllPermits();
            }
            finally
            {
                if ( stopNow )
                {
                    strategy.stopNow();
                }
                else if ( tryCancelFutures )
                {
                    strategy.stop();
                }
                else
                {
                    strategy.disable();
                }
            }
        }
    }

    protected boolean shutdownThreadPoolsAwaitingKilled()
    {
        if ( masterController == null )
        {
            stop( null, null, true, false );
            boolean isNotInterrupted = true;
            if ( strategy != null )
            {
                isNotInterrupted = strategy.destroy();
            }
            for ( Controller slave : slaves )
            {
                isNotInterrupted &= slave.destroy();
            }
            return isNotInterrupted;
        }
        else
        {
            throw new UnsupportedOperationException( "cannot call this method if this is not a master scheduler" );
        }
    }

    protected void beforeExecute()
    {
    }

    protected void afterExecute()
    {
    }

    public void schedule( Runnable childStatement )
    {
        if ( childStatement == null )
        {
            logQuietly( "cannot schedule null" );
        }
        else if ( canSchedule() && strategy.canSchedule() )
        {
            try
            {
                boolean isNotInterrupted = balancer.acquirePermit();
                if ( isNotInterrupted && !shutdown )
                {
                    Runnable task = wrapTask( childStatement );
                    strategy.schedule( task );
                    started = true;
                }
            }
            catch ( RejectedExecutionException e )
            {
                stop( null, null, true, false );
            }
            catch ( Throwable t )
            {
                balancer.releasePermit();
                logQuietly( t );
            }
        }
    }

    public void finished()
    {
        try
        {
            strategy.finished();
        }
        catch ( InterruptedException e )
        {
            logQuietly( e );
        }
        finally
        {
            finished = true;
        }
    }

    private Runnable wrapTask( final Runnable task )
    {
        return new Runnable()
        {
            public void run()
            {
                try
                {
                    beforeExecute();
                    task.run();
                }
                finally
                {
                    try
                    {
                        afterExecute();
                    }
                    finally
                    {
                        balancer.releasePermit();
                    }
                }
            }
        };
    }

    protected ShutdownHandler newShutdownHandler()
    {
        return new ShutdownHandler();
    }

    /**
     * If this is a master scheduler, the slaves can stop scheduling by the master through the controller.
     */
    private final class Controller
    {
        private final Scheduler slave;

        private Controller( Scheduler slave )
        {
            this.slave = slave;
        }

        /**
         * @return <tt>true</tt> if new children can be scheduled.
         */
        boolean canSchedule()
        {
            return Scheduler.this.canSchedule();
        }

        void stop( Collection<Description> executedTests, Collection<Description> incompleteTests,
                   boolean tryCancelFutures, boolean shutdownNow )
        {
            slave.stop( executedTests, incompleteTests, tryCancelFutures, shutdownNow );
        }

        /**
         * @see org.apache.maven.surefire.junitcore.pc.Destroyable#destroy()
         */
        boolean destroy()
        {
            return slave.strategy.destroy();
        }

        @Override
        public int hashCode()
        {
            return slave.hashCode();
        }

        @Override
        public boolean equals( Object o )
        {
            return o == this || ( o instanceof Controller ) && slave.equals( ( (Controller) o ).slave );
        }
    }

    /**
     * There is a way to shutdown the hierarchy of schedulers. You can do it in master scheduler via
     * {@link #shutdownThreadPoolsAwaitingKilled()} which kills the current master and children recursively.
     * If alternatively a shared {@link java.util.concurrent.ExecutorService} used by the master and children
     * schedulers is shutdown from outside, then the {@link ShutdownHandler} is a hook calling current
     * {@link #describeStopped(boolean)}. The method {@link #describeStopped(boolean)} is again shutting down children
     * schedulers recursively as well.
     */
    public class ShutdownHandler
        implements RejectedExecutionHandler
    {
        private volatile RejectedExecutionHandler poolHandler;

        protected ShutdownHandler()
        {
            poolHandler = null;
        }

        public void setRejectedExecutionHandler( RejectedExecutionHandler poolHandler )
        {
            this.poolHandler = poolHandler;
        }

        public void rejectedExecution( Runnable r, ThreadPoolExecutor executor )
        {
            if ( executor.isShutdown() )
            {
                Scheduler.this.stop( null, null, true, false );
            }
            final RejectedExecutionHandler poolHandler = this.poolHandler;
            if ( poolHandler != null )
            {
                poolHandler.rejectedExecution( r, executor );
            }
        }
    }
}
TOP

Related Classes of org.apache.maven.surefire.junitcore.pc.Scheduler$Controller

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.