/**
*
* Copyright 2004 Hiram Chirino
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.cache;
import javax.jms.JMSException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.store.MessageStore;
/**
* A MessageStore that uses an in-memory cache to speed up getMessage() method calls.
*
* @version $Revision: 1.2 $
*/
public class CacheMessageStore implements MessageStore, CacheMessageStoreAware {

    private static final Log log = LogFactory.getLog(CacheMessageStore.class);

    /** Adapter handed to the constructor; it is retained but not consulted by this class. */
    private final CachePersistenceAdapter persistenceAdapter;

    /** The store every operation is delegated to. */
    private final MessageStore longTermStore;

    /** Cache consulted before the long term store in {@link #getMessage(MessageIdentity)}. */
    private final MessageCache cache;

    /**
     * Creates a caching front for the given long term store.
     *
     * @param adapter the adapter associated with this store
     * @param longTermStore the store that all operations are delegated to
     * @param cache the cache used to hold messages that were added or looked up
     */
    public CacheMessageStore(CachePersistenceAdapter adapter, MessageStore longTermStore, MessageCache cache) {
        this.persistenceAdapter = adapter;
        this.longTermStore = longTermStore;
        this.cache = cache;
        // Make any downstream CacheMessageStoreAware objects aware of us.
        // The private helper is used instead of the public setter so that a
        // subclass override cannot run against a partially constructed object.
        propagateCacheMessageStore(this);
    }

    /**
     * Adds the message to the long term store and then caches it under the
     * message ID of the identity returned by the long term store.
     *
     * @param message the message to store
     * @return the identity returned by the long term store
     * @throws JMSException if the long term store fails to add the message
     */
    public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {
        MessageIdentity messageIdentity = longTermStore.addMessage(message);
        cache.put(messageIdentity.getMessageID(), message);
        return messageIdentity;
    }

    /**
     * Removes the message from the long term store and then removes it from
     * the cache. If the long term store throws, the cache entry is left alone.
     *
     * @param identity the identity of the message to remove
     * @param ack the acknowledgement passed through to the long term store
     * @throws JMSException if the long term store fails to remove the message
     */
    public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {
        longTermStore.removeMessage(identity, ack);
        cache.remove(identity.getMessageID());
    }

    /**
     * Returns the message from the cache, or goes to the long term store if
     * it is not in there. A message found in the long term store is added to
     * the cache; a <code>null</code> result from the long term store is
     * returned as-is and is never cached.
     *
     * @param identity the identity of the message to look up
     * @return the cached or stored message, or whatever the long term store
     *         returns (possibly <code>null</code>) when it is not cached
     * @throws JMSException if the long term store lookup fails
     */
    public ActiveMQMessage getMessage(MessageIdentity identity) throws JMSException {
        ActiveMQMessage answer = cache.get(identity.getMessageID());
        if (answer != null) {
            return answer;
        }
        answer = longTermStore.getMessage(identity);
        // Only cache real hits: a null entry would be indistinguishable from
        // a cache miss and some cache implementations may reject null values.
        if (answer != null) {
            cache.put(identity.getMessageID(), answer);
        }
        return answer;
    }

    /**
     * Delegates recovery to the long term store. The cache is not touched by
     * this method. The method is synchronized on this instance.
     *
     * @param container the container passed through to the long term store
     * @throws JMSException if the long term store fails to recover
     */
    public synchronized void recover(final QueueMessageContainer container)
            throws JMSException {
        longTermStore.recover(container);
    }

    /**
     * Starts the long term store.
     *
     * @throws JMSException if the long term store fails to start
     */
    public void start() throws JMSException {
        longTermStore.start();
    }

    /**
     * Stops the long term store.
     *
     * @throws JMSException if the long term store fails to stop
     */
    public void stop() throws JMSException {
        longTermStore.stop();
    }

    /**
     * @return Returns the longTermStore.
     */
    public MessageStore getLongTermStore() {
        return longTermStore;
    }

    /**
     * Passes the given store on to the long term store when that store is
     * itself {@link CacheMessageStoreAware}; otherwise does nothing.
     *
     * @see org.codehaus.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.codehaus.activemq.store.cache.CacheMessageStore)
     */
    public void setCacheMessageStore(CacheMessageStore store) {
        propagateCacheMessageStore(store);
    }

    /**
     * Forwards the given store to the long term store if it implements
     * {@link CacheMessageStoreAware}.
     */
    private void propagateCacheMessageStore(CacheMessageStore store) {
        // Make any downstream CacheMessageStoreAware objects aware of us.
        // This cache implementation could be cached by another cache.
        if (longTermStore instanceof CacheMessageStoreAware) {
            ((CacheMessageStoreAware) longTermStore).setCacheMessageStore(store);
        }
    }
}