Package org.activemq.transport.ibmaio

Source Code of org.activemq.transport.ibmaio.AIOTransportChannel

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.transport.ibmaio;

import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.Sync;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import com.ibm.io.async.AsyncSocketChannel;
import com.ibm.io.async.IAbstractAsyncFuture;
import com.ibm.io.async.ICompletionListener;
import com.ibm.io.async.IConnectionFuture;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.activemq.io.WireFormat;
import org.activemq.message.Packet;
import org.activemq.transport.TransportChannelSupport;
import org.activemq.transport.TransportStatusEventListener;
import org.activemq.util.JMSExceptionHelper;

import javax.jms.JMSException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;

/**
* An Async IO implementation of a TransportChannel using IBM's AIO4J library
*
* @version $Revision: 1.1 $
*/
public class AIOTransportChannel extends TransportChannelSupport implements ICompletionListener {
    private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
    private static final Log log = LogFactory.getLog(AIOTransportChannel.class);

    protected AsyncSocketChannel channel;
    private WireFormat wireFormat;
    private SynchronizedBoolean closed;
    private SynchronizedBoolean started;
    private Sync readSync;
    private Sync writeSync;

    protected AIOTransportChannel(WireFormat wireFormat) {
        this.wireFormat = wireFormat;
        closed = new SynchronizedBoolean(false);
        started = new SynchronizedBoolean(false);
        readSync = new Latch();
        writeSync = new Latch();
    }

    public AIOTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
        this(wireFormat);
        try {
            this.channel = createSocket(remoteLocation);
            configureSocket();
        }
        catch (Exception ioe) {
            throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
        }
    }

    public AIOTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException {
        this(wireFormat);
        try {
            this.channel = createSocket(remoteLocation, localLocation);
            configureSocket();
        }
        catch (Exception ioe) {
            throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
        }
    }

    public AIOTransportChannel(WireFormat wireFormat, AsyncSocketChannel channel) throws JMSException {
        this(wireFormat);
        this.channel = channel;
        try {
            configureSocket();
        }
        catch (IOException ioe) {
            throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
        }
    }

    public String toString() {
        return "AIOTransportChannel: " + channel;
    }

    public void stop() {
        if (closed.commit(false, true)) {
            try {
                channel.close();
                super.stop();
            }
            catch (Exception e) {
                log.warn("Caught while closing: " + e + ". Now Closed", e);
            }
        }
    }

    public void start() throws JMSException {
        if (started.commit(false, true)) {
        }
    }


    public void asyncSend(Packet packet) throws JMSException {
        try {
            byte[] data = wireFormat.toBytes(packet);

            // TODO could we use the following code
            // ByteBuffer buffer = ByteBuffer.wrap(data);
            // I suspect not as its not a 'direct' buffer

            ByteBuffer buffer = ByteBuffer.allocateDirect(data.length);
            buffer.put(data);

            // lets wait until the buffer is free...
            while (true) {
                try {
                    writeSync.acquire();
                    break;
                }
                catch (InterruptedException e) {
                    // ignore
                }
            }
            channel.write(buffer);
        }
        catch (IOException e) {
            throw JMSExceptionHelper.newJMSException("syncSend failed " + e.getMessage(), e);
        }
    }

    public boolean isMulticast() {
        return false;
    }

    public void addTransportStatusEventListener(TransportStatusEventListener listener) {
        /** TODO */
    }

    public void removeTransportStatusEventListener(TransportStatusEventListener listener) {
        /** TODO */
    }

    public void forceDisconnect() {
        /** TODO */
    }

    public boolean canProcessWireFormatVersion(int version) {
        return wireFormat.canProcessWireFormatVersion(version);
    }

    public int getCurrentWireFormatVersion() {
        return wireFormat.getCurrentWireFormatVersion();
    }

    public void futureCompleted(IAbstractAsyncFuture iAbstractAsyncFuture, Object element) {
        // notify other threads that they can now write
        writeSync.release();
    }

    protected AsyncSocketChannel createSocket(URI remoteLocation) throws UnknownHostException, IOException, InterruptedException {
        AsyncSocketChannel answer = AsyncSocketChannel.open();
        IConnectionFuture future = answer.connect(new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort()));
        future.waitForCompletion();
        return answer;
    }

    protected AsyncSocketChannel createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException, InterruptedException {
        return createSocket(remoteLocation);
    }

    protected void configureSocket() throws SocketException {
        channel.socket().setReceiveBufferSize(SOCKET_BUFFER_SIZE);
        channel.socket().setSendBufferSize(SOCKET_BUFFER_SIZE);
    }
}
TOP

Related Classes of org.activemq.transport.ibmaio.AIOTransportChannel

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle, Inc. Contact coftware#gmail.com.