/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.howl;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.DefaultWireFormat;
import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PersistenceAdapter;
import org.codehaus.activemq.store.PreparedTransactionStore;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.store.jdbm.JdbmPersistenceAdapter;
import org.codehaus.activemq.util.JMSExceptionHelper;
import org.objectweb.howl.log.Configuration;
import org.objectweb.howl.log.LogConfigurationException;
import org.objectweb.howl.log.Logger;
import javax.jms.JMSException;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
/**
* An implementation of {@link PersistenceAdapter} designed for
* optimal use with <a href="http://howl.objectweb.org/">Howl</a>
* as the transaction log and then checkpointing asynchronously
* on a timeout with some other persistent storage.
*
* @version $Revision: 1.6 $
*/
public class HowlPersistenceAdapter extends PersistenceAdapterSupport {
private static final Log log = LogFactory.getLog(HowlPersistenceAdapter.class);
private PersistenceAdapter longTermPersistence;
private Configuration configuration;
private int maximumTotalCachedMessages = 10000;
private int maximumCachedMessagesPerStore = 100;
private int cachedMessageCount;
private File directory;
private Logger transactionLog;
/**
* Factory method to create an instance using the defaults
*
* @param directory the directory in which to store the persistent files
* @return
* @throws JMSException
*/
public static HowlPersistenceAdapter newInstance(File directory) throws JMSException {
return new HowlPersistenceAdapter(directory, JdbmPersistenceAdapter.newInstance(directory));
}
public HowlPersistenceAdapter() {
}
public HowlPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) {
this.directory = directory;
this.longTermPersistence = longTermPersistence;
}
public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
return new HowlMessageStore(this, checkpointStore, transactionLog, new DefaultWireFormat());
}
public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
/** TODO not yet implemented for topics */
return longTermPersistence.createTopicMessageStore(destinationName);
}
public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
// delegate to long term store
return longTermPersistence.createPreparedTransactionStore();
}
public void beginTransaction() throws JMSException {
}
public void commitTransaction() throws JMSException {
}
public void rollbackTransaction() {
}
public void start() throws JMSException {
if (transactionLog == null) {
if (directory != null) {
directory.mkdirs();
}
try {
transactionLog = createTransactionLog();
}
catch (Exception e) {
throw JMSExceptionHelper.newJMSException("Failed to create Howl based message store due to: " + e, e);
}
}
try {
log.info("Using Howl transaction log in directory: " + getLogFileDir());
transactionLog.open();
}
catch (Exception e) {
throw JMSExceptionHelper.newJMSException("Failed to open Howl transaction log: " + e, e);
}
longTermPersistence.start();
}
public void stop() throws JMSException {
try {
transactionLog.close();
}
catch (Exception e) {
throw JMSExceptionHelper.newJMSException("Failed to close Howl transaction log due to: " + e, e);
}
}
/**
* Return true if a store is allowed to cache a message.
* Called by a store when its about to store a message in its cache.
*
* @param messageStore
* @return true if the cache is allowed to cache the mesage
*/
public synchronized boolean hasCacheCapacity(HowlMessageStore messageStore) {
if (cachedMessageCount < maximumTotalCachedMessages) {
cachedMessageCount++;
return true;
}
return false;
}
public synchronized void onMessageRemove(HowlMessageStore messageStore) {
cachedMessageCount--;
}
// Properties
//-------------------------------------------------------------------------
public PersistenceAdapter getLongTermPersistence() {
return longTermPersistence;
}
public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
this.longTermPersistence = longTermPersistence;
}
public int getMaximumCachedMessagesPerStore() {
return maximumCachedMessagesPerStore;
}
public void setMaximumCachedMessagesPerStore(int maximumCachedMessagesPerStore) {
this.maximumCachedMessagesPerStore = maximumCachedMessagesPerStore;
}
public int getMaximumTotalCachedMessages() {
return maximumTotalCachedMessages;
}
public void setMaximumTotalCachedMessages(int maximumTotalCachedMessages) {
this.maximumTotalCachedMessages = maximumTotalCachedMessages;
}
public File getDirectory() {
return directory;
}
public void setDirectory(File directory) {
this.directory = directory;
}
public Configuration getConfiguration() throws LogConfigurationException, IOException {
if (configuration == null) {
configuration = createConfiguration();
}
return configuration;
}
public void setConfiguration(Configuration configuration) {
this.configuration = configuration;
}
public Logger getTransactionLog() {
return transactionLog;
}
public void setTransactionLog(Logger transactionLog) {
this.transactionLog = transactionLog;
}
// Delegate Howl configuration properties
//-------------------------------------------------------------------------
public String getBufferClassName() throws LogConfigurationException, IOException {
return getConfiguration().getBufferClassName();
}
public int getBufferSize() throws LogConfigurationException, IOException {
return getConfiguration().getBufferSize();
}
public int getFlushSleepTime() throws LogConfigurationException, IOException {
return getConfiguration().getFlushSleepTime();
}
public String getLogFileDir() throws LogConfigurationException, IOException {
return getConfiguration().getLogFileDir();
}
public String getLogFileExt() throws LogConfigurationException, IOException {
return getConfiguration().getLogFileExt();
}
public String getLogFileName() throws LogConfigurationException, IOException {
return getConfiguration().getLogFileName();
}
public int getMaxBlocksPerFile() throws LogConfigurationException, IOException {
return getConfiguration().getMaxBlocksPerFile();
}
public int getMaxBuffers() throws LogConfigurationException, IOException {
return getConfiguration().getMaxBuffers();
}
public int getMaxLogFiles() throws LogConfigurationException, IOException {
return getConfiguration().getMaxLogFiles();
}
public int getMinBuffers() throws LogConfigurationException, IOException {
return getConfiguration().getMinBuffers();
}
public int getThreadsWaitingForceThreshold() throws LogConfigurationException, IOException {
return getConfiguration().getThreadsWaitingForceThreshold();
}
public boolean isChecksumEnabled() throws LogConfigurationException, IOException {
return getConfiguration().isChecksumEnabled();
}
public void setBufferClassName(String s) throws LogConfigurationException, IOException {
getConfiguration().setBufferClassName(s);
}
public void setBufferSize(int i) throws LogConfigurationException, IOException {
getConfiguration().setBufferSize(i);
}
public void setChecksumEnabled(boolean b) throws LogConfigurationException, IOException {
getConfiguration().setChecksumEnabled(b);
}
public void setFlushSleepTime(int i) throws LogConfigurationException, IOException {
getConfiguration().setFlushSleepTime(i);
}
public void setLogFileDir(String s) throws LogConfigurationException, IOException {
getConfiguration().setLogFileDir(s);
}
public void setLogFileExt(String s) throws LogConfigurationException, IOException {
getConfiguration().setLogFileExt(s);
}
public void setLogFileName(String s) throws LogConfigurationException, IOException {
getConfiguration().setLogFileName(s);
}
public void setMaxBlocksPerFile(int i) throws LogConfigurationException, IOException {
getConfiguration().setMaxBlocksPerFile(i);
}
public void setMaxBuffers(int i) throws LogConfigurationException, IOException {
getConfiguration().setMaxBuffers(i);
}
public void setMaxLogFiles(int i) throws LogConfigurationException, IOException {
getConfiguration().setMaxLogFiles(i);
}
public void setMinBuffers(int i) throws LogConfigurationException, IOException {
getConfiguration().setMinBuffers(i);
}
public void setThreadsWaitingForceThreshold(int i) throws LogConfigurationException, IOException {
getConfiguration().setThreadsWaitingForceThreshold(i);
}
// Implementation methods
//-------------------------------------------------------------------------
protected Logger createTransactionLog() throws IOException, LogConfigurationException {
return new Logger(getConfiguration());
}
protected Configuration createConfiguration() throws IOException, LogConfigurationException {
String[] names = {"org/codehaus/activemq/howl.properties", "org/codehaus/activemq/defaultHowl.properties"};
Configuration answer = null;
for (int i = 0; i < names.length; i++) {
InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream(names[i]);
if (in == null) {
in = getClass().getClassLoader().getResourceAsStream(names[i]);
}
if (in != null) {
Properties properties = new Properties();
properties.load(in);
answer = new Configuration(properties);
}
}
if (answer == null) {
log.warn("Could not find file: " + names[0] + " or: " + names[1] + " on the classpath to initialise Howl");
answer = new Configuration();
}
if (directory != null) {
answer.setLogFileDir(directory.getAbsolutePath());
}
return answer;
}
}