Package com.sun.messaging.jmq.jmsserver.persist.jdbc

Source Code of com.sun.messaging.jmq.jmsserver.persist.jdbc.ConsumerStateDAOImpl

/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2000-2010 Oracle and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License").  You
* may not use this file except in compliance with the License.  You can
* obtain a copy of the License at
* https://glassfish.dev.java.net/public/CDDL+GPL_1_1.html
* or packager/legal/LICENSE.txt.  See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at packager/legal/LICENSE.txt.
*
* GPL Classpath Exception:
* Oracle designates this particular file as subject to the "Classpath"
* exception as provided by Oracle in the GPL Version 2 section of the License
* file that accompanied this code.
*
* Modifications:
* If applicable, add the following below the License Header, with the fields
* enclosed by brackets [] replaced by your own identifying information:
* "Portions Copyright [year] [name of copyright owner]"
*
* Contributor(s):
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license."  If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above.  However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/

/*
* @(#)ConsumerStateDAOImpl.java  1.36 06/29/07
*/

package com.sun.messaging.jmq.jmsserver.persist.jdbc;

import com.sun.messaging.jmq.jmsserver.persist.Store;
import com.sun.messaging.jmq.jmsserver.persist.HABrokerInfo;
import com.sun.messaging.jmq.util.log.Logger;
import com.sun.messaging.jmq.jmsserver.Globals;
import com.sun.messaging.jmq.jmsserver.util.*;
import com.sun.messaging.jmq.jmsserver.data.TransactionUID;
import com.sun.messaging.jmq.jmsserver.data.TransactionAcknowledgement;
import com.sun.messaging.jmq.jmsserver.core.ConsumerUID;
import com.sun.messaging.jmq.jmsserver.core.DestinationUID;
import com.sun.messaging.jmq.jmsserver.resources.*;
import com.sun.messaging.jmq.io.SysMessageID;
import com.sun.messaging.jmq.io.Status;

import java.util.*;
import java.sql.*;
import java.io.*;

/**
*  This class implement a generic ConsumerStateDAO.
*/
class ConsumerStateDAOImpl extends BaseDAOImpl implements ConsumerStateDAO {

    protected String tableName;

    // SQLs
    protected String insertSQL;
    protected String updateTransactionSQL;
    protected String updateStateSQL;
    protected String updateState2SQL;
    protected String clearTxnSQL;
    protected String deleteByTxnSQL;
    protected String deleteByDstSQL;
    protected String deleteByStateSQL;
    protected String deleteByMsgSQL;
    protected String selectStateSQL;
    protected String selectStatesByMsgSQL;
    protected String selectTransactionSQL;
    protected String selectCountByMsgSQL;
    protected String selectConsumerIDsByMsgSQL;
    protected String selectTransactionAcksSQL;
    protected String selectAllTransactionAcksSQL;

    /**
     * Constructor
     * @throws BrokerException
     */
    ConsumerStateDAOImpl() throws BrokerException {

        // Initialize all SQLs
        DBManager dbMgr = DBManager.getDBManager();

        // Criteria to ensure the local broker still owns the store for HA
        String brokerNotTakenOverClause = Globals.getHAEnabled() ?
            " AND NOT EXISTS (" +
            ((BrokerDAOImpl)dbMgr.getDAOFactory().getBrokerDAO())
                .selectIsBeingTakenOverSQL + ")" : "";

        tableName = dbMgr.getTableName( TABLE_NAME_PREFIX );

        insertSQL = new StringBuffer(128)
            .append( "INSERT INTO " ).append( tableName )
            .append( " ( " )
            .append( MESSAGE_ID_COLUMN ).append( ", " )
            .append( CONSUMER_ID_COLUMN ).append( ", " )
            .append( STATE_COLUMN ).append( ", " )
            .append( CREATED_TS_COLUMN )
            .append( ") VALUES ( ?, ?, ?, ? )" )
            .toString();

        updateTransactionSQL = new StringBuffer(128)
            .append( "UPDATE " ).append( tableName )
            .append( " SET " )
            .append( TRANSACTION_ID_COLUMN ).append( " = ?" )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ?" )
            .append( " AND " )
            .append( CONSUMER_ID_COLUMN ).append( " = ?" )
            .append( " AND " )
            .append( TRANSACTION_ID_COLUMN ).append( " IS NULL" )
            .append( brokerNotTakenOverClause )
            .toString();

        updateStateSQL = new StringBuffer(128)
            .append( "UPDATE " ).append( tableName )
            .append( " SET " )
            .append( STATE_COLUMN ).append( " = ?" )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ?" )
            .append( " AND " )
            .append( CONSUMER_ID_COLUMN ).append( " = ?" )
            .toString();

        updateState2SQL = new StringBuffer(128)
            .append( "UPDATE " ).append( tableName )
            .append( " SET " )
            .append( STATE_COLUMN ).append( " = ?" )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ?" )
            .append( " AND " )
            .append( CONSUMER_ID_COLUMN ).append( " = ?" )
            .append( " AND " )
            .append( STATE_COLUMN ).append( " = ?" )
            .toString();

        clearTxnSQL = new StringBuffer(128)
            .append( "UPDATE " ).append( tableName )
            .append( " SET " )
            .append( TRANSACTION_ID_COLUMN ).append( " = NULL" )
            .append( " WHERE " )
            .append( TRANSACTION_ID_COLUMN ).append( " = ?" )
            .append( brokerNotTakenOverClause )
            .toString();

        deleteByTxnSQL = new StringBuffer(128)
            .append( "DELETE FROM " ).append( tableName )
            .append( " WHERE " )
            .append( TRANSACTION_ID_COLUMN ).append( " = ?" )
            .toString();

        deleteByDstSQL = new StringBuffer(128)
            .append( "DELETE FROM " ).append( tableName )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " IN " )
            .append( "(SELECT msgTbl." ).append( MessageDAO.ID_COLUMN )
            .append( " FROM " )
            .append(   dbMgr.getTableName( MessageDAO.TABLE_NAME_PREFIX ) )
            .append(   " msgTbl, " )
            .append(   dbMgr.getTableName( StoreSessionDAO.TABLE_NAME_PREFIX ) )
            .append(   " sesTbl" )
            .append( " WHERE sesTbl." )
            .append(   StoreSessionDAO.BROKER_ID_COLUMN ).append( " = ?" )
            .append( " AND sesTbl." ).append( StoreSessionDAO.ID_COLUMN )
            .append(   " = msgTbl." ).append( MessageDAO.STORE_SESSION_ID_COLUMN )
            .append( " AND " )
            .append(   MessageDAO.DESTINATION_ID_COLUMN ).append( " = ?)")
            .toString();

        deleteByMsgSQL = new StringBuffer(128)
            .append( "DELETE FROM " ).append( tableName )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ?" )
            .toString();

        deleteByStateSQL = new StringBuffer(128)
            .append( "DELETE FROM " ).append( tableName )
            .append( " WHERE " )
            .append( STATE_COLUMN ).append( " = ?" )
            .toString();

        selectStateSQL = new StringBuffer(128)
            .append( "SELECT " )
            .append( STATE_COLUMN )
            .append( " FROM " ).append( tableName )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ?" )
            .append( " AND " )
            .append( CONSUMER_ID_COLUMN ).append( " = ?" )
            .toString();

        selectStatesByMsgSQL = new StringBuffer(128)
            .append( "SELECT " )
            .append( CONSUMER_ID_COLUMN ).append( ", " )
            .append( STATE_COLUMN )
            .append( " FROM " ).append( tableName )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ?" )
            .toString();

        selectTransactionSQL = new StringBuffer(128)
            .append( "SELECT " )
            .append( TRANSACTION_ID_COLUMN )
            .append( " FROM " ).append( tableName )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ?" )
            .append( " AND " )
            .append( CONSUMER_ID_COLUMN ).append( " = ?" )
            .toString();

        selectCountByMsgSQL = new StringBuffer(128)
            .append( "SELECT COUNT(*) FROM " )
            .append( tableName )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ?" )
            .toString();

        selectConsumerIDsByMsgSQL = new StringBuffer(128)
            .append( "SELECT " )
            .append( CONSUMER_ID_COLUMN )
            .append( " FROM " ).append( tableName )
            .append( " WHERE " )
            .append( MESSAGE_ID_COLUMN ).append( " = ? " )
            .append( " AND " )
            .append( STATE_COLUMN ).append( " <> " )
            .append( Store.INTEREST_STATE_ACKNOWLEDGED )
            .toString();

        selectTransactionAcksSQL = new StringBuffer(128)
            .append( "SELECT " )
            .append( CONSUMER_ID_COLUMN ).append( ", " )
            .append( MESSAGE_ID_COLUMN )
            .append( " FROM " ).append( tableName )
            .append( " WHERE " )
            .append( TRANSACTION_ID_COLUMN ).append( " = ?" )
            .toString();

        selectAllTransactionAcksSQL = new StringBuffer(128)
            .append( "SELECT " )
            .append( TRANSACTION_ID_COLUMN ).append( ", " )
            .append( CONSUMER_ID_COLUMN ).append( ", " )
            .append( MESSAGE_ID_COLUMN )
            .append( " FROM " ).append( tableName )
            .append( " WHERE " )
            .append( TRANSACTION_ID_COLUMN ).append( " IS NOT NULL" )
            .toString();
    }

    /**
     * Get the prefix name of the table.
     * @return table name
     */
    public final String getTableNamePrefix() {
        return TABLE_NAME_PREFIX;
    }

    /**
     * Get the name of the table.
     * @return table name
     */
    public String getTableName() {
        return tableName;
    }

    /**
     * Insert a new entry.
     * @param conn database connection
     * @param dstID the destination ID
     * @param sysMsgID the system message ID
     * @param conUIDs an array of consumer ids
     * @param states an array of states
     * @throws BrokerException
     */
    public void insert( Connection conn, String dstID, SysMessageID sysMsgID,
        ConsumerUID[] conUIDs, int[] states, boolean checkMsgExist )
        throws BrokerException {

        String msgID = sysMsgID.getUniqueName();

        int count = 0;
        boolean myConn = false;
        PreparedStatement pstmt = null;
        Exception myex = null;
        try {
            // Get a connection
            DBManager dbMgr = DBManager.getDBManager();
            if ( conn == null ) {
                conn = dbMgr.getConnection( false );
                myConn = true;
            }

            // No need to check for message existence
            if ( checkMsgExist ) {
                if ( getConsumerCount( conn, msgID ) > 0 ) {
                    // the message has a list already
                    throw new BrokerException(
                        br.getKString( BrokerResources.E_MSG_INTEREST_LIST_EXISTS, msgID ) );
                }

                dbMgr.getDAOFactory().getMessageDAO().checkMessage( conn, dstID, msgID );
            }

            boolean dobatch = dbMgr.supportsBatchUpdates();
            pstmt = conn.prepareStatement( insertSQL );
            for ( int len = conUIDs.length; count < len; count++ ) {
                pstmt.setString( 1, msgID );
                pstmt.setLong( 2, conUIDs[count].longValue() );
                pstmt.setInt( 3, states[count] );
                pstmt.setLong( 4, System.currentTimeMillis() );

                if ( dobatch ) {
                    pstmt.addBatch();
                } else {
                    pstmt.executeUpdate();
                }
            }

            if ( dobatch ) {
                pstmt.executeBatch();
            }

            if ( myConn ) {
                conn.commit();
            }
        } catch ( Exception e ) {
            myex = e;
            if (Store.getDEBUG() && count < conUIDs.length ) {
                logger.log( Logger .DEBUG, "Failed to persist interest: "
                    + conUIDs[count].toString()
                    + "("+conUIDs[count].getUniqueName() + ")");
            }

            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof IOException ) {
                ex = DBManager.wrapIOException("[" + insertSQL + "]", (IOException)e);
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + insertSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_PERSIST_INTEREST_LIST_FAILED,
                msgID ), ex );
        } finally {
            if ( myConn ) {
                Util.close( null, pstmt, conn, myex );
            } else {
                Util.close( null, pstmt, null, myex );
            }
        }
    }

    /**
     * Update existing entry.
     * @param conn database connection
     * @param dstUID the destination ID
     * @param sysMsgID the system message ID
     * @param conUID the consumer id
     * @param state the state
     * @throws BrokerException
     */
    public void updateState( Connection conn, DestinationUID dstUID,
        SysMessageID sysMsgID, ConsumerUID conUID, int state )
        throws BrokerException {

        String msgID = sysMsgID.getUniqueName();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        Exception myex = null;
        try {
            // Get a connection
            DBManager dbMgr = DBManager.getDBManager();
            if ( conn == null ) {
                conn = dbMgr.getConnection( true );
                myConn = true;
            }

            // Get the broker ID of the msg which also checks if the msg exists
            String brokerID =
                dbMgr.getDAOFactory().getMessageDAO().getBroker( conn, dstUID, msgID );

            // For HA, state can only be udpated by the broker that owns the msg
            if ( Globals.getHAEnabled() && !dbMgr.getBrokerID().equals( brokerID ) ) {
                 String emsg = br.getKString( BrokerResources.X_PERSIST_INTEREST_STATE_FAILED,
                                              conUID.toString(), sysMsgID.toString() );
                 BrokerException be = new StoreBeingTakenOverException(
                            br.getKString(BrokerResources.E_STORE_BEING_TAKEN_OVER+"["+emsg+"]") );
                 try {
                     BrokerDAO dao = dbMgr.getDAOFactory().getBrokerDAO();
                     HABrokerInfo bkrInfo = dao.getBrokerInfo( conn, dbMgr.getBrokerID() );
                     logger.log( Logger.ERROR, br.getKString(BrokerResources.X_INTERNAL_EXCEPTION,
                                               bkrInfo.toString() ), be );
                 } catch (Throwable t) { /* Ignore error */ }
                 throw be;
            }

            pstmt = conn.prepareStatement( updateStateSQL );
            pstmt.setInt( 1, state );
            pstmt.setString( 2, msgID );
            pstmt.setLong( 3, conUID.longValue() );

            if ( pstmt.executeUpdate() == 0 ) {
                // Otherwise we're assuming the entry does not exist
                throw new BrokerException(
                    br.getKString( BrokerResources.E_INTEREST_STATE_NOT_FOUND_IN_STORE,
                    conUID.toString(), msgID ), Status.NOT_FOUND );
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + updateStateSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_PERSIST_INTEREST_STATE_FAILED,
                conUID.toString(), sysMsgID.toString() ), ex );
        } finally {
            if ( myConn ) {
                Util.close( null, pstmt, conn, myex );
            } else {
                Util.close( null, pstmt, null, myex );
            }
        }
    }

    /**
     * Update existing entry.
     * @param conn database connection
     * @param dstUID the destination ID
     * @param sysMsgID the system message ID
     * @param conUID the consumer id
     * @param newState the new state
     * @param expectedState the expected state
     * @throws BrokerException
     */
    public void updateState( Connection conn, DestinationUID dstUID,
        SysMessageID sysMsgID, ConsumerUID conUID, int newState,
        int expectedState ) throws BrokerException {

        String msgID = sysMsgID.getUniqueName();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            // Since this method is used to update the state of a consumer
            // during a takeover, we will skip checking if the msg exists
            // to improve performance.           

            pstmt = conn.prepareStatement( updateState2SQL );
            pstmt.setInt( 1, newState );
            pstmt.setString( 2, msgID );
            pstmt.setLong( 3, conUID.longValue() );
            pstmt.setInt( 4, expectedState );

            if ( pstmt.executeUpdate() == 0 ) {
                // Verify if record is not updated because state doesn't match
                int currentState = getState( conn, sysMsgID, conUID );
                if ( currentState != expectedState ) {
                    String[] args = { conUID.toString(),
                                      sysMsgID.toString(),
                                      String.valueOf(expectedState),
                                      String.valueOf(currentState) };
                    throw new BrokerException(
                        br.getKString( BrokerResources.E_PERSIST_INTEREST_STATE_FAILED,
                        args ), Status.PRECONDITION_FAILED );
                }

                // Otherwise we're assuming the entry does not exist
                throw new BrokerException(
                    br.getKString( BrokerResources.E_INTEREST_STATE_NOT_FOUND_IN_STORE,
                    conUID.toString(), msgID ), Status.NOT_FOUND );
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + updateState2SQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_PERSIST_INTEREST_STATE_FAILED,
                conUID.toString(), sysMsgID.toString() ), ex );
        } finally {
            if ( myConn ) {
                Util.close( null, pstmt, conn, myex );
            } else {
                Util.close( null, pstmt, null, myex );
            }
        }
    }

    /**
     * Update existing entry.
     * @param conn database connection
     * @param sysMsgID the system message ID
     * @param conUID the consumer id
     * @param txnUID the transaction id associated with an acknowledgment
     * @throws BrokerException
     */
    public void updateTransaction( Connection conn, SysMessageID sysMsgID,
        ConsumerUID conUID, TransactionUID txnUID ) throws BrokerException {

        String msgID = sysMsgID.getUniqueName();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        Exception myex = null;
        try {
            // Get a connection
            DBManager dbMgr = DBManager.getDBManager();
            if ( conn == null ) {
                conn = dbMgr.getConnection( true );
                myConn = true;
            }

            // Make sure the transaction exists
            dbMgr.getDAOFactory().getTransactionDAO().checkTransaction(
                conn, txnUID.longValue() );

            pstmt = conn.prepareStatement( updateTransactionSQL );
            pstmt.setLong( 1, txnUID.longValue() );
            pstmt.setString( 2, msgID );
            pstmt.setLong( 3, conUID.longValue() );

            if ( Globals.getHAEnabled() ) {
                pstmt.setString( 4, dbMgr.getBrokerID() );
            }

            if ( pstmt.executeUpdate() == 0 ) {
                // For HA mode, check if this broker still owns the store
                if ( Globals.getHAEnabled() ) {
                    String brokerID = dbMgr.getBrokerID();
                    BrokerDAO dao = dbMgr.getDAOFactory().getBrokerDAO();
                    if ( dao.isBeingTakenOver( conn, brokerID ) ) {
                        BrokerException be = new StoreBeingTakenOverException(
                            br.getKString( BrokerResources.E_STORE_BEING_TAKEN_OVER ) );

                        try {
                            HABrokerInfo bkrInfo = dao.getBrokerInfo( conn, brokerID );
                            logger.log( Logger.ERROR, br.getKString(
                                BrokerResources.X_INTERNAL_EXCEPTION,
                                bkrInfo.toString() ), be );
                        } catch (Throwable t) { /* Ignore error */ }

                        throw be;
                    }
                }

                // Check if ack exists
                long existingTxnID = getTransaction( conn, sysMsgID, conUID );
                if ( existingTxnID > 0 ) {
                    String ack = "[" + sysMsgID + "]" + conUID;
                    throw new TransactionAckExistException(
                        br.getKString( BrokerResources.X_PERSIST_TXNACK_FAILED,
                            ack, txnUID ) + " : " +
                        br.getKString( BrokerResources.E_ACK_EXISTS_IN_STORE,
                            ack, String.valueOf( existingTxnID ) ), Status.CONFLICT );
                }

                // We're assuming the entry does not exist
                throw new BrokerException(
                    br.getKString( BrokerResources.E_INTEREST_STATE_NOT_FOUND_IN_STORE,
                    conUID.toString(), msgID ), Status.NOT_FOUND );
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + updateTransactionSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_PERSIST_INTEREST_STATE_FAILED,
                conUID.toString(), sysMsgID.toString() ), ex );
        } finally {
            if ( myConn ) {
                Util.close( null, pstmt, conn, myex );
            } else {
                Util.close( null, pstmt, null, myex );
            }
        }
    }

    /**
     * Clear the transaction from all consumer states associated with it.
     * @param conn Database Connection
     * @param txnUID the transaction
     * @throws BrokerException
     */
    public void clearTransaction( Connection conn, TransactionUID txnUID )
        throws BrokerException {

        long txnID = txnUID.longValue();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        Exception myex = null;
        try {
            // Get a connection
            DBManager dbMgr = DBManager.getDBManager();
            if ( conn == null ) {
                conn = dbMgr.getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( clearTxnSQL );
            pstmt.setLong( 1, txnID );

            if ( Globals.getHAEnabled() ) {
                pstmt.setString( 2, dbMgr.getBrokerID() );
            }

            if ( pstmt.executeUpdate() == 0 ) {
                // For HA mode, check if this broker still owns the store
                if ( Globals.getHAEnabled() ) {
                    String brokerID = dbMgr.getBrokerID();
                    BrokerDAO dao = dbMgr.getDAOFactory().getBrokerDAO();
                    if ( dao.isBeingTakenOver( conn, brokerID ) ) {
                        BrokerException be = new StoreBeingTakenOverException(
                            br.getKString( BrokerResources.E_STORE_BEING_TAKEN_OVER ) );

                        try {
                            HABrokerInfo bkrInfo = dao.getBrokerInfo( conn, brokerID );
                            logger.log( Logger.ERROR, br.getKString(
                                BrokerResources.X_INTERNAL_EXCEPTION,
                                bkrInfo.toString() ), be );
                        } catch (Throwable t) { /* Ignore error */ }

                        throw be;
                    }
                }
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + clearTxnSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_CLEAR_TXN_FROM_INT_STATES_FAILED,
                txnUID ), ex );
        } finally {
            if ( myConn ) {
                Util.close( null, pstmt, conn, myex );
            } else {
                Util.close( null, pstmt, null, myex );
            }
        }
    }

    /**
     * Delete all consumer states for a message.
     * @param conn Database Connection
     * @param sysMsgID the SysMessageID
     * @throws BrokerException
     */
    public void deleteByMessageID( Connection conn, SysMessageID sysMsgID )
        throws BrokerException {

        String msgID = sysMsgID.getUniqueName();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( deleteByMsgSQL );
            pstmt.setString( 1, msgID );
            pstmt.executeUpdate();
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + deleteByMsgSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_REMOVE_INTEREST_STATE_FAILED,
                msgID ), ex );
        } finally {
            if ( myConn ) {
                Util.close( null, pstmt, conn, myex );
            } else {
                Util.close( null, pstmt, null, myex );
            }
        }
    }

    /**
     * Delete all consumer states for a transaction.
     * @param conn Database Connection
     * @param txnUID the transaction
     * @throws BrokerException
     */
    public void deleteByTransaction( Connection conn, TransactionUID txnUID )
        throws BrokerException {

        long txnID = txnUID.longValue();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( deleteByTxnSQL );
            pstmt.setLong( 1, txnID );
            pstmt.executeUpdate();
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + deleteByTxnSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_REMOVE_INT_STATES_FOR_TXN_FAILED,
                txnUID ), ex );
        } finally {
            if ( myConn ) {
                Util.close( null, pstmt, conn, myex );
            } else {
                Util.close( null, pstmt, null, myex );
            }
        }
    }

    /**
     * Delete all consumer states for a destination.
     * Note: Because Consumer State table is a child table to the Message table,
     * this method will be call from MessageDAO.deleteByDestination() to delete
     * all consumer states before it deletes the messages.
     * @param conn Database Connection
     * @param dstUID the destination
     * @throws BrokerException
     */
    public void deleteByDestination( Connection conn, DestinationUID dstUID )
        throws BrokerException {

        String dstID = dstUID.toString();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        Exception myex = null;
        try {
            // Get a connection
            DBManager dbMgr = DBManager.getDBManager();
            if ( conn == null ) {
                conn = dbMgr.getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( deleteByDstSQL );
            pstmt.setString( 1, dbMgr.getBrokerID() );
            pstmt.setString( 2, dstID );
            pstmt.executeUpdate();
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + deleteByDstSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_REMOVE_INT_STATES_FOR_DST_FAILED,
                    dstID ), ex );
        } finally {
            if ( myConn ) {
                Util.close( null, pstmt, conn, myex );
            } else {
                Util.close( null, pstmt, null, myex );
            }
        }
    }

    /**
     * Delete all entries.
     * @param conn Database Connection
     * @throws BrokerException
     */
    public void deleteAll( Connection conn )
        throws BrokerException {

        String whereClause = null;
        if ( Globals.getHAEnabled() ) {
            // Only delete consumer states that belong to the running broker,
            // construct the where clause for the delete statement:
            //   DELETE FROM mqconstate41cmycluster WHERE message_id IN
            //    (SELECT id FROM mqmsg41cmycluster msgtbl,
            //                    mqses41cmycluster sestbl
            //     WHERE sestbl.broker_id = 'mybroker' AND
            //           sestbl.id = msgtbl.store_session_id)
            DBManager dbMgr = DBManager.getDBManager();
            whereClause = new StringBuffer(128)
                .append( MESSAGE_ID_COLUMN )
                .append( " IN (SELECT msgTbl." ).append( MessageDAO.ID_COLUMN )
                .append(   " FROM " )
                .append(     dbMgr.getTableName( MessageDAO.TABLE_NAME_PREFIX ) )
                .append(     " msgTbl, " )
                .append(     dbMgr.getTableName( StoreSessionDAO.TABLE_NAME_PREFIX ) )
                .append(     " sesTbl" )
                .append(   " WHERE sesTbl." ).append( StoreSessionDAO.BROKER_ID_COLUMN )
                .append(     " = '" ).append( dbMgr.getBrokerID() )
                .append(     "' AND sesTbl." ).append( StoreSessionDAO.ID_COLUMN )
                .append(     " = msgTbl." ).append( MessageDAO.STORE_SESSION_ID_COLUMN )
                .append( ")" )
                .toString();
        }

        deleteAll( conn, whereClause, null, 0 );       
    }

    /**
     * Get consumer's state.
     * @param conn database connection
     * @param sysMsgID the system message ID
     * @param conUID the consumer ID
     * @return consumer's state
     * @throws com.sun.messaging.jmq.jmsserver.util.BrokerException
     */
    public int getState( Connection conn, SysMessageID sysMsgID,
        ConsumerUID conUID ) throws BrokerException {

        int state = -1;
        String id = sysMsgID.getUniqueName();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( selectStateSQL );
            pstmt.setString( 1, id );
            pstmt.setLong( 2, conUID.longValue() );
            rs = pstmt.executeQuery();
            if ( rs.next() ) {
                state = rs.getInt( 1 );
            } else {
                // We are assuming the consumer state does not exist
                throw new BrokerException(
                    br.getKString( BrokerResources.E_INTEREST_STATE_NOT_FOUND_IN_STORE,
                    conUID.toString(), id ), Status.NOT_FOUND );
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + selectStateSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_GET_INTEREST_STATE_FAILED,
                conUID.toString(), id ), ex );
        } finally {
            if ( myConn ) {
                Util.close( rs, pstmt, conn, myex );
            } else {
                Util.close( rs, pstmt, null, myex );
            }
        }

        return state;
    }

    /**
     * Get all consumers and states associated with the messapge ID.
     * @param conn database connection
     * @return HashMap of containing all consumer's state
     * @throws BrokerException
     */
    public HashMap getStates( Connection conn, SysMessageID sysMsgID )
        throws BrokerException {

        HashMap map = new HashMap();
        String id = sysMsgID.getUniqueName();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( selectStatesByMsgSQL );
            pstmt.setString( 1, id );
            rs = pstmt.executeQuery();
            while ( rs.next() ) {
                ConsumerUID cUID = new ConsumerUID( rs.getLong( 1 ) );
                int state = rs.getInt( 2 );
                map.put( cUID, Integer.valueOf( state ) );
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + selectStatesByMsgSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString(
                    BrokerResources.X_LOAD_INT_STATES_FOR_MSG_FAILED, id ), ex );
        } finally {
            if ( myConn ) {
                Util.close( rs, pstmt, conn, myex );
            } else {
                Util.close( rs, pstmt, null, myex );
            }
        }

        return map;
    }

    /**
     * Get consumer's transaction.
     * @param conn database connection
     * @param sysMsgID the system message ID
     * @param conUID the consumer ID
     * @return consumer's transaction
     */
    public long getTransaction( Connection conn, SysMessageID sysMsgID,
        ConsumerUID conUID ) throws BrokerException {

        long txnID = -1;
        String id = sysMsgID.getUniqueName();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( selectTransactionSQL );
            pstmt.setString( 1, id );
            pstmt.setLong( 2, conUID.longValue() );
            rs = pstmt.executeQuery();
            if ( rs.next() ) {
                txnID = rs.getLong( 1 );
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + selectTransactionSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_RETRIEVE_INTEREST_FAILED,
                conUID.toString() ), ex );
        } finally {
            if ( myConn ) {
                Util.close( rs, pstmt, conn, myex );
            } else {
                Util.close( rs, pstmt, null, myex );
            }
        }

        return txnID;
    }

    /**
     * Get all consumers associated with the messapge ID and state is not
     * INTEREST_STATE_ACKNOWLEDGED.
     * @param conn database connection
     * @param sysMsgID the system message ID
     * @return list of consumer IDs
     * @throws BrokerException
     */
    public List getConsumerUIDs( Connection conn, SysMessageID sysMsgID )
        throws BrokerException {

        List list = new ArrayList();
        String id = sysMsgID.getUniqueName();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( selectConsumerIDsByMsgSQL );
            pstmt.setString( 1, id );
            rs = pstmt.executeQuery();
            while ( rs.next() ) {
                ConsumerUID cUID = new ConsumerUID( rs.getLong( 1 ) );
                list.add( cUID );
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + selectConsumerIDsByMsgSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString(
                    BrokerResources.X_LOAD_INT_STATES_FOR_MSG_FAILED, id ), ex );
        } finally {
            if ( myConn ) {
                Util.close( rs, pstmt, conn, myex );
            } else {
                Util.close( rs, pstmt, null, myex );
            }
        }

        return list;
    }

    /**
     * Retrieve all transaction acknowledgements for the specified transaction ID.
     * @param conn database connection
     * @param txnUID the transaction ID
     * @return List of transaction acks
     * @throws BrokerException
     */
    public List getTransactionAcks( Connection conn, TransactionUID txnUID )
        throws BrokerException {

        List data = new ArrayList();

        boolean myConn = false;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( selectTransactionAcksSQL );
            pstmt.setLong( 1, txnUID.longValue() );
            rs = pstmt.executeQuery();

            while ( rs.next() ) {
                ConsumerUID conID = new ConsumerUID( rs.getLong( 1 ) );
                try {
                    SysMessageID msgID = SysMessageID.get( rs.getString( 2 ) );
                    data.add( new TransactionAcknowledgement( msgID, conID, conID ) );
                } catch ( Exception e ) {
                    // fail to parse one object; just log it
                    logger.logStack( Logger.ERROR,
                        BrokerResources.X_PARSE_TXNACK_FAILED, txnUID, e );
                }
            }
        } catch ( Exception e ) {
             myex = e;
             try {
                 if ( (conn != null) && !conn.getAutoCommit() ) {
                     conn.rollback();
                 }
             } catch ( SQLException rbe ) {
                 logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
             }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + selectTransactionAcksSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_LOAD_ACKS_FOR_TXN_FAILED,
                    txnUID ), ex );
        } finally {
            if ( myConn ) {
                Util.close( rs, pstmt, conn, myex );
            } else {
                Util.close( rs, pstmt, null, myex );
            }
        }

        return data;
    }

    /**
     * Retrieve all transaction acknowledgements for all transactions.
     * @param conn database connection
     * @return HashMap of containing all acknowledgement
     * @throws BrokerException
     */
    public HashMap getAllTransactionAcks( Connection conn )
        throws BrokerException {

        HashMap data = new HashMap(100);

        boolean myConn = false;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( true );
                myConn = true;
            }

            pstmt = conn.prepareStatement( selectAllTransactionAcksSQL );
            rs = pstmt.executeQuery();

            while ( rs.next() ) {
                TransactionUID txnUID = new TransactionUID( rs.getLong( 1 ) );
                ConsumerUID conID = new ConsumerUID( rs.getLong( 2 ) );
                try {
                    SysMessageID msgID = SysMessageID.get( rs.getString( 3 ) );

                    List ackList = (List)data.get( txnUID );
                    if ( ackList == null ) {
                        // Create a new list of acks for this txn
                        ackList = new ArrayList(25);
                        data.put( txnUID, ackList );
                    }

                    // Added ack to the list of acks
                    ackList.add( new TransactionAcknowledgement( msgID, conID, conID ) );
                } catch ( Exception e ) {
                    // fail to parse one object; just log it
                    logger.logStack( Logger.ERROR,
                        BrokerResources.X_PARSE_TXNACK_FAILED, txnUID, e );
                }
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + selectAllTransactionAcksSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_LOAD_TXNACK_FAILED ), ex );
        } finally {
            if ( myConn ) {
                Util.close( rs, pstmt, conn, myex );
            } else {
                Util.close( rs, pstmt, null, myex );
            }
        }

        // Transforms HashMap value to TransactionAcknowledgement[] instead of List
        Set keySet = data.keySet();
        if ( !keySet.isEmpty() ) {
            Iterator itr = keySet.iterator();
            while ( itr.hasNext() ) {
                TransactionUID txnUID = (TransactionUID)itr.next();
                List ackList = (List)data.get( txnUID );
                data.put( txnUID, ackList.toArray(
                    new TransactionAcknowledgement[0] ) );
            }
        }

        return data;
    }

    /**
     * Get debug information about the store.
     * @param conn database connection
     * @return A HashMap of name value pair of information
     */
    public HashMap getDebugInfo( Connection conn ) {

        HashMap map = new HashMap();
        int count = -1;

        try {
            // Get row count
            count = getRowCount( null, null );
        } catch ( Exception e ) {
            logger.log( Logger.ERROR, e.getMessage(), e.getCause() );
        }

        map.put( "Message/Consumer states(" + tableName + ")", String.valueOf( count ) );
        return map;
    }

    /**
     * Get the number of consumers (e.g. interest) for the specified message.
     * @param conn database connection
     * @param msgID the message ID
     * @return number of consumers
     * @throws BrokerException
     */
    public int getConsumerCount( Connection conn, String msgID )
        throws BrokerException {

        int count = -1;

        boolean myConn = false;
        PreparedStatement pstmt = null;
        ResultSet rs = null;
        Exception myex = null;
        try {
            // Get a connection
            if ( conn == null ) {
                conn = DBManager.getDBManager().getConnection( false );
                myConn = true;
            }

            pstmt = conn.prepareStatement( selectCountByMsgSQL );
            pstmt.setString( 1, msgID );
            rs = pstmt.executeQuery();

            if ( rs.next() ) {
                count = rs.getInt( 1 );
            }
        } catch ( Exception e ) {
            myex = e;
            try {
                if ( (conn != null) && !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            } catch ( SQLException rbe ) {
                logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED+"["+selectCountByMsgSQL+"]", rbe );
            }

            Exception ex;
            if ( e instanceof BrokerException ) {
                throw (BrokerException)e;
            } else if ( e instanceof SQLException ) {
                ex = DBManager.wrapSQLException("[" + selectCountByMsgSQL + "]", (SQLException)e);
            } else {
                ex = e;
            }

            throw new BrokerException(
                br.getKString( BrokerResources.X_LOAD_INT_STATES_FOR_MSG_FAILED,
                msgID ), ex );

        } finally {
            if ( myConn ) {
                Util.close( rs, pstmt, conn, myex );
            } else {
                Util.close( rs, pstmt, null, myex );
            }
        }

        return count;
    }
}
TOP

Related Classes of com.sun.messaging.jmq.jmsserver.persist.jdbc.ConsumerStateDAOImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.