/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.broker.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.security.auth.Subject;
import javax.transaction.xa.XAException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.CapacityInfo;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DurableUnsubscribe;
import org.codehaus.activemq.message.IntResponseReceipt;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.ResponseReceipt;
import org.codehaus.activemq.message.SessionInfo;
import org.codehaus.activemq.message.TransactionInfo;
import org.codehaus.activemq.message.XATransactionInfo;
import org.codehaus.activemq.message.util.BoundedPacketQueue;
import org.codehaus.activemq.message.util.SpooledBoundedPacketQueue;
import org.codehaus.activemq.transport.NetworkChannel;
import org.codehaus.activemq.transport.NetworkConnector;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.util.IdGenerator;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
/**
* A Broker client side proxy representing a JMS Connnection
*
* @version $Revision: 1.9 $
*/
public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
private BrokerConnector brokerConnector;
private TransportChannel channel;
private ConnectionInfo connectionInfo;
private IdGenerator packetIdGenerator;
private SynchronizedBoolean closed;
private Set activeConsumers;
private CopyOnWriteArrayList consumers;
private CopyOnWriteArrayList producers;
private CopyOnWriteArrayList transactions;
private CopyOnWriteArrayList sessions;
private boolean started;
private boolean brokerConnection;
private boolean clusteredConnection;
private String remoteBrokerName;
private int capacity = 100;
private SpooledBoundedPacketQueue spoolQueue;
private boolean cleanedUp;
private boolean registered;
private ArrayList dispatchQueue = new ArrayList();
private Subject subject;
private boolean remoteNetworkConnector;
/**
* Default Constructor of BrokerClientImpl
*/
public BrokerClientImpl() {
this.packetIdGenerator = new IdGenerator();
this.closed = new SynchronizedBoolean(false);
this.activeConsumers = new HashSet();
this.consumers = new CopyOnWriteArrayList();
this.producers = new CopyOnWriteArrayList();
this.transactions = new CopyOnWriteArrayList();
this.sessions = new CopyOnWriteArrayList();
}
/**
* Initialize the BrokerClient
*
* @param brokerConnector
* @param channel
*/
public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
this.brokerConnector = brokerConnector;
this.channel = channel;
this.channel.setPacketListener(this);
this.channel.setExceptionListener(this);
log.trace("brokerConnectorConnector client initialized");
}
/**
* @return the BrokerConnector this client is associated with
*/
public BrokerConnector getBrokerConnector(){
return this.brokerConnector;
}
/**
* @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
*/
public void onException(JMSException jmsEx) {
log.info("Client disconnected: "+this);
log.debug("Disconnect cuase: ", jmsEx);
close();
}
/**
* @return pretty print for this brokerConnector-client
*/
public String toString() {
String str = "brokerConnector-client:("+hashCode()+") ";
str += connectionInfo == null ? "" : connectionInfo.getClientId();
str += ": " + channel;
return str;
}
/**
* Dispatch an ActiveMQMessage to the end client
*
* @param message
*/
public void dispatch(ActiveMQMessage message) {
if ( !isSlowConsumer() ) {
dispatchToClient(message);
}else{
if (spoolQueue == null) {
log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer");
String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId();
try {
spoolQueue = new SpooledBoundedPacketQueue(brokerConnector.getBrokerContainer().getBroker()
.getTempDir(), spoolName);
final BoundedPacketQueue bpq = spoolQueue;
ThreadedExecutor exec = new ThreadedExecutor();
exec.execute(new Runnable() {
public void run() {
while (!closed.get()) {
try {
Packet packet = bpq.dequeue();
if (packet != null){
dispatchToClient(packet);
}
}
catch (InterruptedException e) {
log.warn("async dispatch got an interupt", e);
}
catch (JMSException e) {
log.error("async dispatch got an problem", e);
}
}
}
});
}
catch (IOException e) {
log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
close();
}
catch (InterruptedException e) {
log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
close();
}
}
if (spoolQueue != null) {
try {
spoolQueue.enqueue(message);
}
catch (JMSException e) {
log.error(
"Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
e);
close();
}
}
}
}
private void dispatchToClient(Packet message) {
boolean started;
synchronized (this) {
started = this.started;
}
if (started) {
send(message);
}
else {
// If the connection is stopped.. we have to hold the message till it is started.
synchronized (this) {
dispatchQueue.add(message);
}
}
}
/**
* @return true if the peer for this Client is itself another Broker
*/
public boolean isBrokerConnection() {
return brokerConnection;
}
/**
* @return true id this client is part of a cluster
*/
public boolean isClusteredConnection(){
return clusteredConnection;
}
/**
* Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0
* capacity representing that the peer cannot process any more messages at the current time
*
* @return
*/
public int getCapacity() {
return capacity;
}
/**
* @return the client id of the remote connection
*/
public String getClientID() {
if (connectionInfo != null) {
return connectionInfo.getClientId();
}
return null;
}
/**
* @return the channel used
*/
public TransportChannel getChannel() {
return channel;
}
/**
* Get an indication if the peer should be considered as a slow consumer
*
* @return true id the peer should be considered as a slow consumer
*/
public boolean isSlowConsumer() {
return capacity <= 20; //don't want to fill the peer completely - as this may effect it's processing!
}
/**
* Consume a Packet from the underlying TransportChannel for processing
*
* @param packet
*/
public void consume(Packet packet) {
if (!closed.get() && packet != null) {
Throwable requestEx = null;
boolean failed = false;
String brokerName = brokerConnector.getBrokerInfo().getBrokerName();
String clusterName = brokerConnector.getBrokerInfo().getClusterName();
try {
if (brokerConnection) {
packet.addBrokerVisited(remoteBrokerName); //got from the remote broker
packet.addBrokerVisited(brokerName);
}
if (packet.isJMSMessage()) {
ActiveMQMessage message = (ActiveMQMessage) packet;
// lets mark all messages from where they came from so that NoLocal can filter out loops
// e.g. when receiving messages over multicast, we don't want to publish them back out again
if (connectionInfo != null) {
message.setProducerID(connectionInfo.getClientId());
}
else {
log.warn("No connection info available! Maybe the client forgot to start() the Connection?");
}
if (!brokerConnection){
message.setEntryBrokerName(brokerName);
message.setEntryClusterName(clusterName);
}
consumeActiveMQMessage(message);
}
else {
switch (packet.getPacketType()) {
case Packet.ACTIVEMQ_MSG_ACK : {
MessageAck ack = (MessageAck) packet;
consumeMessageAck(ack);
break;
}
case Packet.XA_TRANSACTION_INFO : {
XATransactionInfo info = (XATransactionInfo) packet;
consumeXATransactionInfo(info);
break;
}
case Packet.TRANSACTION_INFO : {
TransactionInfo info = (TransactionInfo) packet;
consumeTransactionInfo(info);
break;
}
case Packet.CONSUMER_INFO : {
ConsumerInfo info = (ConsumerInfo) packet;
consumeConsumerInfo(info);
break;
}
case Packet.PRODUCER_INFO : {
ProducerInfo info = (ProducerInfo) packet;
consumeProducerInfo(info);
break;
}
case Packet.SESSION_INFO : {
SessionInfo info = (SessionInfo) packet;
consumeSessionInfo(info);
break;
}
case Packet.ACTIVEMQ_CONNECTION_INFO : {
ConnectionInfo info = (ConnectionInfo) packet;
consumeConnectionInfo(info);
break;
}
case Packet.DURABLE_UNSUBSCRIBE : {
DurableUnsubscribe ds = (DurableUnsubscribe) packet;
brokerConnector.durableUnsubscribe(this, ds);
break;
}
case Packet.CAPACITY_INFO : {
CapacityInfo info = (CapacityInfo) packet;
consumeCapacityInfo(info);
break;
}
case Packet.CAPACITY_INFO_REQUEST : {
updateCapacityInfo(packet.getId());
break;
}
case Packet.ACTIVEMQ_BROKER_INFO : {
consumeBrokerInfo((BrokerInfo) packet);
break;
} case Packet.KEEP_ALIVE: {
// Ignore as the packet contains no additional information to consume
break;
}
default : {
log.warn("Unknown Packet received: " + packet);
break;
}
}
}
}
catch (Throwable e) {
requestEx = e;
log.warn("caught exception consuming packet: " + packet, e);
failed = true;
}
sendReceipt(packet, requestEx, failed);
}
}
/**
* Register/deregister MessageConsumer with the Broker
*
* @param info
* @throws JMSException
*/
public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName();
if (info.isStarted()) {
consumers.add(info);
if (this.activeConsumers.add(info)) {
this.brokerConnector.registerMessageConsumer(this, info);
}
}
else {
consumers.remove(info);
if (activeConsumers.remove(info)) {
this.brokerConnector.deregisterMessageConsumer(this, info);
}
}
}
/**
* Update the peer Connection about the Broker's capacity for messages
*
* @param capacity
*/
public void updateBrokerCapacity(int capacity) {
CapacityInfo info = new CapacityInfo();
info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
info.setCapacity(capacity);
info.setFlowControlTimeout(getFlowControlTimeout(capacity));
send(info);
}
/**
* register with the Broker
*
* @param info
* @throws JMSException
*/
public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
this.connectionInfo = info;
if (info.isClosed()) {
cleanUp();
try {
sendReceipt(info);
info.setReceiptRequired(false);
try {
Thread.sleep(500);
}
catch (Throwable e) {
}
}
finally {
close();
}
}
else {
if( !registered ) {
this.brokerConnector.registerClient(this, info);
registered=true;
}
synchronized( this ) {
if (!started && info.isStarted()) {
started = true;
// Dispatch any queued
log.debug(this + " has started running client version " + info.getClientVersion() + " , wire format = " + info.getWireFormatVersion());
//go through consumers and producers - setting their clientId (which might not have been set)
for (Iterator i = consumers.iterator();i.hasNext();) {
ConsumerInfo ci = (ConsumerInfo) i.next();
ci.setClientId(info.getClientId());
}
for (Iterator i = producers.iterator();i.hasNext();) {
ProducerInfo pi = (ProducerInfo) i.next();
pi.setClientId(info.getClientId());
}
for( int i=0; i < dispatchQueue.size(); i++ ) {
ActiveMQMessage msg = (ActiveMQMessage) dispatchQueue.get(i);
dispatch(msg);
}
dispatchQueue.clear();
}
if(started && !info.isStarted()) {
started=false;
log.debug(this + " has stopped");
}
}
}
}
/**
* start consuming messages
*
* @throws JMSException
*/
public void start() throws JMSException {
channel.start();
}
/**
* stop consuming messages
*
* @throws JMSException
*/
public void stop() throws JMSException {
log.trace("Stopping channel: " + channel);
channel.stop();
}
/**
* cleanup
*/
public synchronized void cleanUp() {
// we could be called here from 2 different code paths
// based on if we get a transport failure or we do a clean shutdown
// so lets only run this stuff once
if (!cleanedUp) {
cleanedUp = true;
try {
try {
for (Iterator i = consumers.iterator();i.hasNext();) {
ConsumerInfo info = (ConsumerInfo) i.next();
info.setStarted(false);
this.brokerConnector.deregisterMessageConsumer(this, info);
}
for (Iterator i = producers.iterator();i.hasNext();) {
ProducerInfo info = (ProducerInfo) i.next();
info.setStarted(false);
this.brokerConnector.deregisterMessageProducer(this, info);
}
for (Iterator i = sessions.iterator();i.hasNext();) {
SessionInfo info = (SessionInfo) i.next();
info.setStarted(false);
this.brokerConnector.deregisterSession(this, info);
}
for (Iterator i = transactions.iterator();i.hasNext();) {
this.brokerConnector.rollbackTransaction(this, i.next().toString());
}
}
finally {
// whatever happens, lets make sure we unregister & clean things down
if (log.isDebugEnabled()) {
log.debug(this + " has stopped");
}
this.consumers.clear();
this.producers.clear();
this.transactions.clear();
this.sessions.clear();
this.brokerConnector.deregisterClient(this, connectionInfo);
registered=false;
}
}
catch (JMSException e) {
log.warn("failed to de-register Broker client: " + e, e);
}
}
else {
log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
}
}
// Implementation methods
//-------------------------------------------------------------------------
protected void send(Packet packet) {
if (!closed.get()) {
try {
if (brokerConnection) {
String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName();
packet.addBrokerVisited(brokerName);
if (packet.hasVisited(remoteBrokerName)) {
if (log.isDebugEnabled()) {
log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: "
+ packet);
}
return;
}
}
this.channel.asyncSend(packet);
}
catch (JMSException e) {
log.warn(this + " caught exception ", e);
close();
}
}
}
protected void close() {
if (closed.commit(false, true)) {
this.channel.stop();
log.debug(this + " has closed");
}
}
/**
* Send message to Broker
*
* @param message
* @throws JMSException
*/
private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
message = message.shallowCopy();
if (message.isPartOfTransaction()) {
this.brokerConnector.sendTransactedMessage(this, message.getTransactionId(), message);
}
else {
this.brokerConnector.sendMessage(this, message);
}
}
/**
* Send Message acknowledge to the Broker
*
* @param ack
* @throws JMSException
*/
private void consumeMessageAck(MessageAck ack) throws JMSException {
if (ack.isPartOfTransaction()) {
this.brokerConnector.acknowledgeTransactedMessage(this, ack.getTransactionId(), ack);
}
else {
this.brokerConnector.acknowledgeMessage(this, ack);
}
}
/**
* Handle transaction start/commit/rollback
*
* @param info
* @throws JMSException
*/
private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
if (info.getType() == TransactionInfo.START) {
transactions.add(info.getTransactionId());
this.brokerConnector.startTransaction(this, info.getTransactionId());
}
else {
if (info.getType() == TransactionInfo.ROLLBACK) {
this.brokerConnector.rollbackTransaction(this, info.getTransactionId());
}
else if (info.getType() == TransactionInfo.COMMIT) {
this.brokerConnector.commitTransaction(this, info.getTransactionId());
}
transactions.remove(info.getTransactionId());
}
}
/**
* Handle XA transaction start/prepare/commit/rollback
*
* @param info
* @throws JMSException
* @throws XAException
*/
private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
if (info.getType() == XATransactionInfo.START) {
this.brokerConnector.startTransaction(this, info.getXid());
}
else if (info.getType() == XATransactionInfo.XA_RECOVER) {
ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this);
// We will be sending our own receipt..
info.setReceiptRequired(false);
// Send the receipt..
ResponseReceipt receipt = new ResponseReceipt();
receipt.setId(this.packetIdGenerator.generateId());
receipt.setCorrelationId(info.getId());
receipt.setResult(rc);
send(receipt);
}
else if (info.getType() == XATransactionInfo.GET_RM_ID) {
String rc = this.brokerConnector.getResourceManagerId(this);
// We will be sending our own receipt..
info.setReceiptRequired(false);
// Send the receipt..
ResponseReceipt receipt = new ResponseReceipt();
receipt.setId(this.packetIdGenerator.generateId());
receipt.setCorrelationId(info.getId());
receipt.setResult(rc);
send(receipt);
}
else if (info.getType() == XATransactionInfo.END) {
// we don't do anything..
}
else {
if (info.getType() == XATransactionInfo.PRE_COMMIT) {
int rc = this.brokerConnector.prepareTransaction(this, info.getXid());
// We will be sending our own receipt..
info.setReceiptRequired(false);
// Send the receipt..
IntResponseReceipt receipt = new IntResponseReceipt();
receipt.setId(this.packetIdGenerator.generateId());
receipt.setCorrelationId(info.getId());
receipt.setResult(rc);
send(receipt);
}
else if (info.getType() == XATransactionInfo.ROLLBACK) {
this.brokerConnector.rollbackTransaction(this, info.getXid());
}
else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) {
this.brokerConnector.commitTransaction(this, info.getXid(), true);
}
else if (info.getType() == XATransactionInfo.COMMIT) {
this.brokerConnector.commitTransaction(this, info.getXid(), false);
}
else {
throw new JMSException("Packet type: " + info.getType() + " not recognized.");
}
}
}
/**
* register/deregister MessageProducer in the Broker
*
* @param info
* @throws JMSException
*/
private void consumeProducerInfo(ProducerInfo info) throws JMSException {
if (info.isStarted()) {
producers.add(info);
this.brokerConnector.registerMessageProducer(this, info);
}
else {
producers.remove(info);
this.brokerConnector.deregisterMessageProducer(this, info);
}
}
/**
* register/deregister Session in a Broker
*
* @param info
* @throws JMSException
*/
private void consumeSessionInfo(SessionInfo info) throws JMSException {
if (info.isStarted()) {
sessions.add(info);
this.brokerConnector.registerSession(this, info);
}
else {
sessions.remove(info);
this.brokerConnector.deregisterSession(this, info);
}
}
/**
* Update capacity for the peer
*
* @param info
*/
private void consumeCapacityInfo(CapacityInfo info) {
this.capacity = info.getCapacity();
}
private void updateCapacityInfo(String correlationId) {
CapacityInfo info = new CapacityInfo();
info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
info.setCorrelationId(correlationId);
info.setCapacity(this.brokerConnector.getBrokerCapacity());
info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity()));
send(info);
}
private long getFlowControlTimeout(int capacity) {
long result = -1;
if (capacity <= 0) {
result = 10000;
}
else if (capacity <= 10) {
result = 1000;
}
else if (capacity <= 20) {
result = 10;
}
return result;
}
private void consumeBrokerInfo(BrokerInfo info) {
brokerConnection = true;
remoteBrokerName = info.getBrokerName();
String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName();
String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName();
if (clusterName.equals(info.getClusterName())){
clusteredConnection = true;
}
if (!remoteNetworkConnector && info.isRemote()){
try {
NetworkConnector networkConnector = new NetworkConnector(brokerConnector.getBrokerContainer());
NetworkChannel networkChannel = new NetworkChannel(networkConnector,brokerConnector.getBrokerContainer(),channel);
networkConnector.addNetworkChannel(networkChannel);
brokerConnector.getBrokerContainer().addNetworkConnector(networkConnector);
networkConnector.start();
log.info("Started reverse remote channel to " + remoteBrokerName);
remoteNetworkConnector = true;
}
catch (JMSException e) {
log.error("Failed to create reverse remote channel",e);
}
}
}
private void sendReceipt(Packet packet) {
sendReceipt(packet, null, false);
}
private void sendReceipt(Packet packet, Throwable requestEx, boolean failed) {
if (packet != null && packet.isReceiptRequired()) {
Receipt receipt = new Receipt();
receipt.setId(this.packetIdGenerator.generateId());
receipt.setCorrelationId(packet.getId());
receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName());
receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName());
receipt.setException(requestEx);
receipt.setFailed(failed);
send(receipt);
}
}
/**
* @param subject
*
*/
public void setSubject(Subject subject) {
this.subject=subject;
}
/**
* @return the subject
*/
public Subject getSubject() {
return subject;
}
}