/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.service.boundedvm;
import java.util.List;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import org.codehaus.activemq.broker.BrokerClient;
import org.codehaus.activemq.broker.BrokerConnector;
import org.codehaus.activemq.filter.Filter;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.BrokerInfo;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.util.MemoryBoundedQueue;
/**
* A holder for Transient Queue consumer info and message routing
*
* @version $Revision: 1.6 $
*/
public class TransientQueueSubscription extends TransientSubscription {
private BrokerClient client;
private String brokerName;
private String clusterName;
private MemoryBoundedQueue dispatchedQueue;
/**
* Construct the TransientQueueSubscription
*
* @param client
* @param dispatchedQueue
* @param filter
* @param info
*/
public TransientQueueSubscription(BrokerClient client, MemoryBoundedQueue dispatchedQueue, Filter filter,
ConsumerInfo info) {
super(filter, info);
this.client = client;
this.dispatchedQueue = dispatchedQueue;
if (client != null){
BrokerConnector connector = client.getBrokerConnector();
if (connector != null){
BrokerInfo bi = connector.getBrokerInfo();
if (bi != null){
this.brokerName = bi.getBrokerName();
this.clusterName = bi.getClusterName();
}
}
}
}
/**
* determines if the Subscription is interested in the message
*
* @param message
* @return true if this Subscription will accept the message
* @throws JMSException
*/
public boolean isTarget(ActiveMQMessage message) throws JMSException {
boolean result = false;
if (message != null) {
//make sure we don't loop messages around the cluster
if (!client.isClusteredConnection() || !message.isEntryCluster(clusterName)
|| message.isEntryBroker(brokerName)) {
result = filter.matches(message)
&& (message.getJMSDeliveryMode() == DeliveryMode.NON_PERSISTENT || consumerInfo
.getDestination().isTemporary());
}
}
return result;
}
/**
* @return true if the consumer has capacity for more messages
*/
public boolean canAcceptMessages() {
return dispatchedQueue.size() <= consumerInfo.getPrefetchNumber();
}
/**
* Dispatch a message to the Consumer
*
* @param message
* @throws JMSException
*/
public void doDispatch(ActiveMQMessage message) throws JMSException {
addDispatchedMessage(message);
message = message.shallowCopy();
message.setConsumerNos(new int[]{consumerInfo.getConsumerNo()});
client.dispatch(message);
}
/**
* Add a dispatched message
*
* @param message
*/
private void addDispatchedMessage(ActiveMQMessage message) {
dispatchedQueue.enqueue(message);
}
/**
* Acknowledge the receipt of a message by a consumer
*
* @param id
* @return the removed ActiveMQMessage with the associated id
*/
public ActiveMQMessage acknowledgeMessage(String id) {
ActiveMQMessage msg = (ActiveMQMessage) dispatchedQueue.remove(id);
return msg;
}
/**
* @return all the unacknowledge messages
*/
public List getUndeliveredMessages() {
return dispatchedQueue.getContents();
}
/**
* close the subscription
*/
public void close() {
super.close();
dispatchedQueue.close();
}
}