Package org.activemq.store.journal

Source Code of org.activemq.store.journal.JournalMessageStore

/**
*
* Copyright 2004 Hiram Chirino
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.store.journal;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;

import javax.jms.JMSException;

import org.activeio.journal.RecordLocation;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.MessageAck;
import org.activemq.service.MessageIdentity;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.store.MessageStore;
import org.activemq.store.RecoveryListener;
import org.activemq.store.cache.CacheMessageStore;
import org.activemq.store.cache.CacheMessageStoreAware;
import org.activemq.util.Callback;
import org.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.1 $
*/
public class JournalMessageStore implements MessageStore, CacheMessageStoreAware {

    private static final Log log = LogFactory.getLog(JournalMessageStore.class);
    protected final JournalPersistenceAdapter peristenceAdapter;
    protected final MessageStore longTermStore;
    protected final String destinationName;
    protected final TransactionTemplate transactionTemplate;

    private LinkedHashMap addedMessageIds = new LinkedHashMap();
    private ArrayList removedMessageLocations = new ArrayList();
    protected HashSet inFlightTxLocations = new HashSet();  
    protected RecordLocation lastLocation;

    /** A MessageStore that we can use to retreive messages quickly. */
    private MessageStore cacheMessageStore = this;

    protected final JournalTransactionStore transactionStore;

    private LinkedHashMap cpAddedMessageIds;
   
    int removedFromJournal;

    public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, String destinationName) {
        this.peristenceAdapter = adapter;
        this.transactionStore = this.peristenceAdapter.getTransactionStore();
        this.longTermStore = checkpointStore;
        this.destinationName = destinationName;
        this.transactionTemplate = new TransactionTemplate(adapter);
    }

    /**
     * Not synchronized since the Journal has better throughput if you increase
     * the number of conncurrent writes that it is doing.
     */
    public void addMessage(final ActiveMQMessage message) throws JMSException {
        final RecordLocation location = peristenceAdapter.writePacket(destinationName, message, message.isReceiptRequired());
        if( !TransactionManager.isCurrentTransaction() ) {
            addMessage(message, location);
        } else {
            synchronized (this) {
                inFlightTxLocations.add(location);
            }
            final Transaction tx = TransactionManager.getContexTransaction();
            transactionStore.addMessage(this, message, location);
            tx.addPostCommitTask(new TransactionTask() {
                public void execute() throws Throwable {
                    synchronized (JournalMessageStore.this) {
                        inFlightTxLocations.remove(location);
                        addMessage(message, location);
                    }
                }
            });
        }
    }
   
    /**
     * @param message
     * @param location
     */
    private void addMessage(final ActiveMQMessage message, final RecordLocation location) {
        synchronized (this) {
            lastLocation=location;
            MessageIdentity id = message.getJMSMessageIdentity();
            addedMessageIds.put(id, location);
        }
    }

    /**
     */
    public void removeMessage(final MessageAck ack) throws JMSException {

        final RecordLocation location = peristenceAdapter.writePacket(destinationName, ack, ack.isReceiptRequired());
        if( !TransactionManager.isCurrentTransaction() ) {
            removeMessage(ack, location);
        } else {
            synchronized( this ) {
                inFlightTxLocations.add(location);
            }
            final Transaction tx = TransactionManager.getContexTransaction();
            transactionStore.removeMessage(this, ack, location);
            tx.addPostCommitTask(new TransactionTask(){
                public void execute() throws Throwable {
                    synchronized (JournalMessageStore.this) {
                        inFlightTxLocations.remove(location);
                        removeMessage(ack, location);
                    }
                }
            });
        }
    }

    /**
     * @param ack
     * @param location
     */
    private void removeMessage(final MessageAck ack, final RecordLocation location) {
        synchronized (this) {
            lastLocation=location;
            MessageIdentity id = ack.getMessageIdentity();
            RecordLocation msgLocation = (RecordLocation) addedMessageIds.remove(id);
            if (msgLocation == null) {
                removedMessageLocations.add(ack);
            } else {
                removedFromJournal++;
            }
        }
    }

    /**
     * @return
     * @throws JMSException
     */
    public RecordLocation checkpoint() throws JMSException {

        RecordLocation rc;
        final ArrayList cpRemovedMessageLocations;
        final ArrayList cpActiveJournalLocations;

        // swap out the message hash maps..
        synchronized (this) {
            cpAddedMessageIds = this.addedMessageIds;
            cpRemovedMessageLocations = this.removedMessageLocations;

            this.inFlightTxLocations.removeAll(this.removedMessageLocations);
            this.inFlightTxLocations.removeAll(this.addedMessageIds.values());           
            cpActiveJournalLocations=new ArrayList(inFlightTxLocations);
           
            this.addedMessageIds = new LinkedHashMap();
            this.removedMessageLocations = new ArrayList();           
            log.debug("removedFromJournal="+removedFromJournal);
            removedFromJournal=0;
        }
       
        final int messagesAdded[]=new int[]{0};
        final int messagesRemoved[]=new int[]{0};

        transactionTemplate.run(new Callback() {
            public void execute() throws Throwable {

                // Checkpoint the added messages.
                Iterator iterator = cpAddedMessageIds.keySet().iterator();
                while (iterator.hasNext()) {
                    MessageIdentity identity = (MessageIdentity) iterator.next();
                    ActiveMQMessage msg = getCacheMessage(identity);
                    // Pull it out of the journal if we have to.
                    if (msg == null) {
                        RecordLocation location = (RecordLocation) cpAddedMessageIds.get(identity);
                        msg = (ActiveMQMessage) peristenceAdapter.readPacket((RecordLocation) location);
                    }
                    if( msg != null ) {
                        try {
                            longTermStore.addMessage(msg);
                            messagesAdded[0]++;
                        } catch (Throwable e) {
                            log.warn("Message could not be added to long term store: " + e.getMessage(), e);
                        }
                    } else {
                        log.warn("Journal could not reload message: " + identity);                       
                    }
                }

                // Checkpoint the removed messages.
                iterator = cpRemovedMessageLocations.iterator();
                while (iterator.hasNext()) {
                    try {
                        MessageAck ack = (MessageAck) iterator.next();
                        longTermStore.removeMessage(ack);
                        messagesRemoved[0]++;
                    } catch (Throwable e) {
                        log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
                    }
                }
            }

        });

        log.debug("Added "+messagesAdded[0]+" message(s) and removed "+messagesRemoved[0]+" message(s). removedFromJournal="+removedFromJournal);
        synchronized (this) {
            cpAddedMessageIds = null;
        }
       
        Collections.sort(cpActiveJournalLocations);
        if( cpActiveJournalLocations.size() > 0 ) {
            return (RecordLocation) cpActiveJournalLocations.get(0);
        } else {
            return lastLocation;
        }
    }

    private ActiveMQMessage getCacheMessage(MessageIdentity identity) throws JMSException {
        return cacheMessageStore.getMessage(identity);
    }

    /**
     *
     */
    public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
        ActiveMQMessage answer = null;

        Object location;
        synchronized (this) {
            location = addedMessageIds.get(identity);
            if( location==null && cpAddedMessageIds!=null )
                location = cpAddedMessageIds.get(identity);
        }
       
        // Do we have a still have it in the journal?
        if (location != null ) {
            try {
                answer = (ActiveMQMessage)peristenceAdapter.readPacket((RecordLocation) location);
                if (answer != null)
                    return answer;
            } catch (Throwable e) {
                // We could have had an async checkpoint and thus we cannot read that location anymore,
                // but now the message should be in the long term store.
            }
        }

        // If all else fails try the long term message store.
        return longTermStore.getMessage(identity);
    }

    /**
     * Replays the checkpointStore first as those messages are the oldest ones,
     * then messages are replayed from the transaction log and then the cache is
     * updated.
     *
     * @param listener
     * @throws JMSException
     */
    public void recover(final RecoveryListener listener) throws JMSException {
        peristenceAdapter.checkpoint(true);
        longTermStore.recover(listener);
    }

    public void start() throws JMSException {
        longTermStore.start();
    }

    public void stop() throws JMSException {
        longTermStore.stop();
    }

    /**
     * @return Returns the longTermStore.
     */
    public MessageStore getLongTermMessageStore() {
        return longTermStore;
    }

    /**
     * @see org.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.activemq.store.cache.CacheMessageStore)
     */
    public void setCacheMessageStore(CacheMessageStore store) {
        cacheMessageStore = store;
        // Propagate the setCacheMessageStore method call to the longTermStore
        // if possible.
        if (longTermStore instanceof CacheMessageStoreAware) {
            ((CacheMessageStoreAware) longTermStore).setCacheMessageStore(store);
        }
    }

    /**
     * @see org.activemq.store.MessageStore#removeAllMessages()
     */
    public void removeAllMessages() throws JMSException {
        peristenceAdapter.checkpoint(true);
        longTermStore.removeAllMessages();
    }

    public void replayAddMessage(ActiveMQMessage msg) {
        try {
            // Only add the message if it has not already been added.
            ActiveMQMessage t = longTermStore.getMessage(msg.getJMSMessageIdentity());
            if( t==null ) {
                longTermStore.addMessage(msg);
            }
        }
        catch (Throwable e) {
            log.debug("Could not replay add for message '" + msg.getJMSMessageIdentity().getMessageID() + "'.  Message may have already been added. reason: " + e);
        }
    }

    public void replayRemoveMessage(MessageAck ack) {
        try {
            // Only remove the message if it has not already been removed.
            ActiveMQMessage t = longTermStore.getMessage(ack.getMessageIdentity());
            if( t!=null ) {
                longTermStore.removeMessage(ack);
            }
        }
        catch (Throwable e) {
            log.debug("Could not replay acknowledge for message '" + ack.getMessageID() + "'.  Message may have already been acknowledged. reason: " + e);
        }
    }

}
TOP

Related Classes of org.activemq.store.journal.JournalMessageStore

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.