Package org.codehaus.activemq.transport.jgroups

Source Code of org.codehaus.activemq.transport.jgroups.JGroupsTransportChannel

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.transport.jgroups;

import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.TransportChannelSupport;
import org.codehaus.activemq.util.JMSExceptionHelper;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.Message;
import org.jgroups.TimeoutException;

import javax.jms.JMSException;
import java.io.IOException;

/**
* A JGroups implementation of a TransportChannel
*
* @version $Revision: 1.9 $
*/
public class JGroupsTransportChannel extends TransportChannelSupport implements Runnable {
    private static final Log log = LogFactory.getLog(JGroupsTransportChannel.class);

    private Channel channel;
    private Address localAddress = null;
    private WireFormat wireFormat;
    private SynchronizedBoolean closed;
    private SynchronizedBoolean started;
    private Object outboundLock;
    private Executor executor;
    private Thread thread; //need to change this - and use a thread pool
    private boolean useAsyncSend = false;

    public JGroupsTransportChannel(WireFormat wireFormat, Channel channel, Executor executor) {
        this.wireFormat = wireFormat;
        this.channel = channel;
        this.executor = executor;
        this.localAddress = channel.getLocalAddress();

        closed = new SynchronizedBoolean(false);
        started = new SynchronizedBoolean(false);
        outboundLock = new Object();
        if (useAsyncSend) {
            executor = new PooledExecutor(new BoundedBuffer(1000), 1);
        }
    }

    public String toString() {
        return "JGroupsTransportChannel: " + channel;
    }

    /**
     * close the channel
     */
    public void stop() {
        if (closed.commit(false, true)) {
            super.stop();
            try {
                stopExecutor(executor);
                channel.disconnect();
                channel.close();
            }
            catch (Exception e) {
                log.warn("Caught while closing: " + e + ". Now Closed", e);
            }
        }
    }

    /**
     * start listeneing for events
     *
     * @throws javax.jms.JMSException if an error occurs
     */
    public void start() throws JMSException {
        if (started.commit(false, true)) {
            thread = new Thread(this, toString());
            if (isServerSide()) {
                thread.setDaemon(true);
            }
            thread.start();
        }
    }


    /**
     * Asynchronously send a Packet
     *
     * @param packet
     * @throws javax.jms.JMSException
     */
    public void asyncSend(final Packet packet) throws JMSException {
        if (executor != null) {
            try {
                executor.execute(new Runnable() {
                    public void run() {
                        try {
                            writePacket(packet);
                        }
                        catch (JMSException e) {
                            onAsyncException(e);
                        }
                    }
                });
            }
            catch (InterruptedException e) {
                log.info("Caught: " + e, e);
            }
        }
        else {
            writePacket(packet);
        }
    }


    public boolean isMulticast() {
        return true;
    }
   
    /**
     * Can this wireformat process packets of this version
     * @param version the version number to test
     * @return true if can accept the version
     */
    public boolean canProcessWireFormatVersion(int version){
        return wireFormat.canProcessWireFormatVersion(version);
    }
   
    /**
     * @return the current version of this wire format
     */
    public int getCurrentWireFormatVersion(){
        return wireFormat.getCurrentWireFormatVersion();
    }

    /**
     * reads packets from a Socket
     */
    public void run() {
        log.trace("JGroups consumer thread starting");
        while (!closed.get()) {
            try {
                Object value = channel.receive(0L);
                if (value instanceof Message) {
                    Message message = (Message) value;

                    // lets discard messages coming from the local address
                    // to avoid infinite loops when used with the JMS broker
                    if (!localAddress.equals(message.getSrc())) {
                        byte[] data = message.getBuffer();
                        Packet packet = wireFormat.fromBytes(data);
                        if (packet != null) {
                            doConsumePacket(packet);
                        }
                    }
                }
                /*
                else {
                    String type = "";
                    if (value != null) {
                        type = " of type: " + value.getClass();
                    }
                    log.warn("Expected instanceof Message but received: " + value + type);
                }
                */
            }
            catch (IOException e) {
                doClose(e);
            }
            catch (ChannelClosedException e) {
                stop();
            }
            catch (ChannelNotConnectedException e) {
                doClose(e);
            }
            catch (TimeoutException e) {
                // ignore timeouts
            }
        }
    }

    /**
     * writes the packet to the channel
     */
    protected void writePacket(Packet packet) throws JMSException {
        try {
            synchronized (outboundLock) {
                Address dest = null;
                Message message = new Message(dest, localAddress, wireFormat.toBytes(packet));
                channel.send(message);
            }
        }
        catch (ChannelException e) {
            throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
        }
        catch (IOException e) {
            throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
        }
    }


    private void doClose(Exception ex) {
        if (!closed.get()) {
            onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
            stop();
        }
    }
}
TOP

Related Classes of org.codehaus.activemq.transport.jgroups.JGroupsTransportChannel

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.