/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.transport.reliable;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.transport.TransportStatusEvent;
import org.codehaus.activemq.transport.composite.CompositeTransportChannel;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import java.net.URI;
import java.util.LinkedList;
import java.util.List;
/**
* A composite implementation of a TransportChannel that attempts to
* re-establish the connection when the underlying channel fails
*
* @version $Revision: 1.4 $
*/
public class ReliableTransportChannel extends CompositeTransportChannel implements PacketListener, ExceptionListener {
    private static final Log log = LogFactory.getLog(ReliableTransportChannel.class);
    /** Monitor held while a failed channel is being replaced - see doReconnect() */
    private final Object lock = new Object();
    // NOTE(review): the two fields below are not referenced anywhere in this class
    private LinkedList packetList = new LinkedList();
    private boolean cacheMessagesForFailover;

    /**
     * Constructor for ReliableTransportChannel. The connection is established
     * immediately, so construction fails if establishConnection() throws.
     *
     * @param wireFormat passed through to the superclass
     * @param uris       passed through to the superclass
     * @throws JMSException if the superclass constructor or establishConnection() fails
     */
    public ReliableTransportChannel(WireFormat wireFormat, URI[] uris) throws JMSException {
        super(wireFormat, uris);
        establishConnection();
    }

    /**
     * @return pretty print for this
     */
    public String toString() {
        return "ReliableTransportChannel: " + channel;
    }

    /**
     * start the connection
     *
     * @throws JMSException if the embedded channel fails to start
     */
    public void start() throws JMSException {
        // presumably commit(false, true) is an atomic test-and-set, so the embedded
        // channel is only started on the first call - confirm against SynchronizedBoolean
        if (started.commit(false, true)) {
            if (channel != null) {
                channel.start();
            }
        }
    }

    /**
     * Send a packet through the embedded channel. If the send fails, a reconnect is
     * attempted and the send is retried on whichever channel is current afterwards.
     *
     * @param packet  the packet to send
     * @param timeout passed through to the embedded channel
     * @return the receipt returned by the embedded channel
     * @throws JMSException if the reconnect fails, or if the send fails after this
     *                      channel has been closed
     */
    public Receipt send(Packet packet, int timeout) throws JMSException {
        while (true) {
            // re-read the channel on every attempt: a successful reconnect replaces it,
            // and retrying on the stale reference would fail (and loop) forever
            TransportChannel tc = this.channel;
            try {
                return tc.send(packet, timeout);
            }
            catch (JMSException jmsEx) {
                recoverFromSendFailure(tc, jmsEx);
            }
        }
    }

    /**
     * Asynchronously send a packet through the embedded channel. If the send fails, a
     * reconnect is attempted and the send is retried on whichever channel is current
     * afterwards.
     *
     * @param packet the packet to send
     * @throws JMSException if the reconnect fails, or if the send fails after this
     *                      channel has been closed
     */
    public void asyncSend(Packet packet) throws JMSException {
        while (true) {
            // re-read the channel on every attempt - see send()
            TransportChannel tc = this.channel;
            try {
                tc.asyncSend(packet);
                break;
            }
            catch (JMSException jmsEx) {
                recoverFromSendFailure(tc, jmsEx);
            }
        }
    }

    /**
     * Common failure handling for send() and asyncSend(): give up if this channel is
     * closed, otherwise try to replace the failed channel so the caller can retry.
     *
     * @param failedChannel the channel the send was attempted on
     * @param cause         the exception raised by the failed send
     * @throws JMSException the original cause if closed, or the reconnect failure
     */
    private void recoverFromSendFailure(TransportChannel failedChannel, JMSException cause) throws JMSException {
        if (closed.get()) {
            // doReconnect() does nothing once closed, so retrying would spin forever
            throw cause;
        }
        if (log.isDebugEnabled()) {
            log.debug("Send failed on " + failedChannel + " - attempting to reconnect", cause);
        }
        doReconnect(failedChannel);
    }

    /**
     * Register this object as both the packet listener and the exception listener
     * of the embedded channel.
     */
    protected void configureChannel() {
        channel.setPacketListener(this);
        channel.setExceptionListener(this);
    }

    /**
     * Remove and return one URI from the list. When the list holds more than one
     * entry the index is chosen using SMLCGRandom, otherwise index 0 is used.
     *
     * @param list the candidate URIs; the chosen entry is removed from it
     * @return the URI that was removed from the list
     * @throws JMSException declared by the signature; not thrown by this implementation
     */
    protected URI extractURI(List list) throws JMSException {
        int idx = 0;
        if (list.size() > 1) {
            SMLCGRandom rand = new SMLCGRandom();
            // loop until the computed index is a valid position in the list
            do {
                idx = (int) (rand.nextDouble() * list.size());
            }
            while (idx < 0 || idx >= list.size());
        }
        return (URI) list.remove(idx);
    }

    /**
     * consume a packet from the embedded channel and hand it on to the
     * registered packet listener, if there is one
     *
     * @param packet to consume
     */
    public void consume(Packet packet) {
        //do processing
        //avoid a lock
        PacketListener listener = getPacketListener();
        if (listener != null) {
            listener.consume(packet);
        }
    }

    /**
     * handle exception from the embedded channel by attempting to reconnect; if the
     * reconnect fails, its exception is linked to the original one and passed to the
     * exception listener
     *
     * @param jmsEx the exception reported by the embedded channel
     */
    public void onException(JMSException jmsEx) {
        TransportChannel tc = this.channel;
        try {
            doReconnect(tc);
        }
        catch (JMSException ex) {
            ex.setLinkedException(jmsEx);
            fireException(ex);
        }
    }

    /**
     * Stop via the superclass, then report DISCONNECTED for the current URI.
     */
    public void stop() {
        super.stop();
        fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
    }

    /**
     * Fire a JMSException to the exception listener
     *
     * @param jmsEx the exception to deliver; dropped if no listener is registered
     */
    protected void fireException(JMSException jmsEx) {
        ExceptionListener listener = getExceptionListener();
        if (listener != null) {
            listener.onException(jmsEx);
        }
    }

    /**
     * Replace the given channel with a newly established connection, firing
     * DISCONNECTED followed by either RECONNECTED or FAILED status events.
     * Does nothing if this channel is closed, or if the current channel is no longer
     * the one passed in.
     *
     * @param currentChannel the channel the caller observed failing
     * @throws JMSException if establishConnection() fails
     */
    protected void doReconnect(TransportChannel currentChannel) throws JMSException {
        if (!closed.get()) {
            synchronized (lock) {
                //Loss of connectivity can be signalled from more than one
                //thread - hence the check here - we want to avoid doing it more than once
                if (this.channel == currentChannel) {
                    fireStatusEvent(super.currentURI, TransportStatusEvent.DISCONNECTED);
                    try {
                        establishConnection();
                    }
                    catch (JMSException jmsEx) {
                        fireStatusEvent(super.currentURI, TransportStatusEvent.FAILED);
                        throw jmsEx;
                    }
                    fireStatusEvent(super.currentURI, TransportStatusEvent.RECONNECTED);
                }
            }
        }
    }
}