Package org.codehaus.activemq.service.impl

Source Code of org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.service.impl;

import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.service.TopicMessageContainer;
import org.codehaus.activemq.store.PersistenceAdapter;

import javax.jms.DeliveryMode;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Map;

/**
* A default Broker used for Topic messages for durable consumers
*
* @version $Revision: 1.15 $
*/
public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
    private PersistenceAdapter persistenceAdapter;
    protected SubscriptionContainer subscriptionContainer;
    protected FilterFactory filterFactory;
    protected Map activeSubscriptions = new ConcurrentHashMap();
    private boolean loadedMessageContainers;

    public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
        this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl());
    }

    public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
        super(dispatcher);
        this.persistenceAdapter = persistenceAdapter;
        this.subscriptionContainer = subscriptionContainer;
        this.filterFactory = filterFactory;
    }

    /**
     * @param client
     * @param info
     * @throws javax.jms.JMSException
     */
    public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        if (info.isDurableTopic()) {
            doAddMessageConsumer(client, info);
        }
    }

    /**
     * @param client
     * @param info
     * @throws javax.jms.JMSException
     */
    public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        subscriptionContainer.removeSubscription(info.getConsumerId());
        Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
        if (sub != null) {
            sub.setActive(false);
            dispatcher.removeActiveSubscription(client, sub);
        }
    }

    /**
     * Delete a durable subscriber
     *
     * @param clientId
     * @param subscriberName
     * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
     */
    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
        boolean subscriptionFound = false;
        for (Iterator i = subscriptionContainer.subscriptionIterator(); i.hasNext();) {
            Subscription sub = (Subscription) i.next();
            if (sub.getClientId().equals(clientId) && sub.getSubscriberName().equals(subscriberName)) {
                //only delete if not active
                if (sub.isActive()) {
                    throw new JMSException("The Consummer " + subscriberName + " is still active");
                }
                else {
                    subscriptionContainer.removeSubscription(sub.getConsumerId());
                    sub.clear();
                    subscriptionFound = true;
                }
            }
        }
        if (!subscriptionFound) {
            throw new IllegalStateException("The Consumer " + subscriberName + " does not exist for client: " + clientId);
        }
    }

    /**
     * @param client
     * @param message
     * @throws javax.jms.JMSException
     */
    public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
        ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
        if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
            MessageContainer container = getContainer(message.getJMSDestination().toString());

            // TODO note if we don't have any durable subscriptions then we might back up here
            container.addMessage(message);
            for (Iterator i = subscriptionContainer.subscriptionIterator(); i.hasNext();) {
                Subscription sub = (Subscription) i.next();
                if (sub.isTarget(message)) {
                    sub.addMessage(container, message);
                }
            }
        }
    }

    /**
     * Acknowledge a message as being read and consumed byh the Consumer
     *
     * @param client
     * @param ack
     * @throws javax.jms.JMSException
     */
    public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
        Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
        if (sub != null) {
            sub.messageConsumed(ack);
        }
    }

    public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
        Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
        if (sub != null) {
            sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
        }
    }

    public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
        Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
        if (sub != null) {
            // lets find all the containers that contain this message
            for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
                MessageContainer container = (MessageContainer) iter.next();
                if (container.containsMessage(ack.getMessageIdentity())) {
                    sub.redeliverMessage(container, ack);
                   
                    // we only need to redeliver the message from one container
                    break;
                }
            }
        }
    }


    /**
     * poll or messages
     *
     * @throws javax.jms.JMSException
     */
    public void poll() throws JMSException {
        //do nothing
    }

    public void commitTransaction(BrokerClient client, String transactionId) {
    }

    public void rollbackTransaction(BrokerClient client, String transactionId) {
    }


    public MessageContainer getContainer(String destinationName) throws JMSException {
        TopicMessageContainer container = (TopicMessageContainer) messageContainers.get(destinationName);
        if (container == null) {
            container = persistenceAdapter.createTopicMessageContainer(destinationName);
            container.start();
            messageContainers.put(destinationName, container);
        }
        return container;
    }

    // Implementation methods
    //-------------------------------------------------------------------------


    protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        boolean shouldRecover = false;
        if (info.getConsumerName() != null && info.getClientId() != null) {
            subscriptionContainer.checkForDuplicateDurableSubscription(client, info);
        }
        Subscription subscription = subscriptionContainer.getSubscription(info.getConsumerId());
        if (subscription != null && subscription.isDurableTopic()) {
            //check the subscription hasn't changed
            if (!subscription.getDestination().equals(subscription.getDestination()) || !subscription.getSelector().equals(info.getSelector())) {
                subscriptionContainer.removeSubscription(info.getConsumerId());
                subscription.clear();
                subscription = subscriptionContainer.makeSubscription(dispatcher, info, createFilter(info));
            }
        }
        else {
            subscription = subscriptionContainer.makeSubscription(dispatcher, info, createFilter(info));
            shouldRecover = true;
        }
        subscription.setActiveConsumer(info);
        activeSubscriptions.put(info.getConsumerId(), subscription);

        dispatcher.addActiveSubscription(client, subscription);

        if (shouldRecover) {
            recoverSubscriptions(subscription);
        }

        // lets not make the subscription active until later
        // as we can't start dispatching until we've sent back the receipt
       
        // TODO we might wish to register a post-receipt action here
        // to perform the wakeup
        subscription.setActive(true);
        //dispatcher.wakeup(subscription);
    }

    /**
     * This method is called when a new durable subscription is started and
     * so we need to go through each matching message container
     * and dispatch any matching messages that may be outstanding
     *
     * @param subscription
     */
    protected void recoverSubscriptions(Subscription subscription) throws JMSException {
        // we should load all of the message containers from disk if we're a wildcard
        if (subscription.isWildcard()) {
            synchronized (this) {
                if (!loadedMessageContainers) {
                    loadAllMessageContainers();
                    loadedMessageContainers = true;
                }
            }
        }
        else {
            // load the container
            getContainer(subscription.getDestination().getPhysicalName());
        }
        for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
            TopicMessageContainer container = (TopicMessageContainer) iter.next();
            container.recoverSubscription(subscription);
        }
    }


    /**
     * Called when recovering a wildcard subscription
     * where we need to load all the durable message containers
     * (for which we have any outstanding messages to deliver) into RAM
     */
    protected void loadAllMessageContainers() {
        /** TODO */
    }

    /**
     * Create filter for a Consumer
     *
     * @param info
     * @return the Fitler
     * @throws javax.jms.JMSException
     */
    protected Filter createFilter(ConsumerInfo info) throws JMSException {
        Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
        if (info.isNoLocal()) {
            filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
        }
        return filter;
    }


}
TOP

Related Classes of org.codehaus.activemq.service.impl.DurableTopicMessageContainerManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.