Package org.codehaus.activemq.store.jdbc

Source Code of org.codehaus.activemq.store.jdbc.JDBCPersistenceAdapter

/**
*
* Copyright 2004 Hiram Chirino
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.store.jdbc;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.message.DefaultWireFormat;
import org.codehaus.activemq.message.WireFormat;
import org.codehaus.activemq.service.impl.PersistenceAdapterSupport;
import org.codehaus.activemq.store.MessageStore;
import org.codehaus.activemq.store.PersistenceAdapter;
import org.codehaus.activemq.store.PreparedTransactionStore;
import org.codehaus.activemq.store.TopicMessageStore;
import org.codehaus.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
import org.codehaus.activemq.util.FactoryFinder;
import org.codehaus.activemq.util.JMSExceptionHelper;

import javax.jms.JMSException;
import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Map;

/**
* A {@link PersistenceAdapter} implementation using JDBC for
* persistence storage.
*
* @version $Revision: 1.8 $
*/
public class JDBCPersistenceAdapter extends PersistenceAdapterSupport {

    private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
    private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/codehaus/activemq/store/jdbc/");

    private WireFormat wireFormat = new DefaultWireFormat();
    private DataSource dataSource;
    private JDBCAdapter adapter;


    public JDBCPersistenceAdapter() {
    }
   
    public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
        this.dataSource = ds;
        this.wireFormat = wireFormat;
    }

    public Map getInitialDestinations() {
        return null/** TODO */
    }

    public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
        if (adapter == null) {
            throw new IllegalStateException("Not started");
        }
        return new JDBCMessageStore(this, adapter, wireFormat, destinationName);
    }

    public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
        if (adapter == null) {
            throw new IllegalStateException("Not started");
        }
        return new JDBCTopicMessageStore(this, adapter, wireFormat, destinationName);
    }

    public PreparedTransactionStore createPreparedTransactionStore() throws JMSException {
        if (adapter == null) {
            throw new IllegalStateException("Not started");
        }
        return new JDBCPreparedTransactionStore(this, adapter, wireFormat);
    }

    public void beginTransaction() throws JMSException {
        try {
            Connection c = dataSource.getConnection();          
            c.setAutoCommit(false);
            TransactionContext.pushConnection(c);
        }
        catch (SQLException e) {
            throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e);
        }
    }

    public void commitTransaction() throws JMSException {
        Connection c = TransactionContext.popConnection();
        if (c == null) {
            log.warn("Commit while no transaction in progress");
        }
        else {
            try {
                c.commit();
            }
            catch (SQLException e) {
                throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e);
            }
            finally {
                try {
                    c.close();
                }
                catch (Throwable e) {
                }
            }
        }
    }

    public void rollbackTransaction() {
        Connection c = TransactionContext.popConnection();
        try {
            c.rollback();
        }
        catch (SQLException e) {
            log.warn("Cannot rollback transaction due to: " + e, e);
        }
        finally {
            try {
                c.close();
            }
            catch (Throwable e) {
            }
        }
    }


    public void start() throws JMSException {
        beginTransaction();
        try {
          Connection c=null;
            try {
              c = getConnection();
             
              // Choose the right adapter depending on the
              // databse connection.
              adapter = null;
              String database = null;
             
                database = c.getMetaData().getDriverName();
                database = database.replaceAll(" ", "_");

                log.debug("Database type: [" + database + "]");
                try {
                    adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(database);
                }
                catch (Throwable e) {
                    log.warn("Unrecognized database type (" + database + ").  Will use default JDBC implementation.");
                    log.debug("Reason: "+e,e);
                }
            }
            catch (SQLException e1) {
              returnConnection(c);
            }
           
            // Use the default JDBC adapter if the
            // Database type is not recognized.
            if (adapter == null) {
                adapter = new DefaultJDBCAdapter();
            }

            try {
                adapter.doCreateTables(c);
            }
            catch (SQLException e) {
                log.warn("Cannot create tables due to: " + e, e);
            }
            adapter.initSequenceGenerator(c);

        }
        finally {
            commitTransaction();
        }
    }


  public synchronized void stop() throws JMSException {
    }

  public DataSource getDataSource() {
    return dataSource;
  }
  public void setDataSource(DataSource dataSource) {
    this.dataSource = dataSource;
  }
  public WireFormat getWireFormat() {
    return wireFormat;
  }
  public void setWireFormat(WireFormat wireFormat) {
    this.wireFormat = wireFormat;
  }
   
  public Connection getConnection() throws SQLException {
    Connection answer = TransactionContext.peekConnection();
    if(answer==null) {
      answer = dataSource.getConnection();
      answer.setAutoCommit(true);
    }
    return answer;
  }

  public void returnConnection(Connection connection) {
    if( connection==null )
      return;
    Connection peek = TransactionContext.peekConnection();
    if(peek!=connection) {
      try { connection.close(); } catch (SQLException e) {}
    }
  }
}
TOP

Related Classes of org.codehaus.activemq.store.jdbc.JDBCPersistenceAdapter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.