Package org.jboss.internal.soa.esb.couriers.transport

Source Code of org.jboss.internal.soa.esb.couriers.transport.InVMTemporaryTransport$InVMQueueEntry

/*
* JBoss, Home of Professional Open Source
* Copyright 2006, JBoss Inc., and individual contributors as indicated
* by the @authors tag. See the copyright.txt in the distribution for a
* full listing of individual contributors.
*
* This is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as
* published by the Free Software Foundation; either version 2.1 of
* the License, or (at your option) any later version.
*
* This software is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
package org.jboss.internal.soa.esb.couriers.transport;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.log4j.Logger;
import org.jboss.internal.soa.esb.couriers.tx.InVMXAResource;
import org.jboss.internal.soa.esb.message.format.MessageSerializer;
import org.jboss.soa.esb.addressing.eprs.InVMEpr;
import org.jboss.soa.esb.common.Environment;
import org.jboss.soa.esb.common.ModulePropertyManager;
import org.jboss.soa.esb.common.TransactionStrategy;
import org.jboss.soa.esb.common.TransactionStrategyException;
import org.jboss.soa.esb.message.ByReferenceMessage;
import org.jboss.soa.esb.message.Message;

import com.arjuna.common.util.propertyservice.PropertyManager;

/**
* Provide support for the InVM temporary transports.
*
* The InVM transport is divided into two distinct types, permanent transports which
* represent ESB aware services and temporary transports which represent default reply
* queues.
*
* This class provides the support for temporary queues while support for the
* registry queries/permanent transports are through the InVMTransport class.
*
* @author kevin
*/
public class InVMTemporaryTransport
{
    /**
     * The logger for this instance.
     */
    private static final Logger LOGGER = Logger.getLogger(InVMTemporaryTransport.class) ;
    /**
     * The default expiry time.
     */
    private static final long DEFAULT_EXPIRY_TIME = 300000;
    /**
     * The expiry time for messages.
     */
    private final long expiryTime ;
    /**
     * Mapping of service ids to entries.
     */
    private final Map<String, InVMEntry> serviceIdToEntry = new HashMap<String, InVMEntry>() ;
    /**
     * Ordered entries for reaping.
     */
    private final LinkedHashSet<InVMQueueEntry> orderedEntries = new LinkedHashSet<InVMQueueEntry>() ;
    /**
     * The lock guarding access and modification to the structures.
     */
    private ReadWriteLock lock = new ReentrantReadWriteLock() ;
   
    /**
     * Reaper thread
     */
    private volatile ReaperThread reaperThread ;

    /**
     * Factory singleton instance.
     */
    private static InVMTemporaryTransport instance = new InVMTemporaryTransport();

    /**
     * Get the InVM Transport.
     * @return The InVM Transport instance.
     */
    public static InVMTemporaryTransport getInstance()
    {
        return instance;
    }

    /**
     * Deliver a message to the specified EPR.
     * @param inVMEpr The EPR to receive the message.
     * @param message The message to deliver.
     * @throws InVMException for InVM transport specific errors.
     */
    public void deliver(final InVMEpr inVMEpr, final Message message)
        throws InVMException
    {
        final String serviceId = inVMEpr.getAddr().getAddress() ;
        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Delivering message to " + serviceId) ;
        }

        final Object addedObject = InVMTransport.toDeliveryObject(message, inVMEpr.getPassByValue());
       
        if (InVMTransport.isTransactional())
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Delivering transactional message to " + serviceId) ;
            }
            /*
             * Can't do lockstep wait here because otherwise the transaction may not terminate if this
             * is the transaction controller thread!
             */
            final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
            try
            {
                txStrategy.enlistResource(new InVMXAResource(inVMEpr, addedObject, InVMXAResource.Operation.INSERT));
            }
            catch (final TransactionStrategyException tse)
            {
                throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
            }
        }
        else
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Delivering message direct to " + serviceId + " queue") ;
            }
           
            final long lockstep = (inVMEpr.getLockstep() ? inVMEpr.getLockstepWaitTime() : 0) ;
           
            final InVMEntry entry ;
            acquireReadLock() ;
            try
            {
                entry = getEntry(serviceId) ;
                entry.incDelivery() ;
            }
            finally
            {
                releaseReadLock() ;
            }
            deliver(entry, addedObject, lockstep) ;
        }
    }

    /**
     * Pickup a message for the specified EPR.
     * @param inVMEpr The EPR to receive the message.
     * @param millis The number of milliseconds to wait for a message.
     * @return The message or null if nothing present within the timeout.
     * @throws InVMException for InVM transport specific errors.
     */
    public Message pickup(final InVMEpr inVMEpr, final long millis)
        throws InVMException
    {
        final String serviceId = inVMEpr.getAddr().getAddress() ;
        final InVMEntry entry ;
        acquireReadLock() ;
        try
        {
            entry = getEntry(serviceId) ;
            entry.incPickup() ;
        }
        finally
        {
            releaseReadLock() ;
        }
       
        final InVMQueueEntry queueEntry = entry.pickup(millis) ;
        clearEntry(entry, queueEntry) ;
        if (queueEntry != null)
        {
            if (LOGGER.isDebugEnabled())
            {
                LOGGER.debug("Pickup of message from " + serviceId) ;
            }
            final Object msgObject = queueEntry.getValue() ;
            if (msgObject != null)
            {

                final Message message = InVMTransport.fromDeliveryObject(msgObject, inVMEpr.getPassByValue());

                if (InVMTransport.isTransactional())
                {
                    /*
                     * Return the message, but don't remove it from the queue until the transaction
                     * commits. If the transaction rolls back then the message may not go back into the
                     * queue at the exact place it was originally: other messages may have been removed
                     * successfully by other threads. Plus, we would have to maintain a before and after
                     * image of the queue. This is more a compensation transaction.
                     */
                    if (LOGGER.isDebugEnabled())
                    {
                        LOGGER.debug("Pickup enlisting transactional resource for service " + serviceId) ;
                    }
                    final TransactionStrategy txStrategy = TransactionStrategy.getTransactionStrategy(true) ;
                    try
                    {
                        txStrategy.enlistResource(new InVMXAResource(inVMEpr, msgObject, InVMXAResource.Operation.REMOVE)) ;
                    }
                    catch (final TransactionStrategyException tse)
                    {
                        throw new InVMException("Unexpected error enlisting transaction resource", tse) ;
                    }
                }
                return message ;
            }
        }
        return null ;
    }

    /**
     * Deliver an object as a consequence of a transaction.  This will either be a
     * rollback, placing the object back on the source queue, or a commit delivering
     * to a target queue.
     *
     * @param inVMEpr The EPR to receive the message.
     * @param msgObject The object to deliver.
     * @throws InVMException for InVM transport specific errors.
     */
    public void deliverTx(final InVMEpr inVMEpr, final Object msgObject)
        throws InVMException
    {
        final String serviceId = inVMEpr.getAddr().getAddress() ;
       
        final InVMEntry entry ;
        acquireReadLock() ;
        try
        {
            entry = getEntry(serviceId) ;
            entry.incDelivery() ;
        }
        finally
        {
            releaseReadLock() ;
        }
       
        if (LOGGER.isDebugEnabled())
        {
            LOGGER.debug("Transactional redelivery of message to " + serviceId) ;
        }
       
        deliver(entry, msgObject, 0) ;
    }
   
    /**
     * Start the reaper thread.
     */
    public void startReaper()
    {
        LOGGER.info("Starting reaper thread") ;
        reaperThread = new ReaperThread() ;
        reaperThread.start() ;
    }
   
    /**
     * Stop the reaper thread.
     */
    public void stopReaper()
    {
        final ReaperThread currentReaperThread = reaperThread ;
        reaperThread = null ;
        if (currentReaperThread != null)
        {
            LOGGER.info("Stopping reaper thread") ;
            currentReaperThread.shutdown() ;
        }
    }
   
    /**
     * Deliver to the queue, adding to the reaper list if necessary.
     *
     * N.B. Must not hold external lock while calling this method.
     *
     * @param entry The invm entry containing the queue.
     * @param value The value being appended to the queue.
     * @param lockstep The lockstep timeout or 0 if not required.
     */
    private void deliver(final InVMEntry entry, final Object value, final long lockstep)
        throws InVMException
    {
        final InVMQueueEntry queueEntry = entry.deliver(value, lockstep, expiryTime) ;
        if (entry.isActive(queueEntry))
        {
            final boolean notifyReaper ;
            acquireWriteLock() ;
            try
            {
                if (entry.isActive(queueEntry))
                {
                    notifyReaper = orderedEntries.isEmpty() ;
                    orderedEntries.add(queueEntry) ;
                }
                else
                {
                    notifyReaper = false ;
                }
            }
            finally
            {
                releaseWriteLock() ;
            }
            if (notifyReaper)
            {
                final ReaperThread currentReaperThread = reaperThread ;
                if (currentReaperThread != null)
                {
                    currentReaperThread.entriesUpdated() ;
                }
            }
        }
    }
   
    /**
     * Get the size of the mapping for service ids to entries.
     * @return The size of the mapping.
     */
    public int getServiceIdToEntrySize()
    {
        acquireReadLock() ;
        try
        {
            return serviceIdToEntry.size() ;
        }
        finally
        {
            releaseReadLock() ;
        }
    }
   
    /**
     * Get the size of the ordered entries for reaping.
     * @return The size of the ordered entries.
     */
    public int getOrderedEntriesSize()
    {
        acquireReadLock() ;
        try
        {
            return orderedEntries.size() ;
        }
        finally
        {
            releaseReadLock() ;
        }
    }
   
    /**
     * Acquire a read lock for accessing the data.
     */
    private void acquireReadLock()
    {
        lock.readLock().lock() ;
    }
   
    /**
     * Release a read lock for accessing the data.
     */
    private void releaseReadLock()
    {
        lock.readLock().unlock() ;
    }
   
    /**
     * Acquire a write lock for accessing the data.
     */
    private void acquireWriteLock()
    {
        lock.writeLock().lock() ;
    }
   
    /**
     * Release a write lock for accessing the data.
     */
    private void releaseWriteLock()
    {
        lock.writeLock().unlock() ;
    }
   
    /**
     * Promote to a write lock for accessing the data.
     */
    private void promoteToWriteLock()
    {
        lock.readLock().unlock() ;
        lock.writeLock().lock() ;
    }
   
    /**
     * Demote to a read lock for accessing the data.
     */
    private void demoteToReadLock()
    {
        lock.readLock().lock() ;
        lock.writeLock().unlock() ;
    }

    /**
     * Get an entry representing the serviceId.
     *
     * Must hold read lock when calling this method.
     *
     * @param serviceId The service id to locate.
     * @return The entry associated with the service id.
     */
    private InVMEntry getEntry(final String serviceId)
    {
        InVMEntry entry = serviceIdToEntry.get(serviceId) ;
        if (entry == null)
        {
            promoteToWriteLock() ;
            try
            {
                entry = new InVMEntry(serviceId) ;
                final InVMEntry current = serviceIdToEntry.put(serviceId, entry) ;
                if ((current != null) && (current != entry))
                {
                    entry = current ;
                    serviceIdToEntry.put(serviceId, current) ;
                }
            }
            finally
            {
                demoteToReadLock() ;
            }
        }
        return entry ;
    }

    /**
     * Clear the queue entry.
     *
     * @param entry The service entry to clear.
     * @param queueEntry The queue entry to clear or null if not required.
     */
    private void clearEntry(final InVMEntry entry, final InVMQueueEntry queueEntry)
    {
        acquireWriteLock() ;
        try
        {
            if (queueEntry != null)
            {
                orderedEntries.remove(queueEntry) ;
            }
           
            if (entry.isFree())
            {
                final String serviceId = entry.getServiceId() ;
                final InVMEntry current = serviceIdToEntry.remove(serviceId) ;
                if ((current != null) && (current != entry))
                {
                    serviceIdToEntry.put(serviceId, current) ;
                }
            }
        }
        finally
        {
            releaseWriteLock() ;
        }
    }
   
    /**
     * Data representing an entry in the message queue.
     * @author kevin
     */
    private static class InVMQueueEntry
    {
        /**
         * The associated InVM entry.
         */
        private final InVMEntry entry ;
        /**
         * The value enqueued.
         */
        private final Object value ;
        /**
         * Condition associated with lockstep.
         */
        private final Condition condition ;
        /**
         * Expiry of this entry in milli seconds.
         */
        private final long expiry ;
        /**
         * The flag indicating that this entry is now inactive.
         */
        private boolean inactive ;
       
        /**
         * Construct the InVM queue entry.
         *
         * @param entry The associated InVM entry.
         * @param value The value being enqueued.
         * @param condition The condition representing the lockstep or null if not required.
         * @param expiry The expiry time of this entry.
         */
        InVMQueueEntry(final InVMEntry entry, final Object value, final Condition condition,
            final long expiry)
        {
            this.entry = entry ;
            this.value = value ;
            this.condition = condition ;
            this.expiry = expiry ;
        }
       
        /**
         * Get the associated entry.
         * @return The associated entry.
         */
        InVMEntry getEntry()
        {
            return entry ;
        }
       
        /**
         * Get the enqueued value.
         * @return The enqueued value.
         */
        Object getValue()
        {
            return value ;
        }
       
        /**
         * Get the lockstep condition.
         * @return The lockstep condition or null if not necessary.
         */
        Condition getCondition()
        {
            return condition ;
        }
       
        /**
         * Get the expiry time of this entry.
         * @return The expiry time.
         */
        long getExpiryTime()
        {
            return expiry ;
        }
       
        /**
         * Set the inactive flag.
         * Must be invoked with InVMEntry lock
         */
        void setInactive()
        {
            inactive = true ;
        }
       
        /**
         * Get the inactive flag.
         * Must be invoked with InVMEntry lock
         */
        boolean isInactive()
        {
            return inactive ;
        }
    }
   
    /**
     * The InVM entries in the data structures.
     * @author kevin
     */
    private static class InVMEntry
    {
        /**
         * The lock for the queue.
         */
        private final Lock lock = new ReentrantLock() ;
        /**
         * The condition on which to await messages.
         */
        private final Condition waitingCondition = lock.newCondition() ;
        /**
         * Entries in the queue.
         */
        private final Queue<InVMQueueEntry> entries = new LinkedList<InVMQueueEntry>() ;
        /**
         * The service id for this entry.
         */
        private final String serviceId ;
        /**
         * The number of waiters on the waiting condition.
         */
        private int numWaiters ;
        /**
         * The number of deliveries about to occur.
         */
        private int numDeliveries ;
        /**
         * The number of pickups about to occur.
         */
        private int numPickups ;
       
        /**
         * Create an entry with the specified service id.
         * @param serviceId The service id.
         */
        InVMEntry(final String serviceId)
        {
            this.serviceId = serviceId ;
        }
       
        /**
         * Get the service id associated with this entry.
         * @return the service id.
         */
        String getServiceId()
        {
            return serviceId ;
        }
       
        /**
         * Increment the number of deliveries about to occur.
         * Can hold external read lock.
         */
        void incDelivery()
        {
            lock.lock() ;
            try
            {
                numDeliveries++ ;
            }
            finally
            {
                lock.unlock();
            }
        }
       
        /**
         * Increment the number of pickups about to occur.
         * Can hold external read lock.
         */
        void incPickup()
        {
            lock.lock() ;
            try
            {
                numPickups++ ;
            }
            finally
            {
                lock.unlock();
            }
        }

       
        /**
         * Is this entry free?
         * @return true if there are no entries, waiters, deliveries or pickups about to occur.
         */
        boolean isFree()
        {
            lock.lock() ;
            try
            {
                return ((numDeliveries == 0) && (numPickups == 0) && (numWaiters == 0) && (entries.size() == 0)) ;
            }
            finally
            {
                lock.unlock();
            }
        }
       
        /**
         * Deliver the specified value onto the queue.
         *
         * N.B. Must not hold external lock while calling this method.
         *
         * @param value The value being appended to the queue.
         * @param lockstep The lockstep timeout or 0 if not required.
         * @param expiryTime The expiry time for the message.
         *
         * @throws InVMException for InVM transport specific errors.
         */
        InVMQueueEntry deliver(final Object value, final long lockstep, final long expiryTime)
            throws InVMException
        {
            lock.lock() ;
            try
            {
                numDeliveries-- ;
                final Condition condition = (lockstep > 0 ? lock.newCondition() : null) ;
                final InVMQueueEntry queueEntry = new InVMQueueEntry(this, value, condition, System.currentTimeMillis() + expiryTime) ;
                if (!entries.offer(queueEntry))
                {
                    throw new InVMException("Failed to append message to InVM queue") ;
                }
                if (numWaiters > 0)
                {
                    waitingCondition.signal() ;
                }
               
                if (condition != null)
                {
                    try
                    {
                        condition.await(lockstep, TimeUnit.MILLISECONDS) ;
                    }
                    catch (final InterruptedException ie)
                    {
                        LOGGER.warn("Waiting delivery thread interupted while waiting on message pickup on InVM queue '" + serviceId + "'.  Exiting pickup wait state.") ;
                    }
                }
                return queueEntry ;
            }
            finally
            {
                lock.unlock() ;
            }
        }
       
        /**
         * Pickup an entry from the queue.
         *
         * N.B. Must not hold external lock while calling this method.
         *
         * @param millis The number of milliseconds to wait for a message.
         * @return The message or null if nothing present within the timeout.
         *
         * @throws InVMException for InVM transport specific errors.
         */
        InVMQueueEntry pickup(final long millis)
            throws InVMException
        {
            final long end = System.currentTimeMillis() + millis ;
            lock.lock() ;
            try
            {
                numPickups-- ;
                if (entries.isEmpty())
                {
                    final long delay = end - System.currentTimeMillis() ;
                    if (delay > 0)
                    {
                        numWaiters++ ;
                        try
                        {
                            waitingCondition.await(delay, TimeUnit.MILLISECONDS) ;
                        }
                        catch (final InterruptedException ioe) {} // ignore
                        finally
                        {
                            numWaiters-- ;
                        }
                    }
                }
                final InVMQueueEntry entry = entries.poll() ;
                if (entry != null)
                {
                    entry.setInactive() ;
                    final Condition condition = entry.getCondition() ;
                    if (condition != null)
                    {
                        condition.signal() ;
                    }
                    return entry ;
                }
                return null ;
            }
            finally
            {
                lock.unlock() ;
            }
        }
       
        /**
         * Is the specified queue entry still active?
         * @param queueEntry The queue entry to check.
         * @return true if still active, false otherwise.
         */
        boolean isActive(final InVMQueueEntry queueEntry)
        {
            lock.lock() ;
            try
            {
                return !queueEntry.isInactive() ;
            }
            finally
            {
                lock.unlock() ;
            }
        }
       
        /**
         * Expire the specified entry.
         * @param queueEntry The queue entry to expire.
         */
        void expire(final InVMQueueEntry queueEntry)
        {
            lock.lock() ;
            try
            {
                if (entries.remove(queueEntry))
                {
                    queueEntry.setInactive() ;
                }
            }
            finally
            {
                lock.unlock() ;
            }
        }
    }
   
    /**
     * The reaper thread for expired tasks.
     * @author kevin
     */
    private class ReaperThread extends Thread
    {
        /**
         * Shutdown flag.
         */
        private AtomicBoolean shutdown = new AtomicBoolean() ;
        /**
         * Lock for notifications.
         */
        private Lock lock = new ReentrantLock() ;
        /**
         * Notify condition.
         */
        private Condition notifyCondition = lock.newCondition() ;
        /**
         * The notify flag.
         */
        private boolean notify ;
       
        public void run()
        {
            while(!shutdown.get())
            {
                long nextExpiry = Long.MAX_VALUE ;
                acquireWriteLock() ;
                try
                {
                    final Iterator<InVMQueueEntry> orderedEntriesIter = orderedEntries.iterator() ;
                    if (orderedEntriesIter.hasNext())
                    {
                        final long now = System.currentTimeMillis() ;
                        do
                        {
                            final InVMQueueEntry queueEntry = orderedEntriesIter.next() ;
                            final long expiryTime = queueEntry.getExpiryTime() ;
                            if (expiryTime <= now)
                            {
                                orderedEntriesIter.remove() ;
                                final InVMEntry entry = queueEntry.getEntry();
                                entry.expire(queueEntry) ;
                                clearEntry(queueEntry.getEntry(), null) ;
                            }
                            else
                            {
                                nextExpiry = expiryTime ;
                                break ;
                            }
                        }
                        while(orderedEntriesIter.hasNext()) ;
                    }
                }
                finally
                {
                    releaseWriteLock() ;
                }
                LOGGER.debug("Reaper thread next expiry: " + nextExpiry) ;
                lock.lock() ;
                try
                {
                    if (!notify && !shutdown.get())
                    {
                        final long delay = nextExpiry - System.currentTimeMillis() ;
                        if (delay > 0)
                        {
                            try
                            {
                                notifyCondition.await(delay, TimeUnit.MILLISECONDS) ;
                            }
                            catch (final InterruptedException ie) {} // ignore
                        }
                    }
                    notify = false ;
                }
                finally
                {
                    lock.unlock() ;
                }
            }
        }
       
        /**
         * Notify the reaper that entries have been updated.
         * Must not be called with locks held.
         */
        void entriesUpdated()
        {
            lock.lock() ;
            try
            {
                notify = true ;
                notifyCondition.signal() ;
            }
            finally
            {
                lock.unlock();
            }
        }
       
        /**
         * Indicate a reaper shutdown.
         */
        void shutdown()
        {
            shutdown.set(true) ;
            lock.lock() ;
            try
            {
                notifyCondition.signal() ;
            }
            finally
            {
                lock.unlock();
            }
        }
    }
   
    {
        final PropertyManager prop = ModulePropertyManager.getPropertyManager(ModulePropertyManager.TRANSPORTS_MODULE);
        final String value = prop.getProperty(Environment.INVM_EXPIRY_TIME);
       
        long expiryTime = DEFAULT_EXPIRY_TIME ;
        if (value != null)
        {
            try
            {
                final long parsedExpiryTime = Long.parseLong(value);
                if (parsedExpiryTime > 0)
                {
                    expiryTime = parsedExpiryTime ;
                }
                else
                {
                    LOGGER.warn("Invalid InVM expiry time, using default") ;
                }
            }
            catch (final NumberFormatException ex)
            {
                LOGGER.warn("Failed to parse InVM expiry time, using default") ;
            }
        }
        this.expiryTime = expiryTime ;
        LOGGER.debug("InVM expiry time set to " + expiryTime) ;
    }
}
TOP

Related Classes of org.jboss.internal.soa.esb.couriers.transport.InVMTemporaryTransport$InVMQueueEntry

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.