Package org.codehaus.activemq.service.impl

Source Code of org.codehaus.activemq.service.impl.TransientTopicMessageContainerManager

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.service.impl;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.DeadLetterPolicy;
import org.codehaus.activemq.store.PersistenceAdapter;

import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Set;

/**
* A default implementation of a Broker of Topic messages for transient consumers
*
* @version $Revision: 1.2 $
*/
public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager {
    private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class);

    public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
        this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy(), new DeadLetterPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
    }

    public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
        super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher);
    }

    /**
     * @param client
     * @param info
     * @throws javax.jms.JMSException
     */
    public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        if (info.getDestination().isTopic()) {
            doAddMessageConsumer(client, info);
        }
    }


    /**
     * @param client
     * @param info
     * @throws javax.jms.JMSException
     */
    public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
        if (sub != null) {
            sub.setActive(false);
            dispatcher.removeActiveSubscription(client, sub);
            subscriptionContainer.removeSubscription(info.getConsumerId());
            sub.clear();
        }
    }


    /**
     * @param client
     * @param message
     * @throws javax.jms.JMSException
     */
    public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
        ActiveMQDestination destination = message.getJMSActiveMQDestination();
        if (destination != null && destination.isTopic()) {
            MessageContainer container = null;
            if (log.isDebugEnabled()) {
                log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message);
            }
            Set subscriptions = subscriptionContainer.getSubscriptions(destination);
            for (Iterator i = subscriptions.iterator(); i.hasNext();) {
                Subscription sub = (Subscription) i.next();
                if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) {
                    if (container == null) {
                        container = getContainer(message.getJMSDestination().toString());
                        container.addMessage(message);
                    }
                    sub.addMessage(container, message);
                }
            }
            updateSendStats(client, message);
        }
    }

    /**
     * Delete a durable subscriber
     *
     * @param clientId
     * @param subscriberName
     * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
     */
    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
    }
}
TOP

Related Classes of org.codehaus.activemq.service.impl.TransientTopicMessageContainerManager

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.