/**
*
* Copyright 2004 Hiram Chirino
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.jdbc;
import java.sql.Connection;
import java.sql.SQLException;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.io.WireFormat;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
import org.codehaus.activemq.util.JMSExceptionHelper;
/**
 * A {@link TopicMessageStore} that keeps topic subscription state in a
 * database. Every operation borrows a connection from the inherited
 * persistence adapter, delegates the actual SQL work to the inherited
 * {@link JDBCAdapter}, and hands the connection back in a <code>finally</code>
 * block. Any {@link SQLException} is converted to a {@link JMSException}
 * that carries the original exception as its cause.
 *
 * @version $Revision: 1.5 $
 */
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {
    private static final Log log = LogFactory.getLog(JDBCTopicMessageStore.class);

    /**
     * Container passed to a subscription together with each recovered
     * message; assigned through {@link #setMessageContainer(MessageContainer)}.
     */
    private MessageContainer container;

    /**
     * Creates a topic store; all arguments are forwarded unchanged to the
     * {@link JDBCMessageStore} constructor.
     *
     * @param persistenceAdapter source of the JDBC connections used by this store
     * @param adapter            performs the database statements
     * @param wireFormat         forwarded to the superclass
     * @param destinationName    name of the destination this store belongs to
     */
    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
        super(persistenceAdapter, adapter, wireFormat, destinationName);
    }

    /**
     * Records the last message acknowledged by a subscription.
     *
     * @param subscription    key of the subscription whose ack position is updated
     * @param messageIdentity identity of the acknowledged message; its sequence
     *                        number must be a non-null {@link Long}, otherwise the
     *                        cast below fails with a runtime exception before any
     *                        connection is obtained
     * @throws JMSException if the database update fails
     */
    public void setLastAcknowledgedMessageIdentity(String subscription, MessageIdentity messageIdentity) throws JMSException {
        long seq = ((Long) messageIdentity.getSequenceNumber()).longValue();

        // Get a connection and record the acknowledged sequence number in the DB.
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            adapter.doSetLastAck(c, destinationName, subscription, seq);
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message " + messageIdentity + " in container: " + destinationName + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * Returns an identity that has no message ID and whose sequence number is
     * the last sequence ID reported by the inherited sequence generator.
     *
     * @see org.codehaus.activemq.store.TopicMessageStore#getLastestMessageIdentity()
     */
    public MessageIdentity getLastestMessageIdentity() throws JMSException {
        return new MessageIdentity(null, new Long(sequenceGenerator.getLastSequenceId()));
    }

    /**
     * Replays stored messages into a subscription. For every
     * (sequence, message ID) pair the adapter reports for the subscription's
     * persistent key, the message is loaded with <code>getMessage</code> and
     * added to the subscription along with the configured container.
     * <p>
     * Note: <code>lastDispatchedMessage</code> is not consulted by this
     * implementation; which messages are replayed is decided entirely by the
     * adapter.
     *
     * @param subscription          the subscription to feed
     * @param lastDispatchedMessage currently unused
     * @throws JMSException if the database query fails, or if loading or
     *                      adding a message fails
     * @see org.codehaus.activemq.store.TopicMessageStore#recoverSubscription(org.codehaus.activemq.service.Subscription, org.codehaus.activemq.service.MessageIdentity)
     */
    public void recoverSubscription(final Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            adapter.doRecoverSubscription(c, destinationName, subscription.getPersistentKey(), new MessageListResultHandler() {
                public void onMessage(long seq, String messageID) throws JMSException {
                    MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
                    ActiveMQMessage message = getMessage(messageIdentity);
                    subscription.addMessage(container, message);
                }
            });
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * Stores the subscriber entry under the consumer key of <code>info</code>.
     *
     * @param info            consumer whose key identifies the entry
     * @param subscriberEntry the entry to store
     * @throws JMSException if the database update fails
     * @see org.codehaus.activemq.store.TopicMessageStore#setSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo, org.codehaus.activemq.service.SubscriberEntry)
     */
    public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
        String key = info.getConsumerKey();
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            adapter.doSetSubscriberEntry(c, destinationName, key, subscriberEntry);
        }
        catch (SQLException e) {
            // This is a write, so report it as a store failure rather than a lookup failure.
            throw JMSExceptionHelper.newJMSException("Failed to store subscription for info: " + info + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * Looks up the subscriber entry stored under the consumer key of
     * <code>info</code>.
     *
     * @param info consumer whose key identifies the entry
     * @return whatever the adapter returns for that key
     * @throws JMSException if the database query fails
     * @see org.codehaus.activemq.store.TopicMessageStore#getSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo)
     */
    public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
        String key = info.getConsumerKey();
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            return adapter.doGetSubscriberEntry(c, destinationName, key);
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * Removes the stored state of a subscription for this destination.
     *
     * @param subscription key of the subscription to delete
     * @throws JMSException if the database update fails
     */
    public void deleteSubscription(String subscription) throws JMSException {
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            adapter.doDeleteSubscription(c, destinationName, subscription);
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to remove subscription for: " + subscription + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * Sets the container that is passed to subscriptions during
     * {@link #recoverSubscription(Subscription, MessageIdentity)}.
     *
     * @see org.codehaus.activemq.store.TopicMessageStore#setMessageContainer(org.codehaus.activemq.service.MessageContainer)
     */
    public void setMessageContainer(MessageContainer container) {
        this.container = container;
    }

    /**
     * Intentionally does nothing in this store.
     * NOTE(review): presumably reference counting is unnecessary because ack
     * positions are tracked per subscription in the database - confirm.
     */
    public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
    }

    /**
     * Intentionally does nothing in this store; no message is deleted here.
     * NOTE(review): presumably cleanup of fully acknowledged messages happens
     * elsewhere - confirm.
     */
    public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity) throws JMSException {
    }
}