Package org.codehaus.activemq.store.cache

Source Code of org.codehaus.activemq.store.cache.CacheMessageStore

/**
*
* Copyright 2004 Hiram Chirino
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.cache;

import javax.jms.JMSException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.ActiveMQMessage;
import org.codehaus.activemq.message.MessageAck;
import org.codehaus.activemq.service.MessageIdentity;
import org.codehaus.activemq.service.QueueMessageContainer;
import org.codehaus.activemq.store.MessageStore;

/**
* A MessageStore that uses an in memory cache to speed up getMessage() method calls.
*
* @version $Revision: 1.2 $
*/
public class CacheMessageStore implements MessageStore, CacheMessageStoreAware {

  private static final Log log = LogFactory.getLog(CacheMessageStore.class);
 
  private final CachePersistenceAdapter peristenceAdapter;
  private final MessageStore longTermStore; 
  private final MessageCache cache;

  public CacheMessageStore(CachePersistenceAdapter adapter, MessageStore longTermStore, MessageCache cache) {
    this.peristenceAdapter = adapter;
    this.longTermStore = longTermStore;
    this.cache = cache;
   
    // Make any downstream CacheMessageStoreAware objects aware of us.
    setCacheMessageStore(this);
  }

  /**
   * Add the meessage to the long term store and cache it.
   */
  public MessageIdentity addMessage(ActiveMQMessage message) throws JMSException {   
    MessageIdentity messageIdentity = longTermStore.addMessage(message);
    cache.put(messageIdentity.getMessageID(), message);   
    return messageIdentity;
  }
 
  /**
   * Remove the meessage to the long term store and remove it from the cache.
   */
  public void removeMessage(MessageIdentity identity, MessageAck ack) throws JMSException {   
    longTermStore.removeMessage( identity, ack);
    cache.remove(identity.getMessageID());   
  }

  /**
   * Return the message from the cache or go to the longTermStore if it is not
   * in there.
   */
  public ActiveMQMessage getMessage(MessageIdentity identitythrows JMSException {
    ActiveMQMessage answer=null;
    answer = cache.get(identity.getMessageID());
    if( answer!=null )
      return answer;
   
    answer = longTermStore.getMessage(identity);
    cache.put(identity.getMessageID(), answer);
    return answer;
  }

  /**
   * Replays the checkpointStore first as those messages are the oldest ones,
   * then messages are replayed from the transaction log and then the cache is
   * updated.
   *
   * @param container
   * @throws JMSException
   */
  public synchronized void recover(final QueueMessageContainer container)
      throws JMSException {
    longTermStore.recover(container);
  }

  public void start() throws JMSException {
    longTermStore.start();
  }

  public void stop() throws JMSException {
    longTermStore.stop();
  }

  /**
   * @return Returns the longTermStore.
   */
  public MessageStore getLongTermStore() {
    return longTermStore;
  }

  /**
   * @see org.codehaus.activemq.store.cache.CacheMessageStoreAware#setCacheMessageStore(org.codehaus.activemq.store.cache.CacheMessageStore)
   */
  public void setCacheMessageStore(CacheMessageStore store) {
    // Make any downstream CacheMessageStoreAware objects aware of us.
    // This cache implementation could be cached by another cache.
    if( longTermStore instanceof CacheMessageStoreAware ) {
      ((CacheMessageStoreAware)longTermStore).setCacheMessageStore(store);
    }
  }
 
}
TOP

Related Classes of org.codehaus.activemq.store.cache.CacheMessageStore

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.