/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.service.impl;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.service.TopicMessageContainer;
import org.codehaus.activemq.store.PersistenceAdapter;
import javax.jms.DeliveryMode;
import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Map;
/**
 * A default Broker used for Topic messages for durable consumers.
 * <p>
 * Persistent topic messages are stored in a {@link TopicMessageContainer}
 * obtained from the {@link PersistenceAdapter} and are then offered to every
 * subscription known to the {@link SubscriptionContainer}.
 *
 * @version $Revision: 1.15 $
 */
public class DurableTopicMessageContainerManager extends MessageContainerManagerSupport {
    /** Creates the topic message containers on demand. */
    private PersistenceAdapter persistenceAdapter;
    /** Holds every subscription this manager knows about. */
    protected SubscriptionContainer subscriptionContainer;
    /** Builds the destination/selector filter for each consumer. */
    protected FilterFactory filterFactory;
    /** Subscriptions which currently have a consumer attached, keyed by consumer id. */
    protected Map activeSubscriptions = new ConcurrentHashMap();
    /** Set once {@link #loadAllMessageContainers()} has been run; guarded by <code>this</code>. */
    private boolean loadedMessageContainers;

    /**
     * Creates a manager using the default subscription container, filter
     * factory and dispatcher implementations.
     *
     * @param persistenceAdapter used to create the topic message containers
     */
    public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter) {
        this(persistenceAdapter, new DurableTopicSubscriptionContainerImpl(), new FilterFactoryImpl(), new DispatcherImpl());
    }

    /**
     * Creates a manager with explicitly supplied collaborators.
     *
     * @param persistenceAdapter    used to create the topic message containers
     * @param subscriptionContainer holds the subscriptions
     * @param filterFactory         creates the filter for each consumer
     * @param dispatcher            passed to the superclass and used to register active subscriptions
     */
    public DurableTopicMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
        super(dispatcher);
        this.persistenceAdapter = persistenceAdapter;
        this.subscriptionContainer = subscriptionContainer;
        this.filterFactory = filterFactory;
    }

    /**
     * Registers a consumer; consumers which are not durable topic consumers
     * are ignored.
     *
     * @param client
     * @param info
     * @throws javax.jms.JMSException
     */
    public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        if (info.isDurableTopic()) {
            doAddMessageConsumer(client, info);
        }
    }

    /**
     * Removes the consumer's subscription from the subscription container and,
     * if it was active, deactivates it and unregisters it from the dispatcher.
     *
     * @param client
     * @param info
     * @throws javax.jms.JMSException
     */
    public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        subscriptionContainer.removeSubscription(info.getConsumerId());
        Subscription sub = (Subscription) activeSubscriptions.remove(info.getConsumerId());
        if (sub != null) {
            sub.setActive(false);
            dispatcher.removeActiveSubscription(client, sub);
        }
    }

    /**
     * Delete a durable subscriber
     *
     * @param clientId
     * @param subscriberName
     * @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
     */
    public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
        boolean subscriptionFound = false;
        for (Iterator i = subscriptionContainer.subscriptionIterator(); i.hasNext();) {
            Subscription sub = (Subscription) i.next();
            if (sub.getClientId().equals(clientId) && sub.getSubscriberName().equals(subscriberName)) {
                // only delete if not active
                if (sub.isActive()) {
                    throw new JMSException("The Consummer " + subscriberName + " is still active");
                }
                subscriptionContainer.removeSubscription(sub.getConsumerId());
                sub.clear();
                subscriptionFound = true;
            }
        }
        if (!subscriptionFound) {
            throw new IllegalStateException("The Consumer " + subscriberName + " does not exist for client: " + clientId);
        }
    }

    /**
     * Stores a persistent topic message in its container and offers it to
     * every subscription it targets. Messages which have no destination, are
     * not sent to a topic or are not persistent are ignored.
     *
     * @param client
     * @param message
     * @throws javax.jms.JMSException
     */
    public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
        ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
        if (dest != null && dest.isTopic() && message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT) {
            MessageContainer container = getContainer(dest.toString());
            // TODO note if we don't have any durable subscriptions then we might back up here
            container.addMessage(message);
            for (Iterator i = subscriptionContainer.subscriptionIterator(); i.hasNext();) {
                Subscription sub = (Subscription) i.next();
                if (sub.isTarget(message)) {
                    sub.addMessage(container, message);
                }
            }
        }
    }

    /**
     * Acknowledge a message as being read and consumed by the Consumer.
     * Acknowledgements for consumers without an active subscription are ignored.
     *
     * @param client
     * @param ack
     * @throws javax.jms.JMSException
     */
    public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
        Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
        if (sub != null) {
            sub.messageConsumed(ack);
        }
    }

    /**
     * Notifies the consumer's active subscription of an acknowledgement made
     * inside a transaction which has not yet been committed.
     *
     * @param client
     * @param transactionId
     * @param ack
     * @throws javax.jms.JMSException
     */
    public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
        Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
        if (sub != null) {
            sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
        }
    }

    /**
     * Asks the consumer's active subscription to redeliver a message from the
     * first loaded container found to contain it.
     *
     * @param client
     * @param ack
     * @throws javax.jms.JMSException
     */
    public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
        Subscription sub = (Subscription) activeSubscriptions.get(ack.getConsumerId());
        if (sub != null) {
            // lets find all the containers that contain this message
            for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
                MessageContainer container = (MessageContainer) iter.next();
                if (container.containsMessage(ack.getMessageIdentity())) {
                    sub.redeliverMessage(container, ack);
                    // we only need to redeliver the message from one container
                    break;
                }
            }
        }
    }

    /**
     * Poll for messages; this implementation does nothing.
     *
     * @throws javax.jms.JMSException
     */
    public void poll() throws JMSException {
        // do nothing
    }

    /**
     * No-op in this implementation.
     *
     * @param client
     * @param transactionId
     */
    public void commitTransaction(BrokerClient client, String transactionId) {
    }

    /**
     * No-op in this implementation.
     *
     * @param client
     * @param transactionId
     */
    public void rollbackTransaction(BrokerClient client, String transactionId) {
    }

    /**
     * Returns the container for the given destination, creating, starting and
     * registering it on first use.
     *
     * @param destinationName
     * @return the container for the destination
     * @throws javax.jms.JMSException
     */
    public MessageContainer getContainer(String destinationName) throws JMSException {
        TopicMessageContainer container = (TopicMessageContainer) messageContainers.get(destinationName);
        if (container == null) {
            container = persistenceAdapter.createTopicMessageContainer(destinationName);
            container.start();
            messageContainers.put(destinationName, container);
        }
        return container;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Attaches a consumer to its durable subscription, creating the
     * subscription if none exists for the consumer id and replacing it if the
     * consumer now asks for a different destination or selector.
     *
     * @param client
     * @param info
     * @throws javax.jms.JMSException
     */
    protected void doAddMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
        boolean shouldRecover = false;
        if (info.getConsumerName() != null && info.getClientId() != null) {
            subscriptionContainer.checkForDuplicateDurableSubscription(client, info);
        }
        Subscription subscription = subscriptionContainer.getSubscription(info.getConsumerId());
        if (subscription != null && subscription.isDurableTopic()) {
            // check the subscription hasn't changed: compare what is stored
            // against what the consumer is asking for now (a selector may
            // legitimately be null on either side)
            boolean destinationChanged = !sameValue(subscription.getDestination(), info.getDestination());
            boolean selectorChanged = !sameValue(subscription.getSelector(), info.getSelector());
            if (destinationChanged || selectorChanged) {
                subscriptionContainer.removeSubscription(info.getConsumerId());
                subscription.clear();
                subscription = subscriptionContainer.makeSubscription(dispatcher, info, createFilter(info));
            }
        }
        else {
            subscription = subscriptionContainer.makeSubscription(dispatcher, info, createFilter(info));
            shouldRecover = true;
        }
        subscription.setActiveConsumer(info);
        activeSubscriptions.put(info.getConsumerId(), subscription);
        dispatcher.addActiveSubscription(client, subscription);
        if (shouldRecover) {
            recoverSubscriptions(subscription);
        }
        // Ideally the subscription would not become active until later,
        // as we can't start dispatching until we've sent back the receipt.
        // TODO we might wish to register a post-receipt action here
        // to perform the wakeup
        subscription.setActive(true);
        //dispatcher.wakeup(subscription);
    }

    /**
     * This method is called when a new durable subscription is started and
     * so we need to go through each matching message container
     * and dispatch any matching messages that may be outstanding
     *
     * @param subscription
     * @throws javax.jms.JMSException
     */
    protected void recoverSubscriptions(Subscription subscription) throws JMSException {
        // we should load all of the message containers from disk if we're a wildcard
        if (subscription.isWildcard()) {
            synchronized (this) {
                if (!loadedMessageContainers) {
                    loadAllMessageContainers();
                    loadedMessageContainers = true;
                }
            }
        }
        else {
            // load the container
            getContainer(subscription.getDestination().getPhysicalName());
        }
        for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
            TopicMessageContainer container = (TopicMessageContainer) iter.next();
            container.recoverSubscription(subscription);
        }
    }

    /**
     * Called when recovering a wildcard subscription
     * where we need to load all the durable message containers
     * (for which we have any outstanding messages to deliver) into RAM.
     * Not yet implemented.
     */
    protected void loadAllMessageContainers() {
        /** TODO */
    }

    /**
     * Create filter for a Consumer. The filter is built from the consumer's
     * destination and selector and, for noLocal consumers, combined with a
     * {@link NoLocalFilter} for the consumer's client id.
     *
     * @param info
     * @return the Filter
     * @throws javax.jms.JMSException
     */
    protected Filter createFilter(ConsumerInfo info) throws JMSException {
        Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
        if (info.isNoLocal()) {
            filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
        }
        return filter;
    }

    /**
     * Null-safe equality test.
     *
     * @param left  may be null
     * @param right may be null
     * @return true if both are null or <code>left.equals(right)</code>
     */
    private static boolean sameValue(Object left, Object right) {
        return left == null ? right == null : left.equals(right);
    }
}