Package org.codehaus.activemq.transport.reliable

Source Code of org.codehaus.activemq.transport.reliable.ReliableTransportChannel

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.codehaus.activemq.transport.reliable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.TimeoutExpiredException;
import org.codehaus.activemq.UnsupportedWireFormatException;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportStatusEvent;
import org.codehaus.activemq.transport.composite.CompositeTransportChannel;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;

/**
* A Compsite implementation of a TransportChannel
*
* @version $Revision: 1.11 $
*/
public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener {
    private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);
    private Object lock = new Object();
    private LinkedList packetList = new LinkedList();
    private boolean cacheMessagesForFailover;

    /**
     * Construct this transport
     *
     * @param wireFormat
     */
    public ReliableTransportChannel(WireFormat wireFormat) {
        super(wireFormat);
    }

    /**
     * Construct this transport
     *
     * @param wireFormat
     * @param uris
     */
    public ReliableTransportChannel(WireFormat wireFormat, List uris) {
        super(wireFormat, uris);
    }

    /**
     * @return pretty print for this
     */
    public String toString() {
        return "ReliableTransportChannel: " + channel;
    }

    /**
     * start the connection
     *
     * @throws JMSException
     */
    public void start() throws JMSException {
        if (started.commit(false, true)) {
            if (channel != null) {
                channel.start();
            }
        }
    }

    /**
     * @param packet
     * @param timeout
     * @return receipt - or null
     * @throws JMSException
     */
    public Receipt send(Packet packet, int timeout) throws JMSException {
        do {
            TransportChannel tc = getEstablishedChannel(timeout);
            if (tc != null) {
                try {
                    return tc.send(packet, timeout);
                }
                catch (TimeoutExpiredException e) {
                    throw e;
                }
                catch (UnsupportedWireFormatException uwf) {
                    throw uwf;
                }
                catch (JMSException jmsEx) {
                    if (isPendingStop()) {
                        break;
                    }
                    doReconnect(tc, timeout);
                }
            }
        }
        while (!closed.get() && !isPendingStop());
        return null;
    }

    /**
     * @param packet
     * @throws JMSException
     */
    public void asyncSend(Packet packet) throws JMSException {
        long timeout = getEstablishConnectionTimeout();
        do {
            TransportChannel tc = getEstablishedChannel(timeout);
            if (tc != null) {
                try {
                    tc.asyncSend(packet);
                    break;
                }
                catch (TimeoutExpiredException e) {
                    throw e;
                }
                catch (UnsupportedWireFormatException uwf) {
                    throw uwf;
                }
                catch (JMSException jmsEx) {
                    if (isPendingStop()) {
                        break;
                    }
                    doReconnect(tc, timeout);
                }
            }
        }
        while (!closed.get() && !isPendingStop());
    }

    protected void configureChannel() {
        channel.setPacketListener(this);
        channel.setExceptionListener(this);
    }

    protected URI extractURI(List list) throws JMSException {
        int idx = 0;
        if (list.size() > 1) {
            SMLCGRandom rand = new SMLCGRandom();
            do {
                idx = (int) (rand.nextDouble() * list.size());
            }
            while (idx < 0 || idx >= list.size());
        }
        Object answer = list.remove(idx);
        if (answer instanceof URI) {
            return (URI) answer;
        }
        else {
            log.error("#### got: " + answer + " of type: " + answer.getClass());
            return null;
        }
    }

    /**
     * consume a packet from the enbedded channel
     *
     * @param packet to consume
     */
    public void consume(Packet packet) {
        //do processing
        //avoid a lock
        PacketListener listener = getPacketListener();
        if (listener != null) {
            listener.consume(packet);
        }
    }

    /**
     * handle exception from the embedded channel
     *
     * @param jmsEx
     */
    public void onException(JMSException jmsEx) {
        TransportChannel tc = this.channel;
        if (jmsEx instanceof UnsupportedWireFormatException) {
            fireException(jmsEx);
        }
        else {
            try {
                doReconnect(tc, getEstablishConnectionTimeout());
            }
            catch (JMSException ex) {
                ex.setLinkedException(jmsEx);
                fireException(ex);
            }
        }
    }

    /**
     * stop this channel
     */
    public void stop() {
        super.stop();
        fireStatusEvent(super.currentURI, TransportStatusEvent.STOPPED);
    }

    /**
     * Fire a JMSException to the exception listener
     *
     * @param jmsEx
     */
    protected void fireException(JMSException jmsEx) {
        ExceptionListener listener = getExceptionListener();
        if (listener != null) {
            listener.onException(jmsEx);
        }
    }

    protected TransportChannel getEstablishedChannel(long timeout) throws JMSException {
        if (!closed.get() && this.channel == null && !isPendingStop()) {
            establishConnection(timeout);
        }
        return this.channel;
    }

    protected void doReconnect(TransportChannel currentChannel, long timeout) throws JMSException {
        setTransportConnected(false);
        if (!closed.get() && !isPendingStop()) {
            synchronized (lock) {
                //Loss of connectivity can be signalled from more than one
                //thread - hence the check here - we want to avoid doing it more than once
                if (this.channel == currentChannel) {
                    fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
                    try {
                        establishConnection(timeout);
                    }
                    catch (JMSException jmsEx) {
                        fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED);
                        throw jmsEx;
                    }
                    setTransportConnected(true);
                    fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED);
                }
            }
        }
    }
}
TOP

Related Classes of org.codehaus.activemq.transport.reliable.ReliableTransportChannel

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.