Package org.codehaus.activemq.broker.impl

Source Code of org.codehaus.activemq.broker.impl.BrokerClientImpl

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.codehaus.activemq.broker.impl;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.security.auth.Subject;
import javax.transaction.xa.XAException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQXid;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.CapacityInfo;
import org.codehaus.activemq.message.ConnectionInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.DurableUnsubscribe;
import org.codehaus.activemq.message.IntResponseReceipt;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.PacketListener;
import org.codehaus.activemq.message.ProducerInfo;
import org.codehaus.activemq.message.Receipt;
import org.codehaus.activemq.message.ResponseReceipt;
import org.codehaus.activemq.message.SessionInfo;
import org.codehaus.activemq.message.TransactionInfo;
import org.codehaus.activemq.message.XATransactionInfo;
import org.codehaus.activemq.message.util.BoundedPacketQueue;
import org.codehaus.activemq.message.util.SpooledBoundedPacketQueue;
import org.codehaus.activemq.transport.NetworkChannel;
import org.codehaus.activemq.transport.NetworkConnector;
import org.codehaus.activemq.transport.TransportChannel;
import org.codehaus.activemq.util.IdGenerator;
import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;

/**
* A Broker client side proxy representing a JMS Connnection
*
* @version $Revision: 1.9 $
*/
public class BrokerClientImpl implements BrokerClient, ExceptionListener, PacketListener {
    private static final Log log = LogFactory.getLog(BrokerClientImpl.class);
    private BrokerConnector brokerConnector;
    private TransportChannel channel;
    private ConnectionInfo connectionInfo;
    private IdGenerator packetIdGenerator;
    private SynchronizedBoolean closed;
    private Set activeConsumers;
    private CopyOnWriteArrayList consumers;
    private CopyOnWriteArrayList producers;
    private CopyOnWriteArrayList transactions;
    private CopyOnWriteArrayList sessions;
    private boolean started;
    private boolean brokerConnection;
    private boolean clusteredConnection;
    private String remoteBrokerName;
    private int capacity = 100;
    private SpooledBoundedPacketQueue spoolQueue;
    private boolean cleanedUp;
    private boolean registered;
    private ArrayList dispatchQueue = new ArrayList();
  private Subject subject;
    private boolean remoteNetworkConnector;

    /**
     * Default Constructor of BrokerClientImpl
     */
    public BrokerClientImpl() {
        this.packetIdGenerator = new IdGenerator();
        this.closed = new SynchronizedBoolean(false);
        this.activeConsumers = new HashSet();
        this.consumers = new CopyOnWriteArrayList();
        this.producers = new CopyOnWriteArrayList();
        this.transactions = new CopyOnWriteArrayList();
        this.sessions = new CopyOnWriteArrayList();
    }

    /**
     * Initialize the BrokerClient
     *
     * @param brokerConnector
     * @param channel
     */
    public void initialize(BrokerConnector brokerConnector, TransportChannel channel) {
        this.brokerConnector = brokerConnector;
        this.channel = channel;
        this.channel.setPacketListener(this);
        this.channel.setExceptionListener(this);
        log.trace("brokerConnectorConnector client initialized");
    }
   
    /**
     * @return the BrokerConnector this client is associated with
     */
    public BrokerConnector getBrokerConnector(){
        return this.brokerConnector;
    }

    /**
     * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
     */
    public void onException(JMSException jmsEx) {
        log.info("Client disconnected: "+this);
        log.debug("Disconnect cuase: ", jmsEx);
        close();
    }

    /**
     * @return pretty print for this brokerConnector-client
     */
    public String toString() {
        String str = "brokerConnector-client:("+hashCode()+") ";
        str += connectionInfo == null ? "" : connectionInfo.getClientId();
        str += ": " + channel;
        return str;
    }

    /**
     * Dispatch an ActiveMQMessage to the end client
     *
     * @param message
     */
    public void dispatch(ActiveMQMessage message) {
        if ( !isSlowConsumer() ) {
            dispatchToClient(message);
        }else{
            if (spoolQueue == null) {
                log.warn("Connection: " + connectionInfo.getClientId() + " is a slow consumer");
                String spoolName = brokerConnector.getBrokerInfo().getBrokerName() + "_" + connectionInfo.getClientId();
                try {
                    spoolQueue = new SpooledBoundedPacketQueue(brokerConnector.getBrokerContainer().getBroker()
                            .getTempDir(), spoolName);
                    final BoundedPacketQueue bpq = spoolQueue;
                    ThreadedExecutor exec = new ThreadedExecutor();
                    exec.execute(new Runnable() {
                        public void run() {
                            while (!closed.get()) {
                                try {
                                    Packet packet = bpq.dequeue();
                                    if (packet != null){
                                        dispatchToClient(packet);
                                    }
                                }
                                catch (InterruptedException e) {
                                    log.warn("async dispatch got an interupt", e);
                                }
                                catch (JMSException e) {
                                    log.error("async dispatch got an problem", e);
                                }
                            }
                        }
                    });
                }
                catch (IOException e) {
                    log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
                    close();
                }
                catch (InterruptedException e) {
                    log.error("Could not create SpooledBoundedQueue for this slow consumer", e);
                    close();
                }
            }
            if (spoolQueue != null) {
                try {
                    spoolQueue.enqueue(message);
                }
                catch (JMSException e) {
                    log.error(
                            "Could not enqueue message " + message + " to SpooledBoundedQueue for this slow consumer",
                            e);
                    close();
                }
            }
        }
       
    }
   
    private void dispatchToClient(Packet message) {
        boolean started;
        synchronized (this) {
            started = this.started;
        }
        if (started) {
            send(message);
        }
        else {
            // If the connection is stopped.. we have to hold the message till it is started.
            synchronized (this) {
                dispatchQueue.add(message);
            }
        }
    }

    /**
     * @return true if the peer for this Client is itself another Broker
     */
    public boolean isBrokerConnection() {
        return brokerConnection;
    }
   
    /**
     * @return true id this client is part of a cluster
     */
    public boolean isClusteredConnection(){
        return clusteredConnection;
    }


    /**
     * Get the Capacity for in-progress messages at the peer (probably a JMSConnection) Legimate values between 0-100. 0
     * capacity representing that the peer cannot process any more messages at the current time
     *
     * @return
     */
    public int getCapacity() {
        return capacity;
    }

    /**
     * @return the client id of the remote connection
     */
    public String getClientID() {
        if (connectionInfo != null) {
            return connectionInfo.getClientId();
        }
        return null;
    }

    /**
     * @return the channel used
     */
    public TransportChannel getChannel() {
        return channel;
    }

    /**
     * Get an indication if the peer should be considered as a slow consumer
     *
     * @return true id the peer should be considered as a slow consumer
     */
    public boolean isSlowConsumer() {
        return capacity <= 20; //don't want to fill the peer completely - as this may effect it's processing!
    }

    /**
     * Consume a Packet from the underlying TransportChannel for processing
     *
     * @param packet
     */
    public void consume(Packet packet) {
        if (!closed.get() && packet != null) {
            Throwable requestEx = null;
            boolean failed = false;
            String brokerName = brokerConnector.getBrokerInfo().getBrokerName();
            String clusterName = brokerConnector.getBrokerInfo().getClusterName();
            try {
                if (brokerConnection) {
                    packet.addBrokerVisited(remoteBrokerName); //got from the remote broker
                    packet.addBrokerVisited(brokerName);
                }
              
                if (packet.isJMSMessage()) {
                    ActiveMQMessage message = (ActiveMQMessage) packet;
                    // lets mark all messages from where they came from so that NoLocal can filter out loops
                    // e.g. when receiving messages over multicast, we don't want to publish them back out again
                    if (connectionInfo != null) {
                        message.setProducerID(connectionInfo.getClientId());
                    }
                    else {
                        log.warn("No connection info available! Maybe the client forgot to start() the Connection?");
                    }
                    if (!brokerConnection){
                        message.setEntryBrokerName(brokerName);
                        message.setEntryClusterName(clusterName);
                    }
                    consumeActiveMQMessage(message);
                }
                else {
                    switch (packet.getPacketType()) {
                        case Packet.ACTIVEMQ_MSG_ACK : {
                            MessageAck ack = (MessageAck) packet;
                            consumeMessageAck(ack);
                            break;
                        }
                        case Packet.XA_TRANSACTION_INFO : {
                            XATransactionInfo info = (XATransactionInfo) packet;
                            consumeXATransactionInfo(info);
                            break;
                        }
                        case Packet.TRANSACTION_INFO : {
                            TransactionInfo info = (TransactionInfo) packet;
                            consumeTransactionInfo(info);
                            break;
                        }
                        case Packet.CONSUMER_INFO : {
                            ConsumerInfo info = (ConsumerInfo) packet;
                            consumeConsumerInfo(info);
                            break;
                        }
                        case Packet.PRODUCER_INFO : {
                            ProducerInfo info = (ProducerInfo) packet;
                            consumeProducerInfo(info);
                            break;
                        }
                        case Packet.SESSION_INFO : {
                            SessionInfo info = (SessionInfo) packet;
                            consumeSessionInfo(info);
                            break;
                        }
                        case Packet.ACTIVEMQ_CONNECTION_INFO : {
                            ConnectionInfo info = (ConnectionInfo) packet;
                            consumeConnectionInfo(info);
                            break;
                        }
                        case Packet.DURABLE_UNSUBSCRIBE : {
                            DurableUnsubscribe ds = (DurableUnsubscribe) packet;
                            brokerConnector.durableUnsubscribe(this, ds);
                            break;
                        }
                        case Packet.CAPACITY_INFO : {
                            CapacityInfo info = (CapacityInfo) packet;
                            consumeCapacityInfo(info);
                            break;
                        }
                        case Packet.CAPACITY_INFO_REQUEST : {
                            updateCapacityInfo(packet.getId());
                            break;
                        }
                        case Packet.ACTIVEMQ_BROKER_INFO : {
                            consumeBrokerInfo((BrokerInfo) packet);
                            break;
                        } case Packet.KEEP_ALIVE: {
                          // Ignore as the packet contains no additional information to consume
                          break;
                        }
                        default : {
                            log.warn("Unknown Packet received: " + packet);
                            break;
                        }
                    }
                }
            }
            catch (Throwable e) {
                requestEx = e;
                log.warn("caught exception consuming packet: " + packet, e);
                failed = true;
            }
            sendReceipt(packet, requestEx, failed);
        }
    }

    /**
     * Register/deregister MessageConsumer with the Broker
     *
     * @param info
     * @throws JMSException
     */
    public void consumeConsumerInfo(ConsumerInfo info) throws JMSException {
        String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName();
        if (info.isStarted()) {
            consumers.add(info);
            if (this.activeConsumers.add(info)) {
                this.brokerConnector.registerMessageConsumer(this, info);
            }
        }
        else {
            consumers.remove(info);
            if (activeConsumers.remove(info)) {
                this.brokerConnector.deregisterMessageConsumer(this, info);
            }
        }
    }

    /**
     * Update the peer Connection about the Broker's capacity for messages
     *
     * @param capacity
     */
    public void updateBrokerCapacity(int capacity) {
        CapacityInfo info = new CapacityInfo();
        info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
        info.setCapacity(capacity);
        info.setFlowControlTimeout(getFlowControlTimeout(capacity));
        send(info);
    }

    /**
     * register with the Broker
     *
     * @param info
     * @throws JMSException
     */
    public void consumeConnectionInfo(ConnectionInfo info) throws JMSException {
        this.connectionInfo = info;
        if (info.isClosed()) {
            cleanUp();
            try {
                sendReceipt(info);
                info.setReceiptRequired(false);
                try {
                    Thread.sleep(500);
                }
                catch (Throwable e) {
                }
            }
            finally {
                close();
            }
        }
        else {
            if( !registered ) {
                this.brokerConnector.registerClient(this, info);
                registered=true;
            }
           
            synchronized( this ) {
              if (!started && info.isStarted()) {
                  started = true;
                  // Dispatch any queued
                 
                  log.debug(this + " has started running client version " + info.getClientVersion() + " , wire format = " + info.getWireFormatVersion());
                
                  //go through consumers and producers - setting their clientId (which might not have been set)
                  for (Iterator i = consumers.iterator();i.hasNext();) {
                      ConsumerInfo ci = (ConsumerInfo) i.next();
                      ci.setClientId(info.getClientId());
                  }
                  for (Iterator i = producers.iterator();i.hasNext();) {
                      ProducerInfo pi = (ProducerInfo) i.next();
                      pi.setClientId(info.getClientId());
                  }
                 
                  for( int i=0; i < dispatchQueue.size(); i++ ) {
                    ActiveMQMessage msg = (ActiveMQMessage) dispatchQueue.get(i);
                    dispatch(msg);
                  }
                  dispatchQueue.clear();
              }
             
              if(started && !info.isStarted()) {
                started=false;
                log.debug(this + " has stopped");                 
              }
            }
        }
    }

    /**
     * start consuming messages
     *
     * @throws JMSException
     */
    public void start() throws JMSException {
        channel.start();
    }

    /**
     * stop consuming messages
     *
     * @throws JMSException
     */
    public void stop() throws JMSException {
        log.trace("Stopping channel: " + channel);
        channel.stop();
    }

    /**
     * cleanup
     */
    public synchronized void cleanUp() {
        // we could be called here from 2 different code paths
        // based on if we get a transport failure or we do a clean shutdown
        // so lets only run this stuff once
        if (!cleanedUp) {
            cleanedUp = true;
            try {
                try {
                    for (Iterator i = consumers.iterator();i.hasNext();) {
                        ConsumerInfo info = (ConsumerInfo) i.next();
                        info.setStarted(false);
                        this.brokerConnector.deregisterMessageConsumer(this, info);
                    }
                    for (Iterator i = producers.iterator();i.hasNext();) {
                        ProducerInfo info = (ProducerInfo) i.next();
                        info.setStarted(false);
                        this.brokerConnector.deregisterMessageProducer(this, info);
                    }
                    for (Iterator i = sessions.iterator();i.hasNext();) {
                        SessionInfo info = (SessionInfo) i.next();
                        info.setStarted(false);
                        this.brokerConnector.deregisterSession(this, info);
                    }
                    for (Iterator i = transactions.iterator();i.hasNext();) {
                        this.brokerConnector.rollbackTransaction(this, i.next().toString());
                    }
                }
                finally {
                    // whatever happens, lets make sure we unregister & clean things down
                    if (log.isDebugEnabled()) {
                        log.debug(this + " has stopped");
                    }
                    this.consumers.clear();
                    this.producers.clear();
                    this.transactions.clear();
                    this.sessions.clear();
                    this.brokerConnector.deregisterClient(this, connectionInfo);
                    registered=false;
                }
            }
            catch (JMSException e) {
                log.warn("failed to de-register Broker client: " + e, e);
            }
        }
        else {
            log.debug("We are ignoring a duplicate cleanup() method called for: " + this);
        }
    }

    // Implementation methods
    //-------------------------------------------------------------------------
    protected void send(Packet packet) {
        if (!closed.get()) {
            try {
                if (brokerConnection) {
                    String brokerName = brokerConnector.getBrokerContainer().getBroker().getBrokerName();
                    packet.addBrokerVisited(brokerName);
                    if (packet.hasVisited(remoteBrokerName)) {
                        if (log.isDebugEnabled()) {
                            log.debug("Not Forwarding: " + remoteBrokerName + " has already been visited by packet: "
                                    + packet);
                        }
                        return;
                    }
                }
                this.channel.asyncSend(packet);
            }
            catch (JMSException e) {
                log.warn(this + " caught exception ", e);
                close();
            }
        }
    }

    protected void close() {
        if (closed.commit(false, true)) {
            this.channel.stop();
            log.debug(this + " has closed");
        }
    }

    /**
     * Send message to Broker
     *
     * @param message
     * @throws JMSException
     */
    private void consumeActiveMQMessage(ActiveMQMessage message) throws JMSException {
        message = message.shallowCopy();
        if (message.isPartOfTransaction()) {
            this.brokerConnector.sendTransactedMessage(this, message.getTransactionId(), message);
        }
        else {
            this.brokerConnector.sendMessage(this, message);
        }
    }

    /**
     * Send Message acknowledge to the Broker
     *
     * @param ack
     * @throws JMSException
     */
    private void consumeMessageAck(MessageAck ack) throws JMSException {
        if (ack.isPartOfTransaction()) {
            this.brokerConnector.acknowledgeTransactedMessage(this, ack.getTransactionId(), ack);
        }
        else {
            this.brokerConnector.acknowledgeMessage(this, ack);
        }
    }

    /**
     * Handle transaction start/commit/rollback
     *
     * @param info
     * @throws JMSException
     */
    private void consumeTransactionInfo(TransactionInfo info) throws JMSException {
        if (info.getType() == TransactionInfo.START) {
            transactions.add(info.getTransactionId());
            this.brokerConnector.startTransaction(this, info.getTransactionId());
        }
        else {
            if (info.getType() == TransactionInfo.ROLLBACK) {
                this.brokerConnector.rollbackTransaction(this, info.getTransactionId());
            }
            else if (info.getType() == TransactionInfo.COMMIT) {
                this.brokerConnector.commitTransaction(this, info.getTransactionId());
            }
            transactions.remove(info.getTransactionId());
        }
    }

    /**
     * Handle XA transaction start/prepare/commit/rollback
     *
     * @param info
     * @throws JMSException
     * @throws XAException
     */
    private void consumeXATransactionInfo(XATransactionInfo info) throws JMSException, XAException {
        if (info.getType() == XATransactionInfo.START) {
            this.brokerConnector.startTransaction(this, info.getXid());
        }
        else if (info.getType() == XATransactionInfo.XA_RECOVER) {
            ActiveMQXid rc[] = this.brokerConnector.getPreparedTransactions(this);
            // We will be sending our own receipt..
            info.setReceiptRequired(false);
            // Send the receipt..
            ResponseReceipt receipt = new ResponseReceipt();
            receipt.setId(this.packetIdGenerator.generateId());
            receipt.setCorrelationId(info.getId());
            receipt.setResult(rc);
            send(receipt);
        }
        else if (info.getType() == XATransactionInfo.GET_RM_ID) {
            String rc = this.brokerConnector.getResourceManagerId(this);
            // We will be sending our own receipt..
            info.setReceiptRequired(false);
            // Send the receipt..
            ResponseReceipt receipt = new ResponseReceipt();
            receipt.setId(this.packetIdGenerator.generateId());
            receipt.setCorrelationId(info.getId());
            receipt.setResult(rc);
            send(receipt);
        }
        else if (info.getType() == XATransactionInfo.END) {
            // we don't do anything..
        }
        else {
            if (info.getType() == XATransactionInfo.PRE_COMMIT) {
                int rc = this.brokerConnector.prepareTransaction(this, info.getXid());
                // We will be sending our own receipt..
                info.setReceiptRequired(false);
                // Send the receipt..
                IntResponseReceipt receipt = new IntResponseReceipt();
                receipt.setId(this.packetIdGenerator.generateId());
                receipt.setCorrelationId(info.getId());
                receipt.setResult(rc);
                send(receipt);
            }
            else if (info.getType() == XATransactionInfo.ROLLBACK) {
                this.brokerConnector.rollbackTransaction(this, info.getXid());
            }
            else if (info.getType() == XATransactionInfo.COMMIT_ONE_PHASE) {
                this.brokerConnector.commitTransaction(this, info.getXid(), true);
            }
            else if (info.getType() == XATransactionInfo.COMMIT) {
                this.brokerConnector.commitTransaction(this, info.getXid(), false);
            }
            else {
                throw new JMSException("Packet type: " + info.getType() + " not recognized.");
            }
        }
    }

    /**
     * register/deregister MessageProducer in the Broker
     *
     * @param info
     * @throws JMSException
     */
    private void consumeProducerInfo(ProducerInfo info) throws JMSException {
        if (info.isStarted()) {
            producers.add(info);
            this.brokerConnector.registerMessageProducer(this, info);
        }
        else {
            producers.remove(info);
            this.brokerConnector.deregisterMessageProducer(this, info);
        }
    }

    /**
     * register/deregister Session in a Broker
     *
     * @param info
     * @throws JMSException
     */
    private void consumeSessionInfo(SessionInfo info) throws JMSException {
        if (info.isStarted()) {
            sessions.add(info);
            this.brokerConnector.registerSession(this, info);
        }
        else {
            sessions.remove(info);
            this.brokerConnector.deregisterSession(this, info);
        }
    }

    /**
     * Update capacity for the peer
     *
     * @param info
     */
    private void consumeCapacityInfo(CapacityInfo info) {
        this.capacity = info.getCapacity();
    }

    private void updateCapacityInfo(String correlationId) {
        CapacityInfo info = new CapacityInfo();
        info.setResourceName(this.brokerConnector.getBrokerInfo().getBrokerName());
        info.setCorrelationId(correlationId);
        info.setCapacity(this.brokerConnector.getBrokerCapacity());
        info.setFlowControlTimeout(getFlowControlTimeout(info.getCapacity()));
        send(info);
    }

    private long getFlowControlTimeout(int capacity) {
        long result = -1;
        if (capacity <= 0) {
            result = 10000;
        }
        else if (capacity <= 10) {
            result = 1000;
        }
        else if (capacity <= 20) {
            result = 10;
        }
        return result;
    }

    private void consumeBrokerInfo(BrokerInfo info) {
        brokerConnection = true;
        remoteBrokerName = info.getBrokerName();
        String localBrokerName = brokerConnector.getBrokerInfo().getBrokerName();
      
        String clusterName = getBrokerConnector().getBrokerContainer().getBroker().getBrokerClusterName();
      
        if (clusterName.equals(info.getClusterName())){
            clusteredConnection = true;
        }
        if (!remoteNetworkConnector && info.isRemote()){
            try {
                NetworkConnector networkConnector = new NetworkConnector(brokerConnector.getBrokerContainer());
                NetworkChannel networkChannel = new NetworkChannel(networkConnector,brokerConnector.getBrokerContainer(),channel);
                networkConnector.addNetworkChannel(networkChannel);
                brokerConnector.getBrokerContainer().addNetworkConnector(networkConnector);
                networkConnector.start();
                log.info("Started reverse remote channel to " + remoteBrokerName);
                remoteNetworkConnector = true;
            }
            catch (JMSException e) {
                log.error("Failed to create reverse remote channel",e);
            }
        }
    }

    private void sendReceipt(Packet packet) {
        sendReceipt(packet, null, false);
    }

    private void sendReceipt(Packet packet, Throwable requestEx, boolean failed) {
        if (packet != null && packet.isReceiptRequired()) {
            Receipt receipt = new Receipt();
            receipt.setId(this.packetIdGenerator.generateId());
            receipt.setCorrelationId(packet.getId());
            receipt.setBrokerName(brokerConnector.getBrokerInfo().getBrokerName());
            receipt.setClusterName(brokerConnector.getBrokerInfo().getClusterName());
            receipt.setException(requestEx);
            receipt.setFailed(failed);
            send(receipt);
        }
    }

    /**
     * @param subject
     *
     */
  public void setSubject(Subject subject) {
    this.subject=subject;
  }

    /**
     * @return the subject
     */
  public Subject getSubject() {
    return subject;
  }
}
TOP

Related Classes of org.codehaus.activemq.broker.impl.BrokerClientImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle, Inc. Contact coftware#gmail.com.