/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.service.impl;
import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.filter.AndFilter;
import org.codehaus.activemq.filter.DestinationMap;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.filter.FilterFactory;
import org.codehaus.activemq.filter.FilterFactoryImpl;
import org.codehaus.activemq.filter.NoLocalFilter;
import org.codehaus.activemq.message.ActiveMQDestination;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.Dispatcher;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.QueueList;
import org.codehaus.activemq.service.QueueListEntry;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.service.SubscriptionContainer;
import org.codehaus.activemq.service.RedeliveryPolicy;
import org.codehaus.activemq.service.boundedvm.TransientQueueBoundedMessageContainer;
import org.codehaus.activemq.store.PersistenceAdapter;
import javax.jms.Destination;
import javax.jms.JMSException;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
/**
* A default Broker used for Queue messages
*
* @version $Revision: 1.1 $
*/
public class DurableQueueMessageContainerManager extends MessageContainerManagerSupport {
private static final Log log = LogFactory.getLog(DurableQueueMessageContainerManager.class);
private static final int MAX_MESSAGES_DISPATCHED_FROM_POLL = 50;
private PersistenceAdapter persistenceAdapter;
protected SubscriptionContainer subscriptionContainer;
protected FilterFactory filterFactory;
protected Map activeSubscriptions = new ConcurrentHashMap();
protected Map browsers = new ConcurrentHashMap();
protected DestinationMap destinationMap = new DestinationMap();
private Object subscriptionMutex = new Object();
public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, RedeliveryPolicy redeliveryPolicy) {
this(persistenceAdapter, new SubscriptionContainerImpl(redeliveryPolicy), new FilterFactoryImpl(), new DispatcherImpl());
}
public DurableQueueMessageContainerManager(PersistenceAdapter persistenceAdapter, SubscriptionContainer subscriptionContainer, FilterFactory filterFactory, Dispatcher dispatcher) {
super(dispatcher);
this.persistenceAdapter = persistenceAdapter;
this.subscriptionContainer = subscriptionContainer;
this.filterFactory = filterFactory;
}
/**
* @param client
* @param info
* @throws javax.jms.JMSException
*/
public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
if (log.isDebugEnabled()) {
log.debug("Adding consumer: " + info);
}
if (info.getDestination().isQueue() && !info.getDestination().isTemporary()) {
//ensure a matching container exists for the destination
getContainer(info.getDestination().getPhysicalName());
Subscription sub = subscriptionContainer.makeSubscription(dispatcher, client,info, createFilter(info));
dispatcher.addActiveSubscription(client, sub);
updateActiveSubscriptions(sub);
// set active last in case we end up dispatching some messages
// while recovering
sub.setActive(true);
}
}
/**
* @param client
* @param info
* @throws javax.jms.JMSException
*/
public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
if (log.isDebugEnabled()) {
log.debug("Removing consumer: " + info);
}
if (info.getDestination() != null && info.getDestination().isQueue()) {
synchronized (subscriptionMutex) {
Subscription sub = (Subscription) subscriptionContainer.removeSubscription(info.getConsumerId());
if (sub != null) {
sub.setActive(false);
sub.clear();//resets entries in the QueueMessageContainer
dispatcher.removeActiveSubscription(client, sub);
//need to do wildcards for this - but for now use exact matches
for (Iterator iter = messageContainers.values().iterator(); iter.hasNext();) {
QueueMessageContainer container = (QueueMessageContainer) iter.next();
//should change this for wild cards ...
if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
QueueList list = getSubscriptionList(container);
list.remove(sub);
if (list.isEmpty()) {
activeSubscriptions.remove(sub.getDestination().getPhysicalName());
}
list = getBrowserList(container);
list.remove(sub);
if (list.isEmpty()) {
browsers.remove(sub.getDestination().getPhysicalName());
}
}
}
}
}
}
}
/**
* Delete a durable subscriber
*
* @param clientId
* @param subscriberName
* @throws javax.jms.JMSException if the subscriber doesn't exist or is still active
*/
public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
}
/**
* @param client
* @param message
* @throws javax.jms.JMSException
*/
public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
ActiveMQDestination dest = (ActiveMQDestination) message.getJMSDestination();
if (dest != null && dest.isQueue() && !message.isTemporary()) {
if (log.isDebugEnabled()) {
log.debug("Dispaching message: " + message);
}
//ensure a matching container exists for the destination
getContainer(((ActiveMQDestination) message.getJMSDestination()).getPhysicalName());
Set set = destinationMap.get(message.getJMSActiveMQDestination());
for (Iterator i = set.iterator();i.hasNext();) {
QueueMessageContainer container = (QueueMessageContainer) i.next();
container.addMessage(message);
dispatcher.wakeup();
updateSendStats(client, message);
}
}
}
/**
* Acknowledge a message as being read and consumed by the Consumer
*
* @param client
* @param ack
* @throws javax.jms.JMSException
*/
public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
if (!ack.isTemporary() && ack.getDestination().isQueue()){
Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
if (sub != null) {
sub.messageConsumed(ack);
if (ack.isMessageRead()) {
updateAcknowledgeStats(client, sub);
}
}
}
}
public void acknowledgeTransactedMessage(BrokerClient client, String transactionId, MessageAck ack) throws JMSException {
if (!ack.isTemporary() && ack.getDestination().isQueue()){
Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
if (sub != null) {
sub.onAcknowledgeTransactedMessageBeforeCommit(ack);
}
}
}
public void redeliverMessage(BrokerClient client, MessageAck ack) throws JMSException {
if (!ack.isTemporary() && ack.getDestination().isQueue()){
Subscription sub = subscriptionContainer.getSubscription(ack.getConsumerId());
if (sub != null) {
sub.redeliverMessage(null, ack);
}
}
}
/**
* Poll for messages
*
* @throws javax.jms.JMSException
*/
public void poll() throws JMSException {
synchronized (subscriptionMutex) {
for (Iterator iter = activeSubscriptions.keySet().iterator(); iter.hasNext();) {
QueueMessageContainer container = (QueueMessageContainer) iter.next();
QueueList browserList = (QueueList) browsers.get(container);
doPeek(container, browserList);
QueueList list = (QueueList) activeSubscriptions.get(container);
doPoll(container, list);
}
}
}
public void commitTransaction(BrokerClient client, String transactionId) {
}
public void rollbackTransaction(BrokerClient client, String transactionId) {
}
public MessageContainer getContainer(String destinationName) throws JMSException {
synchronized (subscriptionMutex) {
return super.getContainer(destinationName);
}
}
// Implementation methods
//-------------------------------------------------------------------------
protected MessageContainer createContainer(String destinationName) throws JMSException {
QueueMessageContainer container = persistenceAdapter.createQueueMessageContainer(destinationName);
//Add any interested Subscriptions to the new Container
for (Iterator iter = subscriptionContainer.subscriptionIterator(); iter.hasNext();) {
Subscription sub = (Subscription) iter.next();
if (sub.isBrowser()) {
updateBrowsers(container, sub);
}
else {
updateActiveSubscriptions(container, sub);
}
}
ActiveMQDestination key = new ActiveMQQueue(destinationName);
destinationMap.put(key, container);
return container;
}
protected Destination createDestination(String destinationName) {
return new ActiveMQQueue(destinationName);
}
private void doPeek(QueueMessageContainer container, QueueList browsers) throws JMSException {
if (browsers != null && browsers.size() > 0) {
for (int i = 0; i < browsers.size(); i++) {
SubscriptionImpl sub = (SubscriptionImpl) browsers.get(i);
int count = 0;
ActiveMQMessage msg = null;
do {
msg = container.peekNext(sub.getLastMessageIdentity());
if (msg != null) {
if (sub.isTarget(msg)) {
sub.addMessage(container, msg);
dispatcher.wakeup(sub);
}
else {
sub.setLastMessageIdentifier(msg.getJMSMessageIdentity());
}
}
}
while (msg != null && !sub.isAtPrefetchLimit() && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
}
}
}
private void doPoll(QueueMessageContainer container, QueueList subList) throws JMSException {
int count = 0;
ActiveMQMessage msg = null;
if (subList != null && subList.size() > 0) {
do {
boolean dispatched = false;
msg = container.poll();
if (msg != null) {
QueueListEntry entry = subList.getFirstEntry();
boolean targeted = false;
while (entry != null) {
SubscriptionImpl sub = (SubscriptionImpl) entry.getElement();
if (sub.isTarget(msg)) {
targeted = true;
if (!sub.isAtPrefetchLimit()) {
sub.addMessage(container, msg);
dispatched = true;
dispatcher.wakeup(sub);
subList.rotate(); //round-robin the list
break;
}
}
entry = subList.getNextEntry(entry);
}
if (!dispatched) {
if (targeted) { //ie. it can be selected by current active consumers - but they are at
// pre-fectch
// limit
container.returnMessage(msg.getJMSMessageIdentity());
}
break;
}
}
}
while (msg != null && count++ < MAX_MESSAGES_DISPATCHED_FROM_POLL);
}
}
private void updateActiveSubscriptions(Subscription subscription) throws JMSException {
//need to do wildcards for this - but for now use exact matches
synchronized (subscriptionMutex) {
boolean processedSubscriptionContainer = false;
String subscriptionPhysicalName = subscription.getDestination().getPhysicalName();
for (Iterator iter = messageContainers.entrySet().iterator(); iter.hasNext();) {
Map.Entry entry = (Map.Entry) iter.next();
String destinationName = (String) entry.getKey();
QueueMessageContainer container = (QueueMessageContainer) entry.getValue();
if (destinationName.equals(subscriptionPhysicalName)) {
processedSubscriptionContainer = true;
}
processSubscription(subscription, container);
}
if (!processedSubscriptionContainer) {
processSubscription(subscription, (QueueMessageContainer) getContainer(subscriptionPhysicalName));
}
}
}
protected void processSubscription(Subscription subscription, QueueMessageContainer container) throws JMSException {
// TODO should change this for wild cards ...
if (subscription.isBrowser()) {
updateBrowsers(container, subscription);
}
else {
updateActiveSubscriptions(container, subscription);
}
}
private void updateActiveSubscriptions(QueueMessageContainer container, Subscription sub) throws JMSException {
//need to do wildcards for this - but for now use exact matches
//should change this for wild cards ...
if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
container.reset();//reset container - flushing all filter out messages to new consumer
QueueList list = getSubscriptionList(container);
if (!list.contains(sub)) {
list.add(sub);
}
}
}
private QueueList getSubscriptionList(QueueMessageContainer container) {
QueueList list = (QueueList) activeSubscriptions.get(container);
if (list == null) {
list = new DefaultQueueList();
activeSubscriptions.put(container, list);
}
return list;
}
private void updateBrowsers(QueueMessageContainer container, Subscription sub) throws JMSException {
//need to do wildcards for this - but for now use exact matches
//should change this for wild cards ...
if (container.getDestinationName().equals(sub.getDestination().getPhysicalName())) {
container.reset();//reset container - flushing all filter out messages to new consumer
QueueList list = getBrowserList(container);
if (!list.contains(sub)) {
list.add(sub);
}
}
}
private QueueList getBrowserList(QueueMessageContainer container) {
QueueList list = (QueueList) browsers.get(container);
if (list == null) {
list = new DefaultQueueList();
browsers.put(container, list);
}
return list;
}
/**
* Create filter for a Consumer
*
* @param info
* @return the Fitler
* @throws javax.jms.JMSException
*/
protected Filter createFilter(ConsumerInfo info) throws JMSException {
Filter filter = filterFactory.createFilter(info.getDestination(), info.getSelector());
if (info.isNoLocal()) {
filter = new AndFilter(filter, new NoLocalFilter(info.getClientId()));
}
return filter;
}
}