Package org.codehaus.activemq.store.jdbc

Source Code of org.codehaus.activemq.store.jdbc.JDBCTopicMessageStore

/**
*
* Copyright 2004 Hiram Chirino
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.jdbc;

import java.sql.Connection;
import java.sql.SQLException;

import javax.jms.JMSException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.io.WireFormat;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.ConsumerInfo;
import org.codehaus.activemq.service.MessageContainer;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.SubscriberEntry;
import org.codehaus.activemq.service.Subscription;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler;
import org.codehaus.activemq.util.JMSExceptionHelper;

/**
* @version $Revision: 1.5 $
*/
public class JDBCTopicMessageStore extends JDBCMessageStore implements TopicMessageStore {

    private static final Log log = LogFactory.getLog(JDBCTopicMessageStore.class);
    private MessageContainer container;

    public JDBCTopicMessageStore(JDBCPersistenceAdapter persistenceAdapter, JDBCAdapter adapter, WireFormat wireFormat, String destinationName) {
        super(persistenceAdapter, adapter, wireFormat, destinationName);
    }

    public void setLastAcknowledgedMessageIdentity(String subscription, MessageIdentity messageIdentity) throws JMSException {
        long seq = ((Long) messageIdentity.getSequenceNumber()).longValue();
        // Get a connection and insert the message into the DB.
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            adapter.doSetLastAck(c, destinationName, subscription,  seq);
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to store ack for: " + subscription + " on message " + messageIdentity + " in container: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * @see org.codehaus.activemq.store.TopicMessageStore#getLastestMessageIdentity()
     */
    public MessageIdentity getLastestMessageIdentity() throws JMSException {
        return new MessageIdentity(null, new Long(sequenceGenerator.getLastSequenceId()));
    }

    /**
     * @see org.codehaus.activemq.store.TopicMessageStore#recoverSubscription(org.codehaus.activemq.service.Subscription, org.codehaus.activemq.service.MessageIdentity)
     */
    public void recoverSubscription(final Subscription subscription, MessageIdentity lastDispatchedMessage) throws JMSException {

        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            adapter.doRecoverSubscription(c, destinationName, subscription.getPersistentKey(), new MessageListResultHandler() {
                public void onMessage(long seq, String messageID) throws JMSException {
                    MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
                    ActiveMQMessage message = getMessage(messageIdentity);
                    subscription.addMessage(container, message);
                }
            });
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to recover subscription: " + subscription + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * @see org.codehaus.activemq.store.TopicMessageStore#setSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo, org.codehaus.activemq.service.SubscriberEntry)
     */
    public void setSubscriberEntry(ConsumerInfo info, SubscriberEntry subscriberEntry) throws JMSException {
        String key = info.getConsumerKey();
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            adapter.doSetSubscriberEntry(c, destinationName, key, subscriberEntry);
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * @see org.codehaus.activemq.store.TopicMessageStore#getSubscriberEntry(org.codehaus.activemq.message.ConsumerInfo)
     */
    public SubscriberEntry getSubscriberEntry(ConsumerInfo info) throws JMSException {
        String key = info.getConsumerKey();
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            return adapter.doGetSubscriberEntry(c, destinationName, key);
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to lookup subscription for info: " + info + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    public void deleteSubscription(String subscription) throws JMSException {
        Connection c = null;
        try {
            c = persistenceAdapter.getConnection();
            adapter.doDeleteSubscription(c, destinationName, subscription);
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to remove subscription for: " + subscription + ". Reason: " + e, e);
        }
        finally {
            persistenceAdapter.returnConnection(c);
        }
    }

    /**
     * @see org.codehaus.activemq.store.TopicMessageStore#setMessageContainer(org.codehaus.activemq.service.MessageContainer)
     */
    public void setMessageContainer(MessageContainer container) {
        this.container = container;
    }

    public void incrementMessageCount(MessageIdentity messageId) throws JMSException {
    }

    public void decrementMessageCountAndMaybeDelete(MessageIdentity messageIdentity) throws JMSException {
    }

}
TOP

Related Classes of org.codehaus.activemq.store.jdbc.JDBCTopicMessageStore

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.