Package org.codehaus.activemq.service.boundedvm

Source Code of org.codehaus.activemq.service.boundedvm.TransientQueueSubscription

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.codehaus.activemq.service.boundedvm;
import java.util.List;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;

/**
* A holder for Transient Queue consumer info and message routing
*
* @version $Revision: 1.1 $
*/
public class TransientQueueSubscription extends TransientSubscription {
    private BrokerClient client;
    private String brokerName;
    private String clusterName;
    private MemoryBoundedQueue dispatchedQueue;

    /**
     * Construct the TransientQueueSubscription
     *
     * @param client
     * @param dispatchedQueue
     * @param filter
     * @param info
     */
    public TransientQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue, Filter filter,
            ConsumerInfo info) {
        super(filter, info);
        this.client = client;
        this.dispatchedQueue = dispatchedQueue;
        if (client != null){
            BrokerConnector connector = client.getBrokerConnector();
            if (connector != null){
                BrokerInfo bi = connector.getBrokerInfo();
                if (bi != null){
                    this.brokerName = bi.getBrokerName();
                    this.clusterName = bi.getClusterName();
                }
            }
        }
    }

    /**
     * determines if the Subscription is interested in the message
     *
     * @param message
     * @return true if this Subscription will accept the message
     * @throws JMSException
     */
    public boolean isTarget(ActiveMQMessage message) throws JMSException {
        boolean result = false;
        if (message != null) {
            //make sure we don't loop messages around the cluster
            if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
                    || message.isEntryBroker(brokerName)) {
                result = filter.matches(message)
                        && (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT || consumerInfo
                                .getDestination().isTemporary());
            }
        }
        return result;
    }

    /**
     * @return true if the consumer has capacity for more messages
     */
    public boolean canAcceptMessages() {
        return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
    }

    /**
     * Dispatch a message to the Consumer
     *
     * @param message
     * @throws JMSException
     */
    public void doDispatch(ActiveMQMessage message) throws JMSException {
        addDispatchedMessage(message);
        message = message.shallowCopy();
        message.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
        client.dispatch(message);
    }

    /**
     * Add a dispatched message
     *
     * @param message
     */
    private void addDispatchedMessage(ActiveMQMessage message) {
        dispatchedQueue.enqueue(message);
    }

    /**
     * Acknowledge the receipt of a message by a consumer
     *
     * @param id
     * @return the removed ActiveMQMessage with the associated id
     */
    public ActiveMQMessage acknowledgeMessage(String id) {
        ActiveMQMessage msg = (ActiveMQMessage) dispatchedQueue.remove(id);
        return msg;
    }

    /**
     * @return all the unacknowledge messages
     */
    public List getUndeliveredMessages() {
        return dispatchedQueue.getContents();
    }

    /**
     * close the subscription
     */
    public void close() {
        super.close();
        dispatchedQueue.close();
    }
}
TOP

Related Classes of org.codehaus.activemq.service.boundedvm.TransientQueueSubscription

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.