// Package org.codehaus.activemq.transport.composite
//
// Source Code of org.codehaus.activemq.transport.composite.CompositeTransportChannel

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.transport.composite;

import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.AbstractTransportChannel;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportChannelProvider;

import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
* A Compsite implementation of a TransportChannel
*
* @version $Revision: 1.6 $
*/
public class CompositeTransportChannel extends AbstractTransportChannel {
    private static final Log log = LogFactory.getLog(CompositeTransportChannel.class);

    protected WireFormat wireFormat;
    protected URI[] uris;
    protected TransportChannel channel;
    protected SynchronizedBoolean closed;
    protected SynchronizedBoolean started;
    protected int retryCount = 10;
    protected long failureSleepTime = 500L;
    protected URI currentURI;


    public CompositeTransportChannel(WireFormat wireFormat, URI[] uris) {
        this.wireFormat = wireFormat;
        this.uris = uris;
        closed = new SynchronizedBoolean(false);
        started = new SynchronizedBoolean(false);
    }

    public String toString() {
        return "CompositeTransportChannel: " + channel;
    }

    public void start() throws JMSException {
        if (started.commit(false, true)) {
            establishConnection();
        }
    }

    /**
     * close the channel
     */
    public void stop() {
        if (closed.commit(false, true)) {
            if (channel != null) {
                try {
                    channel.stop();
                }
                catch (Exception e) {
                    log.warn("Caught while closing: " + e + ". Now Closed", e);
                }
                finally {
                    channel = null;
                    super.stop();
                }
            }
        }
    }

    public Receipt send(Packet packet) throws JMSException {
        return getChannel().send(packet);
    }


    public Receipt send(Packet packet, int timeout) throws JMSException {
        return getChannel().send(packet, timeout);
    }


    public void asyncSend(Packet packet) throws JMSException {
        getChannel().asyncSend(packet);
    }

    public void setPacketListener(PacketListener listener) {
        super.setPacketListener(listener);
        if (channel != null) {
            channel.setPacketListener(listener);
        }
    }


    public void setExceptionListener(ExceptionListener listener) {
        super.setExceptionListener(listener);
        if (channel != null) {
            channel.setExceptionListener(listener);
        }
    }


    public boolean isMulticast() {
        return false;
    }


    // Implementation methods
    //-------------------------------------------------------------------------
   
    protected void establishConnection() throws JMSException {

        // lets try connect
        boolean connected = false;
        long time = failureSleepTime;
        for (int i = 0; !connected && i < retryCount; i++) {
            if (i > 0) {
                log.info("Sleeping for: " + time + " millis and trying again");
                try {
                    Thread.sleep(time);
                }
                catch (InterruptedException e) {
                    log.warn("Sleep interupted: " + e, e);
                }
                time *= 2;
            }

            List list = new ArrayList(Arrays.asList(uris));
            while (!connected && !list.isEmpty()) {
                URI uri = extractURI(list);
                try {
                    attemptToConnect(uri);
                    configureChannel();
                    connected = true;
                    currentURI = uri;
                }
                catch (JMSException e) {
                    log.info("Could not connect to: " + uri + ". Reason: " + e);
                }
            }

        }
        if (!connected) {
            StringBuffer buffer = new StringBuffer("");
            for (int i = 0; i < uris.length; i++) {
                buffer.append(uris[i]);
                if (i < (uris.length - 1)) {
                    buffer.append(",");
                }
            }
            JMSException jmsEx = new JMSException("Failed to connect to resource(s): " + buffer.toString());
            throw jmsEx;
        }

    }

    protected TransportChannel getChannel() throws JMSException {
        if (channel == null) {
            throw new JMSException("No TransportChannel connection available");
        }
        return channel;
    }

    protected void configureChannel() {
        ExceptionListener exceptionListener = getExceptionListener();
        if (exceptionListener != null) {
            channel.setExceptionListener(exceptionListener);
        }
        PacketListener packetListener = getPacketListener();
        if (packetListener != null) {
            channel.setPacketListener(packetListener);
        }
    }


    protected URI extractURI(List list) throws JMSException {
        int idx = 0;
        if (list.size() > 1) {
            do {
                idx = (int) (Math.random() * list.size());
            }
            while (idx < 0 || idx >= list.size());
        }
        return (URI) list.remove(idx);
    }

    protected void attemptToConnect(URI uri) throws JMSException {
        channel = TransportChannelProvider.create(wireFormat, uri);
        if (started.get()) {
            channel.start();
        }
    }
}
// TOP
//
// Related Classes of org.codehaus.activemq.transport.composite.CompositeTransportChannel
//
// TOP
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.