/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.jdbm;
import jdbm.btree.BTree;
import jdbm.helper.Tuple;
import jdbm.helper.TupleBrowser;
import org.codehaus.activemq.AlreadyClosedException;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;
import javax.jms.JMSException;
import java.io.IOException;
/**
* @version $Revision: 1.5 $
*/
public class JdbmTopicMessageStore extends JdbmMessageStore implements TopicMessageStore {
private static final Integer ONE = new Integer(1);
private BTree ackDatabase;
private BTree messageCounts;
private BTree subscriberDetails;
public JdbmTopicMessageStore(BTree messageTable, BTree orderedIndex, BTree ackDatabase, BTree subscriberDetails, BTree messageCounts) {
super(messageTable, orderedIndex);
this.ackDatabase = ackDatabase;
this.subscriberDetails = subscriberDetails;
this.messageCounts = messageCounts;
}
public synchronized void incrementMessageCount(MessageIdentity messageId) throws JMSException {
try {
Integer number = (Integer) getMessageCounts().find(messageId);
if (number == null) {
number = ONE;
}
else {
number = new Integer(number.intValue() + 1);
}
getMessageCounts().insert(messageId, number, true);
}
catch (IOException e) {
throw JMSExceptionHelper.newJMSException("Failed to increment messageCount for messageID: " + messageId + ". Reason: " + e, e);
}
}
public synchronized void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
try {
Integer number = (Integer) getMessageCounts().find(messageIdentity);
if (number == null || number.intValue() <= 1) {
removeMessage(messageIdentity, ack);
if (number != null) {
getMessageCounts().remove(messageIdentity);
}
}
else {
getMessageCounts().insert(messageIdentity, new Integer(number.intValue() - 1), true);
number = ONE;
}
}
catch (IOException e) {
throw JMSExceptionHelper.newJMSException("Failed to increment messageCount for messageID: " + messageIdentity + ". Reason: " + e, e);
}
}
public synchronized void setLastAcknowledgedMessageIdentity(Subscription subscription, MessageIdentity messageIdentity) throws JMSException {
String key = subscription.getPersistentKey();
try {
getAckDatabase().insert(key, messageIdentity, true);
}
catch (IOException e) {
throw JMSExceptionHelper.newJMSException("Failed to set ack messageID: " + messageIdentity + " for consumerId: " + key + ". Reason: " + e, e);
}
}
public synchronized void recoverSubscription(Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
try {
MessageIdentity lastAcked = getLastAcknowledgedMessageIdentity(subscription);
if (lastAcked == null) {
// for a new durable subscription lets write the last ack messageID
// as the previous one that the container delivered to ensure that
// if we go down before acking anything, we will recover to the right point
setLastAcknowledgedMessageIdentity(subscription, lastDispatchedMessage);
return;
}
Object lastAckedSequenceNumber = lastAcked.getSequenceNumber();
// lets iterate through all IDs from the
//Tuple tuple = new Tuple();
Tuple tuple = getOrderedIndex().findGreaterOrEqual(lastAckedSequenceNumber);
TupleBrowser iter = getOrderedIndex().browse();
while (iter.getNext(tuple)) {
Long sequenceNumber = (Long) tuple.getKey();
if (sequenceNumber.compareTo(lastAckedSequenceNumber) > 0) {
ActiveMQMessage message = null;
// TODO we could probably tune this some more since we have tuple.getValue() already
message = getMessageBySequenceNumber(sequenceNumber);
if (message != null) {
subscription.addMessage(getContainer(), message);
}
}
}
}
catch (IOException e) {
throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
}
}
public synchronized MessageIdentity getLastestMessageIdentity() throws JMSException {
return new MessageIdentity(null, new Long(getLastSequenceNumber()));
}
public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
Object key = info.getConsumerKey();
try {
return (SubscriberEntry) subscriberDetails.find(key);
}
catch (IOException e) {
throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
}
}
public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
Object key = info.getConsumerKey();
try {
subscriberDetails.insert(key, subscriberEntry, true);
}
catch (IOException e) {
throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
}
}
public synchronized void stop() throws JMSException {
JMSException firstException = closeTable(ackDatabase, null);
firstException = closeTable(messageCounts, firstException);
ackDatabase = null;
messageCounts = null;
super.stop();
if (firstException != null) {
throw firstException;
}
}
// Implementation methods
//-------------------------------------------------------------------------
protected BTree getMessageCounts() throws AlreadyClosedException {
if (messageCounts == null) {
throw new AlreadyClosedException("JDBM TopicMessageStore");
}
return messageCounts;
}
protected BTree getAckDatabase() throws AlreadyClosedException {
if (ackDatabase == null) {
throw new AlreadyClosedException("JDBM TopicMessageStore");
}
return ackDatabase;
}
protected MessageIdentity getLastAcknowledgedMessageIdentity(Subscription subscription) throws IOException, AlreadyClosedException {
return (MessageIdentity) getAckDatabase().find(subscription.getPersistentKey());
}
}