Package org.codehaus.activemq.store.bdb

Source Code of org.codehaus.activemq.store.bdb.BDbPersistenceAdapter

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.bdb;

import com.sleepycat.je.Database;
import com.sleepycat.je.DatabaseConfig;
import com.sleepycat.je.DatabaseException;
import com.sleepycat.je.Environment;
import com.sleepycat.je.SecondaryConfig;
import com.sleepycat.je.SecondaryDatabase;
import com.sleepycat.je.SecondaryKeyCreator;
import com.sleepycat.je.Transaction;
import com.sleepycat.je.TransactionConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.DefaultWireFormat;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PersistenceAdapter;
import org.codehaus.activemq.store.PreparedTransactionStore;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.util.JMSExceptionHelper;

import javax.jms.JMSException;
import java.io.File;
import java.util.Map;

/**
* A {@link PersistenceAdapter} implementation using
* <a href="http://www.sleepycat.com">Berkeley DB Java Edition</a>
*
* @version $Revision: 1.6 $
*/
public class BDbPersistenceAdapter extends PersistenceAdapterSupport {
    private static final Log log = LogFactory.getLog(BDbPersistenceAdapter.class);

    private Environment environment;
    private WireFormat wireFormat;
    private DatabaseConfig config;
    private TransactionConfig transactionConfig;
    private File directory = new File("ActiveMQ");


    /**
     * Factory method to create an instance using the defaults
     *
     * @param directory the directory in which to store the persistent files
     * @return
     * @throws JMSException
     */
    public static BDbPersistenceAdapter newInstance(File directory) throws JMSException {
        return new BDbPersistenceAdapter(directory);
    }


    public BDbPersistenceAdapter() {
        this(null, new DefaultWireFormat());
    }

    public BDbPersistenceAdapter(File directory) {
        this();
        this.directory = directory;
    }

    public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat) {
        this(environment, wireFormat, BDbHelper.createDatabaseConfig(), new TransactionConfig());
    }

    public BDbPersistenceAdapter(Environment environment, WireFormat wireFormat, DatabaseConfig config, TransactionConfig transactionConfig) {
        this.environment = environment;
        this.wireFormat = wireFormat;
        this.config = config;
        this.transactionConfig = transactionConfig;
    }

    public Map getInitialDestinations() {
        return null/** TODO */
    }

    public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
        try {
            Database database = createDatabase("Queue_" + destinationName);
            SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
            SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
            SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Queue_Index_" + destinationName, database, secondaryConfig);
            sequenceNumberCreator.initialise(secondaryDatabase);
            return new BDbMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy());
        }
        catch (DatabaseException e) {
            throw JMSExceptionHelper.newJMSException("Could not create Queue MessageContainer for destination: "
                    + destinationName + ". Reason: " + e, e);
        }
    }

    public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
        try {
            Database database = createDatabase("Topic_" + destinationName);
            SequenceNumberCreator sequenceNumberCreator = new SequenceNumberCreator();
            SecondaryConfig secondaryConfig = createSecondaryConfig(sequenceNumberCreator);
            SecondaryDatabase secondaryDatabase = createSecondaryDatabase("Topic_Index_" + destinationName, database, secondaryConfig);
            sequenceNumberCreator.initialise(secondaryDatabase);
            Database subscriptionDatabase = createDatabase("ConsumeAck_" + destinationName);
            return new BDbTopicMessageStore(database, secondaryDatabase, secondaryConfig, sequenceNumberCreator, wireFormat.copy(), subscriptionDatabase);
        }
        catch (DatabaseException e) {
            throw JMSExceptionHelper.newJMSException("Could not create Topic MessageContainer for destination: "
                    + destinationName + ". Reason: " + e, e);
        }
    }

    public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
        try {
            return new BDbPreparedTransactionStore(createDatabase("XaPrepareTxnDb"));
        }
        catch (DatabaseException e) {
            throw JMSExceptionHelper.newJMSException("Could not create XA Prepare Transaction Database. Reason: " + e, e);
        }
    }

    public void beginTransaction() throws JMSException {
        try {
            // TODO temporary hack until BDB supports nested transactions
            if (BDbHelper.getTransactionCount() == 0) {
                Transaction transaction = environment.beginTransaction(BDbHelper.getTransaction(), transactionConfig);
                BDbHelper.pushTransaction(transaction);
            }
            else {
                Transaction transaction = BDbHelper.getTransaction();
                BDbHelper.pushTransaction(transaction);
            }
        }
        catch (DatabaseException e) {
            throw JMSExceptionHelper.newJMSException("Failed to begin transaction: " + e, e);
        }
    }

    public void commitTransaction() throws JMSException {
        // TODO temporary hack until BDB supports nested transactions
        if (BDbHelper.getTransactionCount() == 1) {
            Transaction transaction = BDbHelper.getTransaction();
            if (transaction == null) {
                log.warn("Attempt to commit transaction when non in progress");
            }
            else {
                try {
                    transaction.commit();
                }
                catch (DatabaseException e) {
                    throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + transaction + ": " + e, e);
                }
                finally {
                    BDbHelper.popTransaction();
                }
            }
        }
        else {
            BDbHelper.popTransaction();
        }
    }

    public void rollbackTransaction() {
        Transaction transaction = BDbHelper.getTransaction();
        if (transaction != null) {
            if (BDbHelper.getTransactionCount() == 1) {
                try {
                    transaction.abort();
                }
                catch (DatabaseException e) {
                    log.warn("Cannot rollback transaction due to: " + e, e);
                }
                finally {
                    BDbHelper.popTransaction();
                }
            }
            else {
                BDbHelper.popTransaction();
            }
        }
    }


    public void start() throws JMSException {
        if (environment == null) {
            directory.mkdirs();

            log.info("Creating Berkeley DB based message store in directory: " + directory.getAbsolutePath());

            try {
                environment = BDbHelper.createEnvironment(directory);
            }
            catch (DatabaseException e) {
                throw JMSExceptionHelper.newJMSException("Failed to open Berkeley DB persistent store at directory: "
                        + directory + ". Reason: " + e, e);
            }
        }
    }

    public synchronized void stop() throws JMSException {
        if (environment != null) {
            try {
                environment.close();
            }
            catch (DatabaseException e) {
                throw JMSExceptionHelper.newJMSException("Failed to close environment. Reason: " + e, e);
            }
            finally {
                environment = null;
            }
        }
    }

    // Properties
    //-------------------------------------------------------------------------
    public File getDirectory() {
        return directory;
    }

    public void setDirectory(File directory) {
        this.directory = directory;
    }

    public WireFormat getWireFormat() {
        return wireFormat;
    }

    public void setWireFormat(WireFormat wireFormat) {
        this.wireFormat = wireFormat;
    }

    public TransactionConfig getTransactionConfig() {
        return transactionConfig;
    }

    public void setTransactionConfig(TransactionConfig transactionConfig) {
        this.transactionConfig = transactionConfig;
    }

    public Environment getEnvironment() {
        return environment;
    }

    public void setEnvironment(Environment environment) {
        this.environment = environment;
    }

    public DatabaseConfig getConfig() {
        return config;
    }

    public void setConfig(DatabaseConfig config) {
        this.config = config;
    }

    // Implementation methods
    //-------------------------------------------------------------------------
    protected Database createDatabase(String name) throws DatabaseException {
        //System.out.println("#####   Opening database: " + name);

        if (log.isTraceEnabled()) {
            log.trace("Opening database: " + name);
        }
        return environment.openDatabase(null, name, config);
    }

    protected SecondaryDatabase createSecondaryDatabase(String name, Database database, SecondaryConfig secondaryConfig) throws DatabaseException {
        //System.out.println("#####   Opening secondary database: " + name);

        if (log.isTraceEnabled()) {
            log.trace("Opening secondary database: " + name);
        }
        return environment.openSecondaryDatabase(null, name, database, secondaryConfig);
    }

    public static JMSException closeDatabase(Database db, JMSException firstException) {
        if (db != null) {

            if (log.isTraceEnabled()) {
                try {
                    log.trace("Closing database: " + db.getDatabaseName());
                }
                catch (DatabaseException e) {
                    log.trace("Closing database: " + db + " but could not get the name: " + e);
                }
            }
            try {
                //System.out.println("#####  Closing database: " + db.getDatabaseName() + " " + db);
                db.close();
            }
            catch (DatabaseException e) {
                if (firstException == null) {
                    firstException = JMSExceptionHelper.newJMSException("Failed to close database. Reason: " + e, e);
                }
            }
        }
        return firstException;
    }

    protected SecondaryConfig createSecondaryConfig(SecondaryKeyCreator keyGenerator) {
        SecondaryConfig answer = new SecondaryConfig();
        answer.setKeyCreator(keyGenerator);
        answer.setAllowCreate(true);
        answer.setAllowPopulate(true);
        answer.setTransactional(true);
        return answer;
    }
}
TOP

Related Classes of org.codehaus.activemq.store.bdb.BDbPersistenceAdapter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact: coftware#gmail.com.