/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.transport.jgroups;
import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
import EDU.oswego.cs.dl.util.concurrent.Executor;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.TransportChannelSupport;
import org.codehaus.activemq.util.JMSExceptionHelper;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelNotConnectedException;
import org.jgroups.Message;
import org.jgroups.TimeoutException;
import javax.jms.JMSException;
import java.io.IOException;
/**
* A JGroups implementation of a TransportChannel
*
* @version $Revision: 1.9 $
*/
public class JGroupsTransportChannel extends TransportChannelSupport implements Runnable {
private static final Log log = LogFactory.getLog(JGroupsTransportChannel.class);
private Channel channel;
private Address localAddress = null;
private WireFormat wireFormat;
private SynchronizedBoolean closed;
private SynchronizedBoolean started;
private Object outboundLock;
private Executor executor;
private Thread thread; //need to change this - and use a thread pool
private boolean useAsyncSend = false;
public JGroupsTransportChannel(WireFormat wireFormat, Channel channel, Executor executor) {
this.wireFormat = wireFormat;
this.channel = channel;
this.executor = executor;
this.localAddress = channel.getLocalAddress();
closed = new SynchronizedBoolean(false);
started = new SynchronizedBoolean(false);
outboundLock = new Object();
if (useAsyncSend) {
executor = new PooledExecutor(new BoundedBuffer(1000), 1);
}
}
public String toString() {
return "JGroupsTransportChannel: " + channel;
}
/**
* close the channel
*/
public void stop() {
if (closed.commit(false, true)) {
super.stop();
try {
stopExecutor(executor);
channel.disconnect();
channel.close();
}
catch (Exception e) {
log.warn("Caught while closing: " + e + ". Now Closed", e);
}
}
}
/**
* start listeneing for events
*
* @throws javax.jms.JMSException if an error occurs
*/
public void start() throws JMSException {
if (started.commit(false, true)) {
thread = new Thread(this, toString());
if (isServerSide()) {
thread.setDaemon(true);
}
thread.start();
}
}
/**
* Asynchronously send a Packet
*
* @param packet
* @throws javax.jms.JMSException
*/
public void asyncSend(final Packet packet) throws JMSException {
if (executor != null) {
try {
executor.execute(new Runnable() {
public void run() {
try {
writePacket(packet);
}
catch (JMSException e) {
onAsyncException(e);
}
}
});
}
catch (InterruptedException e) {
log.info("Caught: " + e, e);
}
}
else {
writePacket(packet);
}
}
public boolean isMulticast() {
return true;
}
/**
* Can this wireformat process packets of this version
* @param version the version number to test
* @return true if can accept the version
*/
public boolean canProcessWireFormatVersion(int version){
return wireFormat.canProcessWireFormatVersion(version);
}
/**
* @return the current version of this wire format
*/
public int getCurrentWireFormatVersion(){
return wireFormat.getCurrentWireFormatVersion();
}
/**
* reads packets from a Socket
*/
public void run() {
log.trace("JGroups consumer thread starting");
while (!closed.get()) {
try {
Object value = channel.receive(0L);
if (value instanceof Message) {
Message message = (Message) value;
// lets discard messages coming from the local address
// to avoid infinite loops when used with the JMS broker
if (!localAddress.equals(message.getSrc())) {
byte[] data = message.getBuffer();
Packet packet = wireFormat.fromBytes(data);
if (packet != null) {
doConsumePacket(packet);
}
}
}
/*
else {
String type = "";
if (value != null) {
type = " of type: " + value.getClass();
}
log.warn("Expected instanceof Message but received: " + value + type);
}
*/
}
catch (IOException e) {
doClose(e);
}
catch (ChannelClosedException e) {
stop();
}
catch (ChannelNotConnectedException e) {
doClose(e);
}
catch (TimeoutException e) {
// ignore timeouts
}
}
}
/**
* writes the packet to the channel
*/
protected void writePacket(Packet packet) throws JMSException {
try {
synchronized (outboundLock) {
Address dest = null;
Message message = new Message(dest, localAddress, wireFormat.toBytes(packet));
channel.send(message);
}
}
catch (ChannelException e) {
throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
}
catch (IOException e) {
throw JMSExceptionHelper.newJMSException("writePacket failed: " + e, e);
}
}
private void doClose(Exception ex) {
if (!closed.get()) {
onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
stop();
}
}
}