Package org.apache.mina.util

Source Code of org.apache.mina.util.BaseThreadPool$Worker

/*
*   @(#) $Id: BaseThreadPool.java 327113 2005-10-21 06:59:15Z trustin $
*
*   Copyright 2004 The Apache Software Foundation
*
*   Licensed under the Apache License, Version 2.0 (the "License");
*   you may not use this file except in compliance with the License.
*   You may obtain a copy of the License at
*
*       http://www.apache.org/licenses/LICENSE-2.0
*
*   Unless required by applicable law or agreed to in writing, software
*   distributed under the License is distributed on an "AS IS" BASIS,
*   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*   See the License for the specific language governing permissions and
*   limitations under the License.
*
*/
package org.apache.mina.util;

import java.util.HashSet;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Set;

import org.apache.mina.common.Session;

/**
* A base implementation of Thread-pooling filters.
* This filter forwards events to its thread pool.  This is an implementation of
* <a href="http://deuce.doc.wustl.edu/doc/pspdfs/lf.pdf">Leader/Followers
* thread pool</a> by Douglas C. Schmidt et al.
*
* @author The Apache Directory Project (dev@directory.apache.org)
* @version $Rev: 327113 $, $Date: 2005-10-21 15:59:15 +0900 $
*/
public abstract class BaseThreadPool implements ThreadPool
{
    /**
     * Default maximum size of thread pool (2G).
     */
    public static final int DEFAULT_MAXIMUM_POOL_SIZE = Integer.MAX_VALUE;

    /**
     * Default keep-alive time of thread pool (1 min).
     */
    public static final int DEFAULT_KEEP_ALIVE_TIME = 60 * 1000;

    /**
     * A queue which contains {@link Integer}s which represents reusable
     * thread IDs.  {@link Worker} first checks this queue and then
     * uses {@link #threadId} when no reusable thread ID is available.
     */
    private static final Queue threadIdReuseQueue = new Queue();
    private static int threadId = 0;
   
    private static int acquireThreadId()
    {
        synchronized( threadIdReuseQueue )
        {
            Integer id = ( Integer ) threadIdReuseQueue.pop();
            if( id == null )
            {
                return ++ threadId;
            }
            else
            {
                return id.intValue();
            }
        }
    }
   
    private static void releaseThreadId( int id )
    {
        synchronized( threadIdReuseQueue )
        {
            threadIdReuseQueue.push( new Integer( id ) );
        }
    }

    private final String threadNamePrefix;
    private final Map buffers = new IdentityHashMap();
    private final Stack followers = new Stack();
    private final BlockingQueue unfetchedSessionBuffers = new BlockingQueue();
    private final Set allSessionBuffers = new HashSet();

    private Worker leader;

    private int maximumPoolSize = DEFAULT_MAXIMUM_POOL_SIZE;
    private int keepAliveTime = DEFAULT_KEEP_ALIVE_TIME;

    private boolean started;
    private boolean shuttingDown;

    private int poolSize;
    private final Object poolSizeLock = new Object();

    /**
     * Creates a new instance with default thread pool settings.
     * You'll have to invoke {@link #start()} method to start threads actually.
     *
     * @param threadNamePrefix the prefix of the thread names this pool will create.
     */
    public BaseThreadPool( String threadNamePrefix )
    {
        if( threadNamePrefix == null )
        {
            throw new NullPointerException( "threadNamePrefix" );
        }
        threadNamePrefix = threadNamePrefix.trim();
        if( threadNamePrefix.length() == 0 )
        {
            throw new IllegalArgumentException( "threadNamePrefix is empty." );
        }
        this.threadNamePrefix = threadNamePrefix;
    }
   
    public String getThreadNamePrefix()
    {
        return threadNamePrefix;
    }

    public int getPoolSize()
    {
        synchronized( poolSizeLock )
        {
            return poolSize;
        }
    }

    public int getMaximumPoolSize()
    {
        return maximumPoolSize;
    }

    public int getKeepAliveTime()
    {
        return keepAliveTime;
    }

    public void setMaximumPoolSize( int maximumPoolSize )
    {
        if( maximumPoolSize <= 0 )
            throw new IllegalArgumentException();
        this.maximumPoolSize = maximumPoolSize;
    }

    public void setKeepAliveTime( int keepAliveTime )
    {
        this.keepAliveTime = keepAliveTime;
    }

    public synchronized void start()
    {
        if( started )
            return;

        shuttingDown = false;

        leader = new Worker();
        leader.start();
        leader.lead();

        started = true;
    }

    public synchronized void stop()
    {
        if( !started )
            return;

        shuttingDown = true;
        Worker lastLeader = null;
        for( ;; )
        {
            Worker leader = this.leader;
            if( lastLeader == leader )
                break;

            while( leader.isAlive() )
            {
                leader.interrupt();
                try
                {
                    leader.join();
                }
                catch( InterruptedException e )
                {
                }
            }

            lastLeader = leader;
        }

        started = false;
    }

    private void increasePoolSize()
    {
        synchronized( poolSizeLock )
        {
            poolSize++;
        }
    }

    private void decreasePoolSize()
    {
        synchronized( poolSizeLock )
        {
            poolSize--;
        }
    }

    protected void fireEvent( Object nextFilter, Session session,
                              EventType type, Object data )
    {
        final BlockingQueue unfetchedSessionBuffers = this.unfetchedSessionBuffers;
        final Set allSessionBuffers = this.allSessionBuffers;
        final Event event = new Event( type, nextFilter, data );

        synchronized( unfetchedSessionBuffers )
        {
            final SessionBuffer buf = getSessionBuffer( session );
            final Queue eventQueue = buf.eventQueue;

            synchronized( buf )
            {
                eventQueue.push( event );
            }

            if( !allSessionBuffers.contains( buf ) )
            {
                allSessionBuffers.add( buf );
                unfetchedSessionBuffers.push( buf );
            }
        }
    }

    /**
     * Implement this method to forward events to <tt>nextFilter</tt>.
     */
    protected abstract void processEvent( Object nextFilter, Session session,
                                          EventType type, Object data );

   
    /**
     * Implement this method to fetch (or pop) a {@link SessionBuffer} from
     * the given <tt>unfetchedSessionBuffers</tt>.  The default implementation
     * simply pops the buffer from it.  You could prioritize the fetch order.
     *
     * @return A non-null {@link SessionBuffer}
     */
    protected SessionBuffer fetchSessionBuffer( Queue unfetchedSessionBuffers )
    {
        return ( SessionBuffer ) unfetchedSessionBuffers.pop();
    }

    private SessionBuffer getSessionBuffer( Session session )
    {
        final Map buffers = this.buffers;
        SessionBuffer buf = ( SessionBuffer ) buffers.get( session );
        if( buf == null )
        {
            synchronized( buffers )
            {
                buf = ( SessionBuffer ) buffers.get( session );
                if( buf == null )
                {
                    buf = new SessionBuffer( session );
                    buffers.put( session, buf );
                }
            }
        }
        return buf;
    }

    private void removeSessionBuffer( SessionBuffer buf )
    {
        final Map buffers = this.buffers;
        final Session session = buf.session;
        synchronized( buffers )
        {
            buffers.remove( session );
        }
    }

    protected static class SessionBuffer
    {
        private final Session session;

        private final Queue eventQueue = new Queue();

        private SessionBuffer( Session session )
        {
            this.session = session;
        }
       
        public Session getSession()
        {
            return session;
        }
       
        public Queue getEventQueue()
        {
            return eventQueue;
        }
    }

    private class Worker extends Thread
    {
        private final int id;
        private final Object promotionLock = new Object();
        private boolean dead;

        private Worker()
        {
            int id = acquireThreadId();
            this.id = id;
            this.setName( threadNamePrefix + '-' + id );
            increasePoolSize();
        }

        public boolean lead()
        {
            final Object promotionLock = this.promotionLock;
            synchronized( promotionLock )
            {
                if( dead )
                {
                    return false;
                }

                leader = this;
                promotionLock.notify();
            }
           
            return true;
        }

        public void run()
        {
            for( ;; )
            {
                if( !waitForPromotion() )
                    break;

                SessionBuffer buf = fetchBuffer();
                giveUpLead();

                if( buf == null )
                {
                    break;
                }

                processEvents( buf );
                follow();
                releaseBuffer( buf );
            }

            decreasePoolSize();
            releaseThreadId( id );
        }

        private SessionBuffer fetchBuffer()
        {
            BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
            synchronized( unfetchedSessionBuffers )
            {
                for( ;; )
                {
                    if( shuttingDown )
                    {
                        return null;
                    }

                    try
                    {
                        unfetchedSessionBuffers.waitForNewItem();
                    }
                    catch( InterruptedException e )
                    {
                        if( shuttingDown )
                        {
                            return null;
                        }
                        else
                        {
                            continue;
                        }
                    }

                    return BaseThreadPool.this.fetchSessionBuffer( unfetchedSessionBuffers );
                }
            }
        }

        private void processEvents( SessionBuffer buf )
        {
            final Session session = buf.session;
            final Queue eventQueue = buf.eventQueue;
            for( ;; )
            {
                Event event;
                synchronized( buf )
                {
                    event = ( Event ) eventQueue.pop();
                    if( event == null )
                        break;
                }
                processEvent( event.getNextFilter(), session,
                              event.getType(), event.getData() );
            }
        }

        private void follow()
        {
            final Object promotionLock = this.promotionLock;
            final Stack followers = BaseThreadPool.this.followers;
            synchronized( promotionLock )
            {
                if( this != leader )
                {
                    synchronized( followers )
                    {
                        followers.push( this );
                    }
                }
            }
        }

        private void releaseBuffer( SessionBuffer buf )
        {
            final BlockingQueue unfetchedSessionBuffers = BaseThreadPool.this.unfetchedSessionBuffers;
            final Set allSessionBuffers = BaseThreadPool.this.allSessionBuffers;
            final Queue eventQueue = buf.eventQueue;

            synchronized( unfetchedSessionBuffers )
            {
                if( eventQueue.isEmpty() )
                {
                    allSessionBuffers.remove( buf );
                    removeSessionBuffer( buf );
                }
                else
                {
                    unfetchedSessionBuffers.push( buf );
                }
            }
        }

        private boolean waitForPromotion()
        {
            final Object promotionLock = this.promotionLock;

            final long startTime = System.currentTimeMillis();
            long currentTime = startTime;
           
            synchronized( promotionLock )
            {
                while( this != leader )
                {
                    // Calculate remaining keep-alive time
                    int keepAliveTime = getKeepAliveTime();
                    if( keepAliveTime > 0 )
                    {
                        keepAliveTime -= ( currentTime - startTime );
                    }
                    else
                    {
                        keepAliveTime = Integer.MAX_VALUE;
                    }
                   
                    // Break the loop if there's no remaining keep-alive time.
                    if( keepAliveTime <= 0 )
                    {
                        break;
                    }

                    // Wait for promotion
                    try
                    {
                        promotionLock.wait( keepAliveTime );
                    }
                    catch( InterruptedException e )
                    {
                        if( shuttingDown )
                        {
                            break;
                        }
                    }

                    // Update currentTime for the next iteration
                    currentTime = System.currentTimeMillis();
                }

                boolean timeToLead = this == leader && !shuttingDown;

                if( !timeToLead )
                {
                    // time to die
                    synchronized( followers )
                    {
                        followers.remove( this );
                    }

                    // Mark as dead explicitly when we've got promotionLock.
                    dead = true;
                }

                return timeToLead;
            }
        }

        private void giveUpLead()
        {
            final Stack followers = BaseThreadPool.this.followers;
            Worker worker;
            do
            {
                synchronized( followers )
                {
                    worker = ( Worker ) followers.pop();
                }

                if( worker == null )
                {
                    // Increase the number of threads if we
                    // are not shutting down and we can increase the number.
                    if( !shuttingDown
                        && getPoolSize() < getMaximumPoolSize() )
                    {
                        worker = new Worker();
                        worker.lead();
                        worker.start();
                    }

                    // This loop should end because:
                    // 1) lead() is called already,
                    // 2) or it is shutting down and there's no more threads left.
                    break;
                }
            }
            while( !worker.lead() );
        }
    }
}
TOP

Related Classes of org.apache.mina.util.BaseThreadPool$Worker

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.