/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.bdb;
import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryDatabase;
import com.sleepycat.je.SecondaryKeyCreator;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.DefaultWireFormat;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PersistenceAdapter;
import org.codehaus.activemq.store.PreparedTransactionStore;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;
import javax.jms.JMSException;
import java.io.File;
import java.util.Map;
/**
 * A {@link PersistenceAdapter} implementation using
 * <a href="http://www.sleepycat.com">Berkeley DB Java Edition</a>.
 * <p>
 * Each destination gets its own primary database plus a secondary index
 * database; transaction nesting is emulated through the transaction
 * bookkeeping methods of {@link BDbHelper}.
 *
 * @version $Revision: 1.6 $
 */
public class BDbPersistenceAdapter extends PersistenceAdapterSupport {
    private static final Log log = LogFactory.getLog(BDbPersistenceAdapter.class);

    private Environment environment;
    private WireFormat wireFormat;
    private DatabaseConfig config;
    private TransactionConfig transactionConfig;
    private File directory = new File("ActiveMQ");

    /**
     * Factory method to create an instance using the defaults
     *
     * @param directory the directory in which to store the persistent files
     * @return a new adapter which will lazily create its environment in the
     *         given directory when {@link #start()} is called
     * @throws JMSException declared for callers; the current implementation
     *                      only invokes the constructor
     */
    public static BDbPersistenceAdapter newInstance(File directory) throws JMSException {
        return new BDbPersistenceAdapter(directory);
    }

    /**
     * Creates an adapter with no environment (it is created on {@link #start()}),
     * a {@link DefaultWireFormat} and the default storage directory.
     */
    public BDbPersistenceAdapter() {
        this(null, new DefaultWireFormat());
    }

    /**
     * Creates an adapter with default settings which stores its files in the
     * given directory.
     *
     * @param directory the directory in which to store the persistent files
     */
    public BDbPersistenceAdapter(File directory) {
        this();
        this.directory = directory;
    }

    /**
     * Creates an adapter on top of the given environment using the database
     * configuration from {@link BDbHelper#createDatabaseConfig()} and a default
     * {@link TransactionConfig}.
     *
     * @param environment the environment to use, or null to create one on {@link #start()}
     * @param wireFormat  the wire format; a copy of it is handed to each created store
     */
    public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat) {
        this(environment, wireFormat, BDbHelper.createDatabaseConfig(), new TransactionConfig());
    }

    /**
     * Creates a fully configured adapter.
     *
     * @param environment       the environment to use, or null to create one on {@link #start()}
     * @param wireFormat        the wire format; a copy of it is handed to each created store
     * @param config            the configuration used when opening primary databases
     * @param transactionConfig the configuration used when beginning transactions
     */
    public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat, DatabaseConfig config, TransactionConfig transactionConfig) {
        this.environment = environment;
        this.wireFormat = wireFormat;
        this.config = config;
        this.transactionConfig = transactionConfig;
    }

    /**
     * Not yet implemented.
     *
     * @return currently always null
     */
    public Map getInitialDestinations() {
        return null; /** TODO */
    }

    /**
     * Opens the primary and index databases for a queue and wraps them in a
     * message store. If any step fails, the handles opened so far are closed
     * again so that they are not leaked.
     *
     * @param destinationName the name of the queue
     * @return the message store for the queue
     * @throws JMSException if one of the databases could not be opened
     */
    public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
        Database database = null;
        SecondaryDatabase secondaryDatabase = null;
        boolean created = false;
        try {
            database = createDatabase("Queue_" + destinationName);
            SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
            SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
            secondaryDatabase = createSecondaryDatabase("Queue_Index_" + destinationName, database, secondaryConfig);
            sequenceNumberCreator.initialise(secondaryDatabase);
            MessageStore answer = new BDbMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy());
            created = true;
            return answer;
        }
        catch (DatabaseException e) {
            throw JMSExceptionHelper.newJMSException("Could not create Queue MessageContainer for destination: "
                    + destinationName + ". Reason: " + e, e);
        }
        finally {
            if (!created) {
                // close the secondary before the primary it is associated with
                closeAfterFailure(secondaryDatabase);
                closeAfterFailure(database);
            }
        }
    }

    /**
     * Opens the primary, index and subscription ("ConsumeAck_") databases for
     * a topic and wraps them in a message store. If any step fails, the
     * handles opened so far are closed again so that they are not leaked.
     *
     * @param destinationName the name of the topic
     * @return the message store for the topic
     * @throws JMSException if one of the databases could not be opened
     */
    public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
        Database database = null;
        SecondaryDatabase secondaryDatabase = null;
        Database subscriptionDatabase = null;
        boolean created = false;
        try {
            database = createDatabase("Topic_" + destinationName);
            SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
            SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
            secondaryDatabase = createSecondaryDatabase("Topic_Index_" + destinationName, database, secondaryConfig);
            sequenceNumberCreator.initialise(secondaryDatabase);
            subscriptionDatabase = createDatabase("ConsumeAck_" + destinationName);
            TopicMessageStore answer = new BDbTopicMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy(), subscriptionDatabase);
            created = true;
            return answer;
        }
        catch (DatabaseException e) {
            throw JMSExceptionHelper.newJMSException("Could not create Topic MessageContainer for destination: "
                    + destinationName + ". Reason: " + e, e);
        }
        finally {
            if (!created) {
                closeAfterFailure(subscriptionDatabase);
                // close the secondary before the primary it is associated with
                closeAfterFailure(secondaryDatabase);
                closeAfterFailure(database);
            }
        }
    }

    /**
     * Opens the "XaPrepareTxnDb" database and wraps it in a prepared
     * transaction store.
     *
     * @return the store for prepared XA transactions
     * @throws JMSException if the database could not be opened
     */
    public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
        try {
            return new BDbPreparedTransactionStore(createDatabase("XaPrepareTxnDb"));
        }
        catch (DatabaseException e) {
            throw JMSExceptionHelper.newJMSException("Could not create XA Prepare Transaction Database. Reason: " + e, e);
        }
    }

    /**
     * Begins a transaction. Only the outermost call (when
     * {@link BDbHelper#getTransactionCount()} is zero) begins a real Berkeley DB
     * transaction; nested calls push the current transaction again so that
     * each begin is balanced by exactly one commit or rollback.
     *
     * @throws JMSException if the Berkeley DB transaction could not be started
     */
    public void beginTransaction() throws JMSException {
        try {
            // TODO temporary hack until BDB supports nested transactions
            if (BDbHelper.getTransactionCount() == 0) {
                Transaction transaction = environment.beginTransaction(BDbHelper.getTransaction(), transactionConfig);
                BDbHelper.pushTransaction(transaction);
            }
            else {
                Transaction transaction = BDbHelper.getTransaction();
                BDbHelper.pushTransaction(transaction);
            }
        }
        catch (DatabaseException e) {
            throw JMSExceptionHelper.newJMSException("Failed to begin transaction: " + e, e);
        }
    }

    /**
     * Commits the current transaction. The real commit only happens for the
     * outermost level (transaction count of one); for any other count the
     * entry is just popped. The outermost entry is popped even when the commit
     * fails. If the count is one but no transaction is registered, a warning
     * is logged and nothing is popped.
     *
     * @throws JMSException if the Berkeley DB commit fails
     */
    public void commitTransaction() throws JMSException {
        // TODO temporary hack until BDB supports nested transactions
        if (BDbHelper.getTransactionCount() == 1) {
            Transaction transaction = BDbHelper.getTransaction();
            if (transaction == null) {
                log.warn("Attempt to commit transaction when non in progress");
            }
            else {
                try {
                    transaction.commit();
                }
                catch (DatabaseException e) {
                    throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + transaction + ": " + e, e);
                }
                finally {
                    BDbHelper.popTransaction();
                }
            }
        }
        else {
            BDbHelper.popTransaction();
        }
    }

    /**
     * Rolls back the current transaction, if there is one. The real abort
     * only happens for the outermost level (transaction count of one); nested
     * levels are just popped. A failure to abort is logged, not thrown.
     */
    public void rollbackTransaction() {
        Transaction transaction = BDbHelper.getTransaction();
        if (transaction != null) {
            if (BDbHelper.getTransactionCount() == 1) {
                try {
                    transaction.abort();
                }
                catch (DatabaseException e) {
                    log.warn("Cannot rollback transaction due to: " + e, e);
                }
                finally {
                    BDbHelper.popTransaction();
                }
            }
            else {
                BDbHelper.popTransaction();
            }
        }
    }

    /**
     * Creates the Berkeley DB environment in the configured directory unless
     * an environment has already been supplied or created. Synchronized, like
     * {@link #stop()}, because both methods read and write the environment.
     *
     * @throws JMSException if no directory is configured, the directory cannot
     *                      be created, or the environment cannot be opened
     */
    public synchronized void start() throws JMSException {
        if (environment == null) {
            if (directory == null) {
                throw new JMSException("No directory configured for the Berkeley DB persistent store");
            }
            directory.mkdirs();
            // mkdirs() also returns false when the directory already exists,
            // so check the outcome rather than the return value
            if (!directory.isDirectory()) {
                throw new JMSException("Could not create directory for the Berkeley DB persistent store: "
                        + directory.getAbsolutePath());
            }
            log.info("Creating Berkeley DB based message store in directory: " + directory.getAbsolutePath());
            try {
                environment = BDbHelper.createEnvironment(directory);
            }
            catch (DatabaseException e) {
                throw JMSExceptionHelper.newJMSException("Failed to open Berkeley DB persistent store at directory: "
                        + directory + ". Reason: " + e, e);
            }
        }
    }

    /**
     * Closes the environment, if there is one. The reference is cleared even
     * when closing fails, so a later {@link #start()} creates a new environment.
     *
     * @throws JMSException if the environment could not be closed
     */
    public synchronized void stop() throws JMSException {
        if (environment != null) {
            try {
                environment.close();
            }
            catch (DatabaseException e) {
                throw JMSExceptionHelper.newJMSException("Failed to close environment. Reason: " + e, e);
            }
            finally {
                environment = null;
            }
        }
    }

    // Properties
    //-------------------------------------------------------------------------

    /**
     * @return the directory in which the persistent files are stored
     */
    public File getDirectory() {
        return directory;
    }

    /**
     * @param directory the directory in which to store the persistent files;
     *                  only used when {@link #start()} has to create the environment
     */
    public void setDirectory(File directory) {
        this.directory = directory;
    }

    /**
     * @return the wire format whose copies are handed to the created stores
     */
    public WireFormat getWireFormat() {
        return wireFormat;
    }

    /**
     * @param wireFormat the wire format whose copies are handed to the created stores
     */
    public void setWireFormat(WireFormat wireFormat) {
        this.wireFormat = wireFormat;
    }

    /**
     * @return the configuration used when beginning transactions
     */
    public TransactionConfig getTransactionConfig() {
        return transactionConfig;
    }

    /**
     * @param transactionConfig the configuration used when beginning transactions
     */
    public void setTransactionConfig(TransactionConfig transactionConfig) {
        this.transactionConfig = transactionConfig;
    }

    /**
     * @return the current environment, or null if none has been set or created
     */
    public Environment getEnvironment() {
        return environment;
    }

    /**
     * @param environment the environment to use instead of creating one on {@link #start()}
     */
    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    /**
     * @return the configuration used when opening primary databases
     */
    public DatabaseConfig getConfig() {
        return config;
    }

    /**
     * @param config the configuration used when opening primary databases
     */
    public void setConfig(DatabaseConfig config) {
        this.config = config;
    }

    // Implementation methods
    //-------------------------------------------------------------------------

    /**
     * Opens the named database in the environment using the configured
     * {@link DatabaseConfig}. No transaction is passed to the open call.
     *
     * @param name the name of the database
     * @return the opened database handle
     * @throws DatabaseException if the database could not be opened
     */
    protected Database createDatabase(String name) throws DatabaseException {
        if (log.isTraceEnabled()) {
            log.trace("Opening database: " + name);
        }
        return environment.openDatabase(null, name, config);
    }

    /**
     * Opens the named secondary database on top of the given primary database.
     * No transaction is passed to the open call.
     *
     * @param name            the name of the secondary database
     * @param database        the primary database being indexed
     * @param secondaryConfig the configuration of the secondary database
     * @return the opened secondary database handle
     * @throws DatabaseException if the secondary database could not be opened
     */
    protected SecondaryDatabase createSecondaryDatabase(String name, Database database, SecondaryConfig secondaryConfig) throws DatabaseException {
        if (log.isTraceEnabled()) {
            log.trace("Opening secondary database: " + name);
        }
        return environment.openSecondaryDatabase(null, name, database, secondaryConfig);
    }

    /**
     * Closes the given database, accumulating failures so that several
     * databases can be closed in a row and the first failure reported at the
     * end.
     *
     * @param db             the database to close; null is ignored
     * @param firstException the failure recorded by an earlier close, or null
     * @return <code>firstException</code> if it was not null, otherwise the
     *         failure of this close, or null if it succeeded. A failure that
     *         cannot be returned because an earlier one takes precedence is
     *         logged as a warning rather than silently dropped.
     */
    public static JMSException closeDatabase(Database db, JMSException firstException) {
        if (db != null) {
            if (log.isTraceEnabled()) {
                try {
                    log.trace("Closing database: " + db.getDatabaseName());
                }
                catch (DatabaseException e) {
                    log.trace("Closing database: " + db + " but could not get the name: " + e);
                }
            }
            try {
                db.close();
            }
            catch (DatabaseException e) {
                if (firstException == null) {
                    firstException = JMSExceptionHelper.newJMSException("Failed to close database. Reason: " + e, e);
                }
                else {
                    // the earlier failure is the one reported to the caller
                    log.warn("Failed to close database: " + db + ". Reason: " + e, e);
                }
            }
        }
        return firstException;
    }

    /**
     * Creates the configuration for a secondary (index) database: it may be
     * created and populated on open, and it is transactional.
     *
     * @param keyGenerator the creator of the secondary keys
     * @return the new secondary database configuration
     */
    protected SecondaryConfig createSecondaryConfig(SecondaryKeyCreator keyGenerator) {
        SecondaryConfig answer = new SecondaryConfig();
        answer.setKeyCreator(keyGenerator);
        answer.setAllowCreate(true);
        answer.setAllowPopulate(true);
        answer.setTransactional(true);
        return answer;
    }

    /**
     * Closes a database handle that was opened by a store creation which
     * subsequently failed. A close failure is only logged so that it does not
     * mask the exception which caused the creation to fail.
     */
    private static void closeAfterFailure(Database db) {
        JMSException closeFailure = closeDatabase(db, null);
        if (closeFailure != null) {
            log.warn("Could not close database after a failed store creation: " + closeFailure, closeFailure);
        }
    }
}