Package org.codehaus.activemq

Source Code of org.codehaus.activemq.ActiveMQSessionExecutor

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.codehaus.activemq;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;

import javax.jms.JMSException;
import java.util.Iterator;

/**
* A utility class used by the Sessionfor dispatching messages asycnronously to consumers
*
* @version $Revision: 1.3 $
* @see javax.jms.Session
*/
class ActiveMQSessionExecutor implements Runnable {
    private static final Log log = LogFactory.getLog(ActiveMQSessionExecutor.class);
    private ActiveMQSession session;
    private MemoryBoundedQueue messageQueue;
    private boolean closed;
    private Thread runner;
    private boolean doDispatch;

    ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
        this.session = session;
        this.messageQueue = queue;
        this.doDispatch = true;
    }

    void setDoDispatch(boolean value) {
        doDispatch = value;
    }

    void execute(ActiveMQMessage message) {
        messageQueue.enqueue(message);
    }

    void executeFirst(ActiveMQMessage message) {
        messageQueue.enqueueFirstNoBlock(message);
    }

    /**
     * implementation of Runnable
     */
    public void run() {
        while (!closed && doDispatch) {
            ActiveMQMessage message = null;
            try {
                message = (ActiveMQMessage) messageQueue.dequeue(100);
            }
            catch (InterruptedException ie) {
            }
            if (!closed) {
                if (message != null) {
                    if (doDispatch) {
                        for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
                            ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
                            if (message.isConsumerTarget(consumer.getConsumerNumber())) {
                                try {
                                    consumer.processMessage(message.shallowCopy());
                                }
                                catch (JMSException e) {
                                    this.session.connection.handleAsyncException(e);
                                }
                            }
                        }
                    }
                    else {
                        messageQueue.enqueueFirstNoBlock(message);
                    }
                }
            }
        }
    }

    synchronized void start() {
        messageQueue.start();
        if (runner == null && doDispatch) {
            runner = new Thread(this, "JmsSessionDispather: " + session.getSessionId());
            runner.setPriority(Thread.MAX_PRIORITY);
            //runner.setDaemon(true);
            runner.start();
        }
    }

    synchronized void stop() {
        messageQueue.stop();
    }

    synchronized void close() {
        closed = true;
        messageQueue.close();
    }

    void clear() {
        messageQueue.clear();
    }

    ActiveMQMessage dequeueNoWait() {
        try {
            return (ActiveMQMessage) messageQueue.dequeueNoWait();
        }
        catch (InterruptedException ie) {
            return null;
        }
    }
   
    protected void clearMessagesInProgress(){
        messageQueue.clear();
    }
}
TOP

Related Classes of org.codehaus.activemq.ActiveMQSessionExecutor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.