/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.service.impl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.DeadLetterPolicy;
import org.codehaus.activemq.store.PersistenceAdapter;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Set;
/**
* A default implementation of a Broker of Topic messages for transient consumers
*
* @version $Revision: 1.2 $
*/
public class TransientTopicMessageContainerManager extends DurableTopicMessageContainerManager {
private static final Log log = LogFactory.getLog(TransientTopicMessageContainerManager.class);
public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
this(persistenceAdapter, new SubscriptionContainerImpl(new RedeliveryPolicy(), new DeadLetterPolicy()), new FilterFactoryImpl(), new DispatcherImpl());
}
public TransientTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
super(persistenceAdapter, subscriptionContainer, filterFactory, dispatcher);
}
/**
* @param client
* @param info
* @throws javax.jms.JMSException
*/
public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
if (info.getDestination().isTopic()) {
doAddMessageConsumer(client, info);
}
}
/**
* @param client
* @param info
* @throws javax.jms.JMSException
*/
public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
if (sub != null) {
sub.setActive(false);
dispatcher.removeActiveSubscription(client, sub);
subscriptionContainer.removeSubscription(info.getConsumerId());
sub.clear();
}
}
/**
* @param client
* @param message
* @throws javax.jms.JMSException
*/
public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
ActiveMQDestination destination = message.getJMSActiveMQDestination();
if (destination != null && destination.isTopic()) {
MessageContainer container = null;
if (log.isDebugEnabled()) {
log.debug("Dispaching to " + subscriptionContainer + " subscriptions with message: " + message);
}
Set subscriptions = subscriptionContainer.getSubscriptions(destination);
for (Iterator i = subscriptions.iterator(); i.hasNext();) {
Subscription sub = (Subscription) i.next();
if (sub.isTarget(message) && (!sub.isDurableTopic() || message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT)) {
if (container == null) {
container = getContainer(message.getJMSDestination().toString());
container.addMessage(message);
}
sub.addMessage(container, message);
}
}
updateSendStats(client, message);
}
}
/**
* Delete a durable subscriber
*
* @param clientId
* @param subscriberName
* @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
*/
public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
}
}