/**
*
* Copyright 2004 Hiram Chirino
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.activemq.store.journal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import javax.jms.JMSException;
import org.activeio.journal.RecordLocation;
import org.activemq.message.ActiveMQMessage;
import org.activemq.message.MessageAck;
import org.activemq.service.MessageIdentity;
import org.activemq.service.Transaction;
import org.activemq.service.TransactionManager;
import org.activemq.service.TransactionTask;
import org.activemq.store.MessageStore;
import org.activemq.store.RecoveryListener;
import org.activemq.store.cache.CacheMessageStore;
import org.activemq.store.cache.CacheMessageStoreAware;
import org.activemq.util.Callback;
import org.activemq.util.TransactionTemplate;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
/**
* A MessageStore that uses a Journal to store it's messages.
*
* @version $Revision: 1.1 $
*/
public class JournalMessageStore implements MessageStore, CacheMessageStoreAware {
private static final Log log = LogFactory.getLog(JournalMessageStore.class);
protected final JournalPersistenceAdapter peristenceAdapter;
protected final MessageStore longTermStore;
protected final String destinationName;
protected final TransactionTemplate transactionTemplate;
private LinkedHashMap addedMessageIds = new LinkedHashMap();
private ArrayList removedMessageLocations = new ArrayList();
protected HashSet inFlightTxLocations = new HashSet();
protected RecordLocation lastLocation;
/** A MessageStore that we can use to retreive messages quickly. */
private MessageStore cacheMessageStore = this;
protected final JournalTransactionStore transactionStore;
private LinkedHashMap cpAddedMessageIds;
int removedFromJournal;
public JournalMessageStore(JournalPersistenceAdapter adapter, MessageStore checkpointStore, String destinationName) {
this.peristenceAdapter = adapter;
this.transactionStore = this.peristenceAdapter.getTransactionStore();
this.longTermStore = checkpointStore;
this.destinationName = destinationName;
this.transactionTemplate = new TransactionTemplate(adapter);
}
/**
* Not synchronized since the Journal has better throughput if you increase
* the number of conncurrent writes that it is doing.
*/
public void addMessage(final ActiveMQMessage message) throws JMSException {
final RecordLocation location = peristenceAdapter.writePacket(destinationName, message, message.isReceiptRequired());
if( !TransactionManager.isCurrentTransaction() ) {
addMessage(message, location);
} else {
synchronized (this) {
inFlightTxLocations.add(location);
}
final Transaction tx = TransactionManager.getContexTransaction();
transactionStore.addMessage(this, message, location);
tx.addPostCommitTask(new TransactionTask() {
public void execute() throws Throwable {
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
addMessage(message, location);
}
}
});
}
}
/**
* @param message
* @param location
*/
private void addMessage(final ActiveMQMessage message, final RecordLocation location) {
synchronized (this) {
lastLocation=location;
MessageIdentity id = message.getJMSMessageIdentity();
addedMessageIds.put(id, location);
}
}
/**
*/
public void removeMessage(final MessageAck ack) throws JMSException {
final RecordLocation location = peristenceAdapter.writePacket(destinationName, ack, ack.isReceiptRequired());
if( !TransactionManager.isCurrentTransaction() ) {
removeMessage(ack, location);
} else {
synchronized( this ) {
inFlightTxLocations.add(location);
}
final Transaction tx = TransactionManager.getContexTransaction();
transactionStore.removeMessage(this, ack, location);
tx.addPostCommitTask(new TransactionTask(){
public void execute() throws Throwable {
synchronized (JournalMessageStore.this) {
inFlightTxLocations.remove(location);
removeMessage(ack, location);
}
}
});
}
}
/**
* @param ack
* @param location
*/
private void removeMessage(final MessageAck ack, final RecordLocation location) {
synchronized (this) {
lastLocation=location;
MessageIdentity id = ack.getMessageIdentity();
RecordLocation msgLocation = (RecordLocation) addedMessageIds.remove(id);
if (msgLocation == null) {
removedMessageLocations.add(ack);
} else {
removedFromJournal++;
}
}
}
/**
* @return
* @throws JMSException
*/
public RecordLocation checkpoint() throws JMSException {
RecordLocation rc;
final ArrayList cpRemovedMessageLocations;
final ArrayList cpActiveJournalLocations;
// swap out the message hash maps..
synchronized (this) {
cpAddedMessageIds = this.addedMessageIds;
cpRemovedMessageLocations = this.removedMessageLocations;
this.inFlightTxLocations.removeAll(this.removedMessageLocations);
this.inFlightTxLocations.removeAll(this.addedMessageIds.values());
cpActiveJournalLocations=new ArrayList(inFlightTxLocations);
this.addedMessageIds = new LinkedHashMap();
this.removedMessageLocations = new ArrayList();
log.debug("removedFromJournal="+removedFromJournal);
removedFromJournal=0;
}
final int messagesAdded[]=new int[]{0};
final int messagesRemoved[]=new int[]{0};
transactionTemplate.run(new Callback() {
public void execute() throws Throwable {
// Checkpoint the added messages.
Iterator iterator = cpAddedMessageIds.keySet().iterator();
while (iterator.hasNext()) {
MessageIdentity identity = (MessageIdentity) iterator.next();
ActiveMQMessage msg = getCacheMessage(identity);
// Pull it out of the journal if we have to.
if (msg == null) {
RecordLocation location = (RecordLocation) cpAddedMessageIds.get(identity);
msg = (ActiveMQMessage) peristenceAdapter.readPacket((RecordLocation) location);
}
if( msg != null ) {
try {
longTermStore.addMessage(msg);
messagesAdded[0]++;
} catch (Throwable e) {
log.warn("Message could not be added to long term store: " + e.getMessage(), e);
}
} else {
log.warn("Journal could not reload message: " + identity);
}
}
// Checkpoint the removed messages.
iterator = cpRemovedMessageLocations.iterator();
while (iterator.hasNext()) {
try {
MessageAck ack = (MessageAck) iterator.next();
longTermStore.removeMessage(ack);
messagesRemoved[0]++;
} catch (Throwable e) {
log.debug("Message could not be removed from long term store: " + e.getMessage(), e);
}
}
}
});
log.debug("Added "+messagesAdded[0]+" message(s) and removed "+messagesRemoved[0]+" message(s). removedFromJournal="+removedFromJournal);
synchronized (this) {
cpAddedMessageIds = null;
}
Collections.sort(cpActiveJournalLocations);
if( cpActiveJournalLocations.size() > 0 ) {
return (RecordLocation) cpActiveJournalLocations.get(0);
} else {
return lastLocation;
}
}
private ActiveMQMessage getCacheMessage(MessageIdentity identity) throws JMSException {
return cacheMessageStore.getMessage(identity);
}
/**
*
*/
public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
ActiveMQMessage answer = null;
Object location;
synchronized (this) {
location = addedMessageIds.get(identity);
if( location==null && cpAddedMessageIds!=null )
location = cpAddedMessageIds.get(identity);
}
// Do we have a still have it in the journal?
if (location != null ) {
try {
answer = (ActiveMQMessage)peristenceAdapter.readPacket((RecordLocation) location);
if (answer != null)
return answer;
} catch (Throwable e) {
// We could have had an async checkpoint and thus we cannot read that location anymore,
// but now the message should be in the long term store.
}
}
// If all else fails try the long term message store.
return longTermStore.getMessage(identity);
}
/**
* Replays the checkpointStore first as those messages are the oldest ones,
* then messages are replayed from the transaction log and then the cache is
* updated.
*
* @param listener
* @throws JMSException
*/
public void recover(final RecoveryListener listener) throws JMSException {
peristenceAdapter.checkpoint(true);
longTermStore.recover(listener);
}
public void start() throws JMSException {
longTermStore.start();
}
public void stop() throws JMSException {
longTermStore.stop();
}
/**
* @return Returns the longTermStore.
*/
public MessageStore getLongTermMessageStore() {
return longTermStore;
}
/**
* @see org.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.activemq.store.cache.CacheMessageStore)
*/
public void setCacheMessageStore(CacheMessageStore store) {
cacheMessageStore = store;
// Propagate the setCacheMessageStore method call to the longTermStore
// if possible.
if (longTermStore instanceof CacheMessageStoreAware) {
((CacheMessageStoreAware) longTermStore).setCacheMessageStore(store);
}
}
/**
* @see org.activemq.store.MessageStore#removeAllMessages()
*/
public void removeAllMessages() throws JMSException {
peristenceAdapter.checkpoint(true);
longTermStore.removeAllMessages();
}
public void replayAddMessage(ActiveMQMessage msg) {
try {
// Only add the message if it has not already been added.
ActiveMQMessage t = longTermStore.getMessage(msg.getJMSMessageIdentity());
if( t==null ) {
longTermStore.addMessage(msg);
}
}
catch (Throwable e) {
log.debug("Could not replay add for message '" + msg.getJMSMessageIdentity().getMessageID() + "'. Message may have already been added. reason: " + e);
}
}
public void replayRemoveMessage(MessageAck ack) {
try {
// Only remove the message if it has not already been removed.
ActiveMQMessage t = longTermStore.getMessage(ack.getMessageIdentity());
if( t!=null ) {
longTermStore.removeMessage(ack);
}
}
catch (Throwable e) {
log.debug("Could not replay acknowledge for message '" + ack.getMessageID() + "'. Message may have already been acknowledged. reason: " + e);
}
}
}