Package org.codehaus.activemq

Source Code of org.codehaus.activemq.ActiveMQSessionExecutor

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.codehaus.activemq;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;

import javax.jms.JMSException;
import java.util.Iterator;

/**
* A utility class used by the Session for dispatching messages asycnronously to consumers
*
* @version $Revision: 1.4 $
* @see javax.jms.Session
*/
public class ActiveMQSessionExecutor implements Runnable {
    private static final Log log = LogFactory.getLog(ActiveMQSessionExecutor.class);
    private ActiveMQSession session;
    private MemoryBoundedQueue messageQueue;
    private boolean closed;
    private Thread runner;
    private boolean dispatchedBySessionPool;
    private boolean optimizedMessageDispatch;

    ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
        this.session = session;
        this.messageQueue = queue;
    }

    void setDispatchedBySessionPool(boolean value) {
        dispatchedBySessionPool = value;
    }
   
    /**
     * @return Returns the optimizedMessageDispatch.
     */
    boolean isOptimizedMessageDispatch() {
        return optimizedMessageDispatch;
    }
    /**
     * @param optimizedMessageDispatch The optimizedMessageDispatch to set.
     */
    void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
        this.optimizedMessageDispatch = optimizedMessageDispatch;
    }

    void execute(ActiveMQMessage message) {
        if (optimizedMessageDispatch && !dispatchedBySessionPool){
            dispatch(message);
        }else {
            messageQueue.enqueue(message);
        }
      
    }

    void executeFirst(ActiveMQMessage message) {
        messageQueue.enqueueFirstNoBlock(message);
    }

    /**
     * implementation of Runnable
     */
    public void run() {
        while (!closed && !dispatchedBySessionPool) {
            ActiveMQMessage message = null;
            try {
                message = (ActiveMQMessage) messageQueue.dequeue(100);
            }
            catch (InterruptedException ie) {
            }
            if (!closed) {
                if (message != null) {
                    if (!dispatchedBySessionPool) {
                        dispatch(message);
                    }
                    else {
                        messageQueue.enqueueFirstNoBlock(message);
                    }
                }
            }
        }
    }
   
    void dispatch(ActiveMQMessage message){
        for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
            if (message.isConsumerTarget(consumer.getConsumerNumber())) {
                try {
                    consumer.processMessage(message.shallowCopy());
                }
                catch (JMSException e) {
                    this.session.connection.handleAsyncException(e);
                }
            }
        }
    }

    synchronized void start() {
        messageQueue.start();
        if (runner == null && (!dispatchedBySessionPool || optimizedMessageDispatch)) {
            runner = new Thread(this, "JmsSessionDispatcher: " + session.getSessionId());
            runner.setPriority(Thread.MAX_PRIORITY);
            //runner.setDaemon(true);
            runner.start();
        }
    }

    synchronized void stop() {
        messageQueue.stop();
    }

    synchronized void close() {
        closed = true;
        messageQueue.close();
    }

    void clear() {
        messageQueue.clear();
    }

    ActiveMQMessage dequeueNoWait() {
        try {
            return (ActiveMQMessage) messageQueue.dequeueNoWait();
        }
        catch (InterruptedException ie) {
            return null;
        }
    }
   
    protected void clearMessagesInProgress(){
        messageQueue.clear();
    }
   
   
}
TOP

Related Classes of org.codehaus.activemq.ActiveMQSessionExecutor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.