/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.howl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.message.Packet;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.util.Callback;
import org.codehaus.activemq.util.JMSExceptionHelper;
import org.codehaus.activemq.util.TransactionTemplate;
import org.objectweb.howl.log.LogConfigurationException;
import org.objectweb.howl.log.LogException;
import org.objectweb.howl.log.LogRecord;
import org.objectweb.howl.log.Logger;
import org.objectweb.howl.log.ReplayListener;
import javax.jms.JMSException;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;
/**
* An implementation of {@link MessageStore} designed for
* optimal use with <a href="http://howl.objectweb.org/">Howl</a>
* as the transaction log and then checkpointing asynchronously
* on a timeout with some other persistent storage.
*
* @version $Revision: 1.3 $
*/
public class HowlMessageStore implements MessageStore {
    /** Size, in bytes, of the buffer handed to Howl for each replayed record. */
    private static final int DEFAULT_RECORD_SIZE = 64 * 1024;

    /**
     * Number of leading bytes of a replayed record that are skipped before the
     * bytes are handed to the wire format.
     * NOTE(review): the original author guessed these bytes are a size prefix;
     * confirm against the Howl record layout.
     */
    private static final int PACKET_DATA_OFFSET = 2;

    private static final Log log = LogFactory.getLog(HowlMessageStore.class);

    private final HowlPersistenceAdapter longTermPersistence;
    private final MessageStore longTermStore;
    private final Logger transactionLog;
    private final WireFormat wireFormat;
    private final TransactionTemplate transactionTemplate;
    private int maximumCacheSize = 100;

    /** RAM cache of messages keyed by JMS message ID; always accessed while holding its own monitor. */
    private final Map map = new LinkedHashMap();

    /** Passed to the transaction log on every write. */
    private boolean sync = true;

    /** Key returned by the most recent write to the transaction log; used as the mark when checkpointing. */
    private long lastLogMark;

    /** First failure seen while replaying the transaction log in {@link #recover(QueueMessageContainer)}. */
    private Exception firstException;

    /**
     * @param adapter         the owning adapter; consulted for cache capacity, notified of removals
     *                        and used to create the transaction template for checkpoints
     * @param checkpointStore the long term store that messages are checkpointed into
     * @param transactionLog  the Howl log that every message and acknowledgement is written to
     * @param wireFormat      converts packets to and from the bytes stored in the log
     */
    public HowlMessageStore(HowlPersistenceAdapter adapter, MessageStore checkpointStore, Logger transactionLog, WireFormat wireFormat) {
        this.longTermPersistence = adapter;
        this.longTermStore = checkpointStore;
        this.transactionLog = transactionLog;
        this.wireFormat = wireFormat;
        this.transactionTemplate = new TransactionTemplate(adapter);
    }

    /**
     * Writes the message to the transaction log and then caches it in RAM. If the cache
     * is full a checkpoint is forced, which flushes the cache and the new message to
     * the long term store.
     * <p>
     * This method is synchronized to ensure that only 1 thread can write to the log and cache
     * and possibly checkpoint at once, to preserve order across the transaction log,
     * cache and checkpointStore.
     *
     * @param message the message to store
     * @return the identity of the stored message
     * @throws JMSException if the log write or the forced checkpoint fails
     */
    public synchronized MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
        // write to howl
        writePacket(message);

        // can we add it to the cache?
        if (!addMessageToCache(message)) {
            log.warn("Not enough RAM to store the active transaction log and so we're having to force " +
                    "a checkpoint so that we can ensure that reads are efficient and do not have to " +
                    "replay the transaction log");

            // checkpoint(message) stores the overflowing message itself, in the same
            // transaction as the flushed cache, so it must not be added a second time here
            checkpoint(message);
        }
        return message.getJMSMessageIdentity();
    }

    /**
     * Looks the message up in the RAM cache first and falls back to the long term store.
     * <p>
     * Lets ensure that readers don't block writers so there is only synchronization on
     * the cache and checkpointStore.
     *
     * @param identity identifies the message to load
     * @return the cached message, or whatever the long term store returns for the identity
     * @throws JMSException if the long term store fails
     */
    public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
        ActiveMQMessage answer = null;
        synchronized (map) {
            answer = (ActiveMQMessage) map.get(identity.getMessageID());
        }
        if (answer == null) {
            answer = longTermStore.getMessage(identity);
        }
        return answer;
    }

    /**
     * Writes the acknowledgement to the transaction log and removes the message from
     * wherever it currently lives: the RAM cache or, if it is not cached, the long term store.
     * <p>
     * Removes can be done in any order so we only synchronize on the cache and
     * checkpointStore.
     * NOTE(review): because this method does not hold the store's monitor, it can
     * interleave with {@link #checkpoint(ActiveMQMessage)} between the cache being
     * cleared and the snapshot being written - confirm whether that window matters.
     *
     * @param identity identifies the message to remove
     * @param ack      the acknowledgement that caused the removal
     * @throws JMSException if the log write or the long term store removal fails
     */
    public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
        // write to howl
        writePacket(ack);

        Object cached;
        synchronized (map) {
            cached = map.remove(identity.getMessageID());
        }
        if (cached == null) {
            // not in the cache, so the message was either added directly to the long term
            // store or moved there by a checkpoint; without this it would never be deleted
            longTermStore.removeMessage(identity, ack);
        }
        longTermPersistence.onMessageRemove(this);
    }

    /**
     * Replays the checkpointStore first as those messages are the oldest ones,
     * then replays the transaction log, applying each logged message and
     * acknowledgement to the container.
     *
     * @param container receives the recovered messages and acknowledgements
     * @throws JMSException if the long term store fails, the log cannot be replayed,
     *                      or any record fails to be read or applied
     */
    public synchronized void recover(final QueueMessageContainer container) throws JMSException {
        longTermStore.recover(container);

        // replay the transaction log, adding any messages to be dispatched to the container
        firstException = null;
        try {
            transactionLog.replay(new ReplayListener() {
                private final LogRecord record = new LogRecord(DEFAULT_RECORD_SIZE);

                public void onRecord(LogRecord logRecord) {
                    readPacket(logRecord, container);
                }

                public void onError(LogException e) {
                    log.error("Error while recovering Howl transaction log: " + e, e);
                    // remember the failure so recovery does not appear to succeed
                    // after only part of the log has been replayed
                    if (firstException == null) {
                        firstException = e;
                    }
                }

                public LogRecord getLogRecord() {
                    return record;
                }
            });
        }
        catch (LogConfigurationException e) {
            throw createRecoveryFailedException(e);
        }

        if (firstException != null) {
            if (firstException instanceof JMSException) {
                throw (JMSException) firstException;
            }
            throw createRecoveryFailedException(firstException);
        }
    }

    /**
     * Starts the long term store.
     */
    public synchronized void start() throws JMSException {
        longTermStore.start();
    }

    /**
     * Stops the long term store.
     */
    public synchronized void stop() throws JMSException {
        longTermStore.stop();
    }

    /**
     * Writes the current RAM cache to the long term, checkpoint store so that the
     * transaction log can be truncated.
     */
    public synchronized void checkpoint() throws JMSException {
        checkpoint(null);
    }

    // Properties
    //-------------------------------------------------------------------------

    /**
     * @return the maximum number of messages held in the RAM cache
     */
    public int getMaximumCacheSize() {
        return maximumCacheSize;
    }

    /**
     * @param maximumCacheSize the maximum number of messages held in the RAM cache
     */
    public void setMaximumCacheSize(int maximumCacheSize) {
        this.maximumCacheSize = maximumCacheSize;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Writes the current RAM image of the transaction log to stable, checkpoint store
     * and then marks the transaction log at the last written record.
     *
     * @param message is an optional message. This is null for timer based
     *                checkpoints or is the message which cannot fit into the cache if cache-exhaustion
     *                based checkpoints. When not null it is stored after the cached messages,
     *                inside the same transaction.
     * @throws JMSException if the transaction fails or the log cannot be marked
     */
    protected void checkpoint(final ActiveMQMessage message) throws JMSException {
        // lets create a copy of the collection to avoid blocking readers
        ActiveMQMessage[] temp = null;
        synchronized (map) {
            temp = new ActiveMQMessage[map.size()];
            map.values().toArray(temp);

            // lets clear the map so that its next contents represent
            // the stuff we need to checkpoint next time around
            map.clear();
        }

        final ActiveMQMessage[] data = temp;
        transactionTemplate.run(new Callback() {
            public void execute() throws Throwable {
                for (int i = 0, size = data.length; i < size; i++) {
                    longTermStore.addMessage(data[i]);
                }
                if (message != null) {
                    longTermStore.addMessage(message);
                }
            }
        });

        try {
            transactionLog.mark(lastLogMark);
        }
        catch (Exception e) {
            if (e instanceof InterruptedException) {
                // keep the interrupt visible to callers further up the stack
                Thread.currentThread().interrupt();
            }
            throw JMSExceptionHelper.newJMSException("Failed to checkpoint the Howl transaction log: " + e, e);
        }
    }

    /**
     * Adds the given message to the cache if there is spare capacity, that is if the
     * cache holds fewer than {@link #getMaximumCacheSize()} entries and the
     * persistence adapter reports capacity for this store.
     *
     * @param message the message to cache, keyed by its JMS message ID
     * @return true if the message was added to the cache or false
     */
    protected boolean addMessageToCache(ActiveMQMessage message) {
        synchronized (map) {
            if (map.size() < maximumCacheSize && longTermPersistence.hasCacheCapacity(this)) {
                map.put(message.getJMSMessageID(), message);
                return true;
            }
        }
        return false;
    }

    /**
     * Applies one replayed log record to the container: messages are added and
     * acknowledgements delete the message they refer to. Control records, end of
     * buffer records and empty records are ignored. Failures are not thrown; the
     * first one is remembered so that {@link #recover(QueueMessageContainer)} can report it.
     *
     * @param logRecord the record read from the transaction log
     * @param container the container being recovered
     */
    protected void readPacket(LogRecord logRecord, QueueMessageContainer container) {
        if (!logRecord.isCTRL() && !logRecord.isEOB() && logRecord.length > 0) {
            try {
                // TODO for some weird reason we get an unnecessary prefix which I'm guessing is the size
                Packet packet = wireFormat.fromBytes(logRecord.data, PACKET_DATA_OFFSET, logRecord.length - PACKET_DATA_OFFSET);
                if (packet instanceof ActiveMQMessage) {
                    container.addMessage((ActiveMQMessage) packet);
                }
                else if (packet instanceof MessageAck) {
                    MessageAck ack = (MessageAck) packet;
                    container.delete(ack.getMessageIdentity(), ack);
                }
                else {
                    log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
                }
            }
            catch (Exception e) {
                if (firstException == null) {
                    firstException = e;
                }
            }
        }
    }

    /**
     * Writes a packet to the transaction log using the current sync mode and
     * remembers the returned log key for the next checkpoint.
     *
     * @param packet the message or acknowledgement to log
     * @throws JMSException if the packet cannot be serialized or written, or the
     *                      thread is interrupted while writing
     */
    protected synchronized void writePacket(Packet packet) throws JMSException {
        try {
            byte[] data = wireFormat.toBytes(packet);
            lastLogMark = transactionLog.put(data, sync);
        }
        catch (IOException e) {
            throw createWriteException(packet, e);
        }
        catch (LogException e) {
            throw createWriteException(packet, e);
        }
        catch (InterruptedException e) {
            // keep the interrupt visible to callers further up the stack
            Thread.currentThread().interrupt();
            throw createWriteException(packet, e);
        }
    }

    /**
     * Wraps a recovery failure in a JMSException.
     */
    protected JMSException createRecoveryFailedException(Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to recover from Howl transaction log. Reason: " + e, e);
    }

    /**
     * Wraps a log write failure for the given packet in a JMSException.
     */
    protected JMSException createWriteException(Packet packet, Exception e) {
        return JMSExceptionHelper.newJMSException("Failed to write to Howl transaction log for: " + packet + ". Reason: " + e, e);
    }
}