/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright (c) 2000-2010 Oracle and/or its affiliates. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License"). You
* may not use this file except in compliance with the License. You can
* obtain a copy of the License at
* https://glassfish.dev.java.net/public/CDDL+GPL_1_1.html
* or packager/legal/LICENSE.txt. See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at packager/legal/LICENSE.txt.
*
* GPL Classpath Exception:
* Oracle designates this particular file as subject to the "Classpath"
* exception as provided by Oracle in the GPL Version 2 section of the License
* file that accompanied this code.
*
* Modifications:
* If applicable, add the following below the License Header, with the fields
* enclosed by brackets [] replaced by your own identifying information:
* "Portions Copyright [year] [name of copyright owner]"
*
* Contributor(s):
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license." If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above. However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/
/*
* @(#)ConsumerStateDAOImpl.java 1.36 06/29/07
*/
package com.sun.messaging.jmq.jmsserver.persist.jdbc;
import com.sun.messaging.jmq.jmsserver.persist.Store;
import com.sun.messaging.jmq.jmsserver.persist.HABrokerInfo;
import com.sun.messaging.jmq.util.log.Logger;
import com.sun.messaging.jmq.jmsserver.Globals;
import com.sun.messaging.jmq.jmsserver.util.*;
import com.sun.messaging.jmq.jmsserver.data.TransactionUID;
import com.sun.messaging.jmq.jmsserver.data.TransactionAcknowledgement;
import com.sun.messaging.jmq.jmsserver.core.ConsumerUID;
import com.sun.messaging.jmq.jmsserver.core.DestinationUID;
import com.sun.messaging.jmq.jmsserver.resources.*;
import com.sun.messaging.jmq.io.SysMessageID;
import com.sun.messaging.jmq.io.Status;
import java.util.*;
import java.sql.*;
import java.io.*;
/**
* This class implement a generic ConsumerStateDAO.
*/
class ConsumerStateDAOImpl extends BaseDAOImpl implements ConsumerStateDAO {
// Name of the consumer state table; the constructor obtains it from
// DBManager.getTableName( TABLE_NAME_PREFIX ).
protected String tableName;
// SQL statement texts. Each one is assembled exactly once by the constructor
// and then used with PreparedStatement by the methods of this class.
// INSERT of (message id, consumer id, state, created timestamp)
protected String insertSQL;
// UPDATE that sets the transaction id of a (message, consumer) row whose
// transaction id is still NULL
protected String updateTransactionSQL;
// UPDATE of the state of a (message, consumer) row
protected String updateStateSQL;
// UPDATE of the state of a (message, consumer) row, restricted to rows that
// currently hold a given state
protected String updateState2SQL;
// UPDATE that resets the transaction id to NULL for all rows of a transaction
protected String clearTxnSQL;
// DELETE of all rows carrying a given transaction id
protected String deleteByTxnSQL;
// DELETE of all rows whose message belongs to a given broker and destination
protected String deleteByDstSQL;
// DELETE of all rows in a given state
protected String deleteByStateSQL;
// DELETE of all rows of a given message
protected String deleteByMsgSQL;
// SELECT of the state of a (message, consumer) row
protected String selectStateSQL;
// SELECT of (consumer id, state) for all rows of a message
protected String selectStatesByMsgSQL;
// SELECT of the transaction id of a (message, consumer) row
protected String selectTransactionSQL;
// SELECT COUNT(*) of the rows of a message
protected String selectCountByMsgSQL;
// SELECT of the consumer ids of a message whose state is not
// Store.INTEREST_STATE_ACKNOWLEDGED
protected String selectConsumerIDsByMsgSQL;
// SELECT of (consumer id, message id) for all rows of a transaction
protected String selectTransactionAcksSQL;
// SELECT of (transaction id, consumer id, message id) for all rows whose
// transaction id is not NULL
protected String selectAllTransactionAcksSQL;
/**
 * Resolves the table name and assembles the text of every SQL statement
 * used by this DAO.
 * <p>
 * When HA is enabled, the statements that tag or clear a transaction
 * (updateTransactionSQL and clearTxnSQL) carry an additional
 * "AND NOT EXISTS (...)" criterion built from the broker DAO's
 * selectIsBeingTakenOverSQL, so that they only match while the local broker
 * still owns the store.
 *
 * @throws BrokerException declared by the signature; presumably raised by the
 *      DBManager / DAO factory lookups performed here
 */
ConsumerStateDAOImpl() throws BrokerException {

    DBManager dbMgr = DBManager.getDBManager();

    // Extra criterion ensuring the local broker still owns the store (HA only)
    String takeoverGuard = "";
    if ( Globals.getHAEnabled() ) {
        BrokerDAOImpl brokerDAO =
            (BrokerDAOImpl)dbMgr.getDAOFactory().getBrokerDAO();
        takeoverGuard =
            " AND NOT EXISTS (" + brokerDAO.selectIsBeingTakenOverSQL + ")";
    }

    tableName = dbMgr.getTableName( TABLE_NAME_PREFIX );

    // Fragments shared by several statements
    String fromTableWhere = " FROM " + tableName + " WHERE ";
    String byMsgAndConsumer =
        MESSAGE_ID_COLUMN + " = ? AND " + CONSUMER_ID_COLUMN + " = ?";

    insertSQL =
        "INSERT INTO " + tableName + " ( "
        + MESSAGE_ID_COLUMN + ", "
        + CONSUMER_ID_COLUMN + ", "
        + STATE_COLUMN + ", "
        + CREATED_TS_COLUMN
        + ") VALUES ( ?, ?, ?, ? )";

    updateTransactionSQL =
        "UPDATE " + tableName
        + " SET " + TRANSACTION_ID_COLUMN + " = ?"
        + " WHERE " + byMsgAndConsumer
        + " AND " + TRANSACTION_ID_COLUMN + " IS NULL"
        + takeoverGuard;

    updateStateSQL =
        "UPDATE " + tableName
        + " SET " + STATE_COLUMN + " = ?"
        + " WHERE " + byMsgAndConsumer;

    updateState2SQL =
        "UPDATE " + tableName
        + " SET " + STATE_COLUMN + " = ?"
        + " WHERE " + byMsgAndConsumer
        + " AND " + STATE_COLUMN + " = ?";

    clearTxnSQL =
        "UPDATE " + tableName
        + " SET " + TRANSACTION_ID_COLUMN + " = NULL"
        + " WHERE " + TRANSACTION_ID_COLUMN + " = ?"
        + takeoverGuard;

    deleteByTxnSQL =
        "DELETE" + fromTableWhere + TRANSACTION_ID_COLUMN + " = ?";

    // Rows are selected through the message table joined with the store
    // session table, filtered by broker id and destination id
    String msgTable = dbMgr.getTableName( MessageDAO.TABLE_NAME_PREFIX );
    String sesTable = dbMgr.getTableName( StoreSessionDAO.TABLE_NAME_PREFIX );
    deleteByDstSQL =
        "DELETE" + fromTableWhere
        + MESSAGE_ID_COLUMN + " IN "
        + "(SELECT msgTbl." + MessageDAO.ID_COLUMN
        + " FROM " + msgTable + " msgTbl, " + sesTable + " sesTbl"
        + " WHERE sesTbl." + StoreSessionDAO.BROKER_ID_COLUMN + " = ?"
        + " AND sesTbl." + StoreSessionDAO.ID_COLUMN
        + " = msgTbl." + MessageDAO.STORE_SESSION_ID_COLUMN
        + " AND " + MessageDAO.DESTINATION_ID_COLUMN + " = ?)";

    deleteByMsgSQL =
        "DELETE" + fromTableWhere + MESSAGE_ID_COLUMN + " = ?";

    deleteByStateSQL =
        "DELETE" + fromTableWhere + STATE_COLUMN + " = ?";

    selectStateSQL =
        "SELECT " + STATE_COLUMN + fromTableWhere + byMsgAndConsumer;

    selectStatesByMsgSQL =
        "SELECT " + CONSUMER_ID_COLUMN + ", " + STATE_COLUMN
        + fromTableWhere + MESSAGE_ID_COLUMN + " = ?";

    selectTransactionSQL =
        "SELECT " + TRANSACTION_ID_COLUMN + fromTableWhere + byMsgAndConsumer;

    selectCountByMsgSQL =
        "SELECT COUNT(*) FROM " + tableName
        + " WHERE " + MESSAGE_ID_COLUMN + " = ?";

    // The two consecutive blanks before AND are part of the original text
    selectConsumerIDsByMsgSQL =
        "SELECT " + CONSUMER_ID_COLUMN
        + fromTableWhere + MESSAGE_ID_COLUMN + " = ? "
        + " AND " + STATE_COLUMN + " <> "
        + Store.INTEREST_STATE_ACKNOWLEDGED;

    selectTransactionAcksSQL =
        "SELECT " + CONSUMER_ID_COLUMN + ", " + MESSAGE_ID_COLUMN
        + fromTableWhere + TRANSACTION_ID_COLUMN + " = ?";

    selectAllTransactionAcksSQL =
        "SELECT " + TRANSACTION_ID_COLUMN + ", "
        + CONSUMER_ID_COLUMN + ", " + MESSAGE_ID_COLUMN
        + fromTableWhere + TRANSACTION_ID_COLUMN + " IS NOT NULL";
}
/**
 * Get the prefix of the table name, i.e. the TABLE_NAME_PREFIX constant
 * (not the resolved table name; see getTableName() for that).
 * @return table name prefix
 */
public final String getTableNamePrefix() {
return TABLE_NAME_PREFIX;
}
/**
 * Get the name of the table as stored in the tableName field.
 * @return table name
 */
public String getTableName() {
return tableName;
}
/**
 * Insert one consumer state row per consumer for the given message.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager, the
 * inserts are committed on success, and the connection is closed before
 * returning. On any failure the connection is rolled back when it is not in
 * auto-commit mode.
 *
 * @param conn database connection; may be null
 * @param dstID the destination ID
 * @param sysMsgID the system message ID
 * @param conUIDs an array of consumer ids
 * @param states an array of states, indexed in parallel with conUIDs
 * @param checkMsgExist if true, fail when the message already has consumer
 *      states and verify the message through MessageDAO.checkMessage()
 *      before inserting
 * @throws BrokerException if the message already has an interest list or the
 *      rows cannot be persisted
 */
public void insert( Connection conn, String dstID, SysMessageID sysMsgID,
    ConsumerUID[] conUIDs, int[] states, boolean checkMsgExist )
    throws BrokerException {

    String msgID = sysMsgID.getUniqueName();

    // Index of the consumer being processed; read by the catch block to
    // report which interest failed
    int idx = 0;

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    Exception failure = null;
    try {
        DBManager dbMgr = DBManager.getDBManager();
        if ( conn == null ) {
            conn = dbMgr.getConnection( false );
            ownsConnection = true;
        }

        // Existence checks are performed only on request
        if ( checkMsgExist ) {
            if ( getConsumerCount( conn, msgID ) > 0 ) {
                // the message has a list already
                throw new BrokerException(
                    br.getKString( BrokerResources.E_MSG_INTEREST_LIST_EXISTS, msgID ) );
            }

            dbMgr.getDAOFactory().getMessageDAO().checkMessage( conn, dstID, msgID );
        }

        boolean useBatch = dbMgr.supportsBatchUpdates();
        stmt = conn.prepareStatement( insertSQL );

        final int total = conUIDs.length;
        while ( idx < total ) {
            stmt.setString( 1, msgID );
            stmt.setLong( 2, conUIDs[idx].longValue() );
            stmt.setInt( 3, states[idx] );
            stmt.setLong( 4, System.currentTimeMillis() );

            if ( useBatch ) {
                stmt.addBatch();
            } else {
                stmt.executeUpdate();
            }
            idx++;
        }

        if ( useBatch ) {
            stmt.executeBatch();
        }

        if ( ownsConnection ) {
            conn.commit();
        }
    } catch ( Exception e ) {
        failure = e;
        if ( Store.getDEBUG() && idx < conUIDs.length ) {
            logger.log( Logger.DEBUG, "Failed to persist interest: "
                + conUIDs[idx].toString()
                + "(" + conUIDs[idx].getUniqueName() + ")" );
        }

        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof IOException ) {
            cause = DBManager.wrapIOException( "[" + insertSQL + "]", (IOException)e );
        } else if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + insertSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_PERSIST_INTEREST_LIST_FAILED,
            msgID ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( null, stmt, ownsConnection ? conn : null, failure );
    }
}
/**
 * Update the state of an existing consumer state entry.
 * <p>
 * The broker that owns the message is looked up first (which also verifies
 * that the message exists). In HA mode the update is refused with a
 * StoreBeingTakenOverException when the owner is not the local broker.
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn database connection; may be null
 * @param dstUID the destination ID
 * @param sysMsgID the system message ID
 * @param conUID the consumer id
 * @param state the state
 * @throws BrokerException with Status.NOT_FOUND if no row matches the
 *      message and consumer; a StoreBeingTakenOverException if, in HA mode,
 *      the message is not owned by the local broker; or a wrapping
 *      BrokerException for any other failure
 */
public void updateState( Connection conn, DestinationUID dstUID,
    SysMessageID sysMsgID, ConsumerUID conUID, int state )
    throws BrokerException {

    String msgID = sysMsgID.getUniqueName();

    boolean myConn = false;
    PreparedStatement pstmt = null;
    Exception myex = null;
    try {
        // Get a connection
        DBManager dbMgr = DBManager.getDBManager();
        if ( conn == null ) {
            conn = dbMgr.getConnection( true );
            myConn = true;
        }

        // Get the broker ID of the msg which also checks if the msg exists
        String brokerID =
            dbMgr.getDAOFactory().getMessageDAO().getBroker( conn, dstUID, msgID );

        // For HA, state can only be updated by the broker that owns the msg
        if ( Globals.getHAEnabled() && !dbMgr.getBrokerID().equals( brokerID ) ) {
            String emsg = br.getKString( BrokerResources.X_PERSIST_INTEREST_STATE_FAILED,
                conUID.toString(), sysMsgID.toString() );

            // Look up the resource key first, then append the detail text;
            // appending before the lookup would turn the detail into part of
            // the key and the lookup would miss.
            BrokerException be = new StoreBeingTakenOverException(
                br.getKString( BrokerResources.E_STORE_BEING_TAKEN_OVER )
                + "[" + emsg + "]" );

            try {
                // Best-effort diagnostic: log the local broker's info
                BrokerDAO dao = dbMgr.getDAOFactory().getBrokerDAO();
                HABrokerInfo bkrInfo = dao.getBrokerInfo( conn, dbMgr.getBrokerID() );
                logger.log( Logger.ERROR, br.getKString(BrokerResources.X_INTERNAL_EXCEPTION,
                    bkrInfo.toString() ), be );
            } catch (Throwable t) { /* Ignore error */ }

            throw be;
        }

        pstmt = conn.prepareStatement( updateStateSQL );
        pstmt.setInt( 1, state );
        pstmt.setString( 2, msgID );
        pstmt.setLong( 3, conUID.longValue() );

        if ( pstmt.executeUpdate() == 0 ) {
            // No row updated: we're assuming the entry does not exist
            throw new BrokerException(
                br.getKString( BrokerResources.E_INTEREST_STATE_NOT_FOUND_IN_STORE,
                conUID.toString(), msgID ), Status.NOT_FOUND );
        }
    } catch ( Exception e ) {
        myex = e;
        try {
            if ( (conn != null) && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        Exception ex;
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        } else if ( e instanceof SQLException ) {
            ex = DBManager.wrapSQLException("[" + updateStateSQL + "]", (SQLException)e);
        } else {
            ex = e;
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_PERSIST_INTEREST_STATE_FAILED,
            conUID.toString(), sysMsgID.toString() ), ex );
    } finally {
        // Only close the connection when it was opened by this method
        if ( myConn ) {
            Util.close( null, pstmt, conn, myex );
        } else {
            Util.close( null, pstmt, null, myex );
        }
    }
}
/**
 * Update the state of an existing consumer state entry, but only if the
 * entry currently holds the expected state.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn database connection; may be null
 * @param dstUID the destination ID (not used by this method)
 * @param sysMsgID the system message ID
 * @param conUID the consumer id
 * @param newState the new state
 * @param expectedState the expected state
 * @throws BrokerException with Status.PRECONDITION_FAILED if the stored state
 *      differs from expectedState, with Status.NOT_FOUND if the entry is
 *      missing, or a wrapping BrokerException for any other failure
 */
public void updateState( Connection conn, DestinationUID dstUID,
    SysMessageID sysMsgID, ConsumerUID conUID, int newState,
    int expectedState ) throws BrokerException {

    String msgID = sysMsgID.getUniqueName();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    Exception failure = null;
    try {
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConnection = true;
        }

        // This method is used to update the state of a consumer during a
        // takeover, so the message existence check is skipped to improve
        // performance.
        stmt = conn.prepareStatement( updateState2SQL );
        stmt.setInt( 1, newState );
        stmt.setString( 2, msgID );
        stmt.setLong( 3, conUID.longValue() );
        stmt.setInt( 4, expectedState );

        int rows = stmt.executeUpdate();
        if ( rows == 0 ) {
            // Find out whether nothing was updated because of a state mismatch
            int storedState = getState( conn, sysMsgID, conUID );
            if ( storedState != expectedState ) {
                String[] details = {
                    conUID.toString(),
                    sysMsgID.toString(),
                    String.valueOf(expectedState),
                    String.valueOf(storedState)
                };
                throw new BrokerException(
                    br.getKString( BrokerResources.E_PERSIST_INTEREST_STATE_FAILED,
                    details ), Status.PRECONDITION_FAILED );
            }

            // Otherwise we're assuming the entry does not exist
            throw new BrokerException(
                br.getKString( BrokerResources.E_INTEREST_STATE_NOT_FOUND_IN_STORE,
                conUID.toString(), msgID ), Status.NOT_FOUND );
        }
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + updateState2SQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_PERSIST_INTEREST_STATE_FAILED,
            conUID.toString(), sysMsgID.toString() ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( null, stmt, ownsConnection ? conn : null, failure );
    }
}
/**
 * Associate a transaction with an existing consumer state entry, i.e. record
 * a transacted acknowledgement.
 * <p>
 * The update only matches an entry whose transaction id is still NULL. When
 * nothing is updated, the method determines why: in HA mode the store may be
 * in the middle of a takeover, the entry may already carry a transaction id,
 * or the entry may not exist at all.
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn database connection; may be null
 * @param sysMsgID the system message ID
 * @param conUID the consumer id
 * @param txnUID the transaction id associated with an acknowledgment
 * @throws BrokerException a StoreBeingTakenOverException if the store is
 *      being taken over (HA mode), a TransactionAckExistException with
 *      Status.CONFLICT if the entry already has a transaction id, a
 *      BrokerException with Status.NOT_FOUND if the entry is missing, or a
 *      wrapping BrokerException for any other failure
 */
public void updateTransaction( Connection conn, SysMessageID sysMsgID,
    ConsumerUID conUID, TransactionUID txnUID ) throws BrokerException {

    String msgID = sysMsgID.getUniqueName();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    Exception failure = null;
    try {
        DBManager dbMgr = DBManager.getDBManager();
        if ( conn == null ) {
            conn = dbMgr.getConnection( true );
            ownsConnection = true;
        }

        // Make sure the transaction exists
        dbMgr.getDAOFactory().getTransactionDAO().checkTransaction(
            conn, txnUID.longValue() );

        stmt = conn.prepareStatement( updateTransactionSQL );
        stmt.setLong( 1, txnUID.longValue() );
        stmt.setString( 2, msgID );
        stmt.setLong( 3, conUID.longValue() );
        if ( Globals.getHAEnabled() ) {
            // Extra parameter used by the HA-only criterion of the statement
            stmt.setString( 4, dbMgr.getBrokerID() );
        }

        int rows = stmt.executeUpdate();
        if ( rows == 0 ) {
            // For HA mode, check if this broker still owns the store
            if ( Globals.getHAEnabled() ) {
                String localBroker = dbMgr.getBrokerID();
                BrokerDAO brokerDAO = dbMgr.getDAOFactory().getBrokerDAO();
                if ( brokerDAO.isBeingTakenOver( conn, localBroker ) ) {
                    BrokerException takenOver = new StoreBeingTakenOverException(
                        br.getKString( BrokerResources.E_STORE_BEING_TAKEN_OVER ) );

                    try {
                        // Best-effort diagnostic: log the local broker's info
                        HABrokerInfo info = brokerDAO.getBrokerInfo( conn, localBroker );
                        logger.log( Logger.ERROR, br.getKString(
                            BrokerResources.X_INTERNAL_EXCEPTION,
                            info.toString() ), takenOver );
                    } catch (Throwable t) { /* Ignore error */ }

                    throw takenOver;
                }
            }

            // Check if ack exists
            long storedTxnID = getTransaction( conn, sysMsgID, conUID );
            if ( storedTxnID > 0 ) {
                String ack = "[" + sysMsgID + "]" + conUID;
                throw new TransactionAckExistException(
                    br.getKString( BrokerResources.X_PERSIST_TXNACK_FAILED,
                    ack, txnUID ) + " : " +
                    br.getKString( BrokerResources.E_ACK_EXISTS_IN_STORE,
                    ack, String.valueOf( storedTxnID ) ), Status.CONFLICT );
            }

            // We're assuming the entry does not exist
            throw new BrokerException(
                br.getKString( BrokerResources.E_INTEREST_STATE_NOT_FOUND_IN_STORE,
                conUID.toString(), msgID ), Status.NOT_FOUND );
        }
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + updateTransactionSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_PERSIST_INTEREST_STATE_FAILED,
            conUID.toString(), sysMsgID.toString() ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( null, stmt, ownsConnection ? conn : null, failure );
    }
}
/**
 * Clear the transaction from all consumer states associated with it, i.e.
 * reset their transaction id to NULL.
 * <p>
 * When no row is updated in HA mode, the method checks whether the store is
 * being taken over and fails if so; otherwise updating zero rows is not an
 * error.
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn Database Connection; may be null
 * @param txnUID the transaction
 * @throws BrokerException a StoreBeingTakenOverException if the store is
 *      being taken over (HA mode), or a wrapping BrokerException for any
 *      other failure
 */
public void clearTransaction( Connection conn, TransactionUID txnUID )
    throws BrokerException {

    long txnID = txnUID.longValue();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    Exception failure = null;
    try {
        DBManager dbMgr = DBManager.getDBManager();
        if ( conn == null ) {
            conn = dbMgr.getConnection( true );
            ownsConnection = true;
        }

        stmt = conn.prepareStatement( clearTxnSQL );
        stmt.setLong( 1, txnID );
        if ( Globals.getHAEnabled() ) {
            // Extra parameter used by the HA-only criterion of the statement
            stmt.setString( 2, dbMgr.getBrokerID() );
        }

        // For HA mode, a zero row count may mean this broker no longer owns
        // the store
        if ( stmt.executeUpdate() == 0 && Globals.getHAEnabled() ) {
            String localBroker = dbMgr.getBrokerID();
            BrokerDAO brokerDAO = dbMgr.getDAOFactory().getBrokerDAO();
            if ( brokerDAO.isBeingTakenOver( conn, localBroker ) ) {
                BrokerException takenOver = new StoreBeingTakenOverException(
                    br.getKString( BrokerResources.E_STORE_BEING_TAKEN_OVER ) );

                try {
                    // Best-effort diagnostic: log the local broker's info
                    HABrokerInfo info = brokerDAO.getBrokerInfo( conn, localBroker );
                    logger.log( Logger.ERROR, br.getKString(
                        BrokerResources.X_INTERNAL_EXCEPTION,
                        info.toString() ), takenOver );
                } catch (Throwable t) { /* Ignore error */ }

                throw takenOver;
            }
        }
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + clearTxnSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_CLEAR_TXN_FROM_INT_STATES_FAILED,
            txnUID ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( null, stmt, ownsConnection ? conn : null, failure );
    }
}
/**
 * Delete all consumer states for a message.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn Database Connection; may be null
 * @param sysMsgID the SysMessageID
 * @throws BrokerException if the delete fails
 */
public void deleteByMessageID( Connection conn, SysMessageID sysMsgID )
    throws BrokerException {

    String msgID = sysMsgID.getUniqueName();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    Exception failure = null;
    try {
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConnection = true;
        }

        stmt = conn.prepareStatement( deleteByMsgSQL );
        stmt.setString( 1, msgID );
        stmt.executeUpdate();
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + deleteByMsgSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_REMOVE_INTEREST_STATE_FAILED,
            msgID ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( null, stmt, ownsConnection ? conn : null, failure );
    }
}
/**
 * Delete all consumer states for a transaction.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn Database Connection; may be null
 * @param txnUID the transaction
 * @throws BrokerException if the delete fails
 */
public void deleteByTransaction( Connection conn, TransactionUID txnUID )
    throws BrokerException {

    long txnID = txnUID.longValue();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    Exception failure = null;
    try {
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConnection = true;
        }

        stmt = conn.prepareStatement( deleteByTxnSQL );
        stmt.setLong( 1, txnID );
        stmt.executeUpdate();
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + deleteByTxnSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_REMOVE_INT_STATES_FOR_TXN_FAILED,
            txnUID ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( null, stmt, ownsConnection ? conn : null, failure );
    }
}
/**
 * Delete all consumer states for a destination, restricted to messages that
 * belong to the local broker.
 * Note: Because the Consumer State table is a child table of the Message
 * table, this method is meant to be called from
 * MessageDAO.deleteByDestination() to delete all consumer states before the
 * messages themselves are deleted.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn Database Connection; may be null
 * @param dstUID the destination
 * @throws BrokerException if the delete fails
 */
public void deleteByDestination( Connection conn, DestinationUID dstUID )
    throws BrokerException {

    String dstID = dstUID.toString();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    Exception failure = null;
    try {
        DBManager dbMgr = DBManager.getDBManager();
        if ( conn == null ) {
            conn = dbMgr.getConnection( true );
            ownsConnection = true;
        }

        stmt = conn.prepareStatement( deleteByDstSQL );
        stmt.setString( 1, dbMgr.getBrokerID() );
        stmt.setString( 2, dstID );
        stmt.executeUpdate();
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + deleteByDstSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_REMOVE_INT_STATES_FOR_DST_FAILED,
            dstID ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( null, stmt, ownsConnection ? conn : null, failure );
    }
}
/**
 * Delete all entries.
 * <p>
 * In HA mode only the consumer states of messages that belong to the running
 * broker are deleted; otherwise no restriction is passed on. The actual
 * delete is delegated to the four-argument deleteAll().
 *
 * @param conn Database Connection
 * @throws BrokerException if the delegated delete fails
 */
public void deleteAll( Connection conn )
    throws BrokerException {

    String whereClause = null;
    if ( Globals.getHAEnabled() ) {
        // Restrict the delete to the running broker's consumer states, e.g.:
        //   DELETE FROM mqconstate41cmycluster WHERE message_id IN
        //      (SELECT id FROM mqmsg41cmycluster msgtbl,
        //                      mqses41cmycluster sestbl
        //       WHERE sestbl.broker_id = 'mybroker' AND
        //             sestbl.id = msgtbl.store_session_id)
        // NOTE(review): the broker id is placed in the SQL text as a quoted
        // literal rather than bound as a parameter.
        DBManager dbMgr = DBManager.getDBManager();
        String msgTable = dbMgr.getTableName( MessageDAO.TABLE_NAME_PREFIX );
        String sesTable = dbMgr.getTableName( StoreSessionDAO.TABLE_NAME_PREFIX );

        whereClause =
            MESSAGE_ID_COLUMN
            + " IN (SELECT msgTbl." + MessageDAO.ID_COLUMN
            + " FROM " + msgTable + " msgTbl, " + sesTable + " sesTbl"
            + " WHERE sesTbl." + StoreSessionDAO.BROKER_ID_COLUMN
            + " = '" + dbMgr.getBrokerID()
            + "' AND sesTbl." + StoreSessionDAO.ID_COLUMN
            + " = msgTbl." + MessageDAO.STORE_SESSION_ID_COLUMN
            + ")";
    }

    deleteAll( conn, whereClause, null, 0 );
}
/**
 * Get the state of one consumer for one message.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn database connection; may be null
 * @param sysMsgID the system message ID
 * @param conUID the consumer ID
 * @return consumer's state
 * @throws com.sun.messaging.jmq.jmsserver.util.BrokerException with
 *      Status.NOT_FOUND if no entry exists for the message and consumer, or
 *      a wrapping BrokerException for any other failure
 */
public int getState( Connection conn, SysMessageID sysMsgID,
    ConsumerUID conUID ) throws BrokerException {

    int state = -1;
    String msgID = sysMsgID.getUniqueName();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    ResultSet rows = null;
    Exception failure = null;
    try {
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConnection = true;
        }

        stmt = conn.prepareStatement( selectStateSQL );
        stmt.setString( 1, msgID );
        stmt.setLong( 2, conUID.longValue() );
        rows = stmt.executeQuery();

        if ( !rows.next() ) {
            // We are assuming the consumer state does not exist
            throw new BrokerException(
                br.getKString( BrokerResources.E_INTEREST_STATE_NOT_FOUND_IN_STORE,
                conUID.toString(), msgID ), Status.NOT_FOUND );
        }
        state = rows.getInt( 1 );
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + selectStateSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_GET_INTEREST_STATE_FAILED,
            conUID.toString(), msgID ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( rows, stmt, ownsConnection ? conn : null, failure );
    }

    return state;
}
/**
 * Get all consumers and their states associated with the message ID.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn database connection; may be null
 * @param sysMsgID the system message ID
 * @return HashMap mapping each ConsumerUID to its state (as an Integer);
 *      empty if the message has no consumer states
 * @throws BrokerException if the states cannot be loaded
 */
public HashMap getStates( Connection conn, SysMessageID sysMsgID )
    throws BrokerException {

    HashMap statesByConsumer = new HashMap();
    String msgID = sysMsgID.getUniqueName();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    ResultSet rows = null;
    Exception failure = null;
    try {
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConnection = true;
        }

        stmt = conn.prepareStatement( selectStatesByMsgSQL );
        stmt.setString( 1, msgID );
        rows = stmt.executeQuery();

        // Column 1 is the consumer id, column 2 the state
        while ( rows.next() ) {
            statesByConsumer.put(
                new ConsumerUID( rows.getLong( 1 ) ),
                Integer.valueOf( rows.getInt( 2 ) ) );
        }
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + selectStatesByMsgSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString(
                BrokerResources.X_LOAD_INT_STATES_FOR_MSG_FAILED, msgID ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( rows, stmt, ownsConnection ? conn : null, failure );
    }

    return statesByConsumer;
}
/**
 * Get the transaction id recorded for one consumer of one message.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn database connection; may be null
 * @param sysMsgID the system message ID
 * @param conUID the consumer ID
 * @return consumer's transaction id as read from the first matching row, or
 *      -1 if no row matches
 * @throws BrokerException if the lookup fails
 */
public long getTransaction( Connection conn, SysMessageID sysMsgID,
    ConsumerUID conUID ) throws BrokerException {

    long txnID = -1;
    String msgID = sysMsgID.getUniqueName();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    ResultSet rows = null;
    Exception failure = null;
    try {
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConnection = true;
        }

        stmt = conn.prepareStatement( selectTransactionSQL );
        stmt.setString( 1, msgID );
        stmt.setLong( 2, conUID.longValue() );
        rows = stmt.executeQuery();

        // A missing row leaves the -1 default in place
        if ( rows.next() ) {
            txnID = rows.getLong( 1 );
        }
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + selectTransactionSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_RETRIEVE_INTEREST_FAILED,
            conUID.toString() ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( rows, stmt, ownsConnection ? conn : null, failure );
    }

    return txnID;
}
/**
 * Get all consumers associated with the message ID whose state is not
 * INTEREST_STATE_ACKNOWLEDGED.
 * <p>
 * If no connection is supplied, one is obtained from the DBManager and
 * closed before returning. On any failure the connection is rolled back
 * when it is not in auto-commit mode.
 *
 * @param conn database connection; may be null
 * @param sysMsgID the system message ID
 * @return list of consumer IDs (ConsumerUID objects); empty if none match
 * @throws BrokerException if the consumer IDs cannot be loaded
 */
public List getConsumerUIDs( Connection conn, SysMessageID sysMsgID )
    throws BrokerException {

    List consumers = new ArrayList();
    String msgID = sysMsgID.getUniqueName();

    boolean ownsConnection = false;
    PreparedStatement stmt = null;
    ResultSet rows = null;
    Exception failure = null;
    try {
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConnection = true;
        }

        stmt = conn.prepareStatement( selectConsumerIDsByMsgSQL );
        stmt.setString( 1, msgID );
        rows = stmt.executeQuery();

        while ( rows.next() ) {
            consumers.add( new ConsumerUID( rows.getLong( 1 ) ) );
        }
    } catch ( Exception e ) {
        failure = e;
        try {
            if ( conn != null && !conn.getAutoCommit() ) {
                conn.rollback();
            }
        } catch ( SQLException rbe ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rbe );
        }

        // BrokerExceptions pass through untouched; everything else is wrapped
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException( "[" + selectConsumerIDsByMsgSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString(
                BrokerResources.X_LOAD_INT_STATES_FOR_MSG_FAILED, msgID ), cause );
    } finally {
        // Only close the connection when it was opened by this method
        Util.close( rows, stmt, ownsConnection ? conn : null, failure );
    }

    return consumers;
}
/**
 * Retrieve all transaction acknowledgements for the specified transaction ID.
 * <p>
 * Executes selectTransactionAcksSQL with the transaction's long value.
 * For each row, column 1 is read as the consumer ID and column 2 as the
 * message ID string. A row whose message ID cannot be read or parsed is
 * logged and skipped; it does not abort the load.
 * @param conn database connection; when null, a connection is obtained
 *      from DBManager for this call and handed to Util.close afterwards
 * @param txnUID the transaction ID
 * @return List of TransactionAcknowledgement objects; empty if none loaded
 * @throws BrokerException if the query fails; a BrokerException raised
 *      during the query is rethrown as is, any other exception is wrapped
 */
public List getTransactionAcks( Connection conn, TransactionUID txnUID )
    throws BrokerException {

    List acks = new ArrayList();

    boolean ownsConn = false;
    PreparedStatement stmt = null;
    ResultSet rows = null;
    Exception failure = null;
    try {
        // Borrow a connection only when the caller did not supply one
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConn = true;
        }

        stmt = conn.prepareStatement( selectTransactionAcksSQL );
        stmt.setLong( 1, txnUID.longValue() );
        rows = stmt.executeQuery();
        while ( rows.next() ) {
            // A failure reading the consumer ID aborts the whole load ...
            ConsumerUID consumer = new ConsumerUID( rows.getLong( 1 ) );
            try {
                // ... whereas a bad message ID only costs this one row
                SysMessageID message = SysMessageID.get( rows.getString( 2 ) );
                TransactionAcknowledgement ack =
                    new TransactionAcknowledgement( message, consumer, consumer );
                acks.add( ack );
            } catch ( Exception parseErr ) {
                // fail to parse one object; just log it
                logger.logStack( Logger.ERROR,
                    BrokerResources.X_PARSE_TXNACK_FAILED, txnUID, parseErr );
            }
        }
    } catch ( Exception e ) {
        // Remembered so that Util.close is told about the failure
        failure = e;

        // Roll back only when the connection is not in auto-commit mode
        try {
            if ( conn != null ) {
                if ( !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            }
        } catch ( SQLException rollbackErr ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rollbackErr );
        }

        // Broker exceptions pass through untouched
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        // SQL errors are wrapped together with the statement text
        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException(
                "[" + selectTransactionAcksSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_LOAD_ACKS_FOR_TXN_FAILED,
                txnUID ), cause );
    } finally {
        // The connection is passed on only if it was obtained here
        Util.close( rows, stmt, ownsConn ? conn : null, failure );
    }

    return acks;
}
/**
 * Retrieve all transaction acknowledgements for all transactions.
 * <p>
 * Executes selectAllTransactionAcksSQL. For each row, column 1 is read as
 * the transaction ID, column 2 as the consumer ID and column 3 as the
 * message ID string. Acknowledgements are grouped per transaction; a row
 * whose message ID cannot be read or parsed is logged and skipped.
 * @param conn database connection; when null, a connection is obtained
 *      from DBManager for this call and handed to Util.close afterwards
 * @return HashMap keyed by TransactionUID whose values are
 *      TransactionAcknowledgement[] arrays holding that transaction's acks
 * @throws BrokerException if the query fails; a BrokerException raised
 *      during the query is rethrown as is, any other exception is wrapped
 */
public HashMap getAllTransactionAcks( Connection conn )
    throws BrokerException {

    HashMap acksByTxn = new HashMap(100);

    boolean ownsConn = false;
    PreparedStatement stmt = null;
    ResultSet rows = null;
    Exception failure = null;
    try {
        // Borrow a connection only when the caller did not supply one
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( true );
            ownsConn = true;
        }

        stmt = conn.prepareStatement( selectAllTransactionAcksSQL );
        rows = stmt.executeQuery();
        while ( rows.next() ) {
            // A failure reading either ID aborts the whole load ...
            TransactionUID txn = new TransactionUID( rows.getLong( 1 ) );
            ConsumerUID consumer = new ConsumerUID( rows.getLong( 2 ) );
            try {
                // ... whereas a bad message ID only costs this one row
                SysMessageID message = SysMessageID.get( rows.getString( 3 ) );

                List bucket = (List)acksByTxn.get( txn );
                if ( bucket == null ) {
                    // First ack seen for this txn: start its list
                    bucket = new ArrayList(25);
                    acksByTxn.put( txn, bucket );
                }

                bucket.add(
                    new TransactionAcknowledgement( message, consumer, consumer ) );
            } catch ( Exception parseErr ) {
                // fail to parse one object; just log it
                logger.logStack( Logger.ERROR,
                    BrokerResources.X_PARSE_TXNACK_FAILED, txn, parseErr );
            }
        }
    } catch ( Exception e ) {
        // Remembered so that Util.close is told about the failure
        failure = e;

        // Roll back only when the connection is not in auto-commit mode
        try {
            if ( conn != null ) {
                if ( !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            }
        } catch ( SQLException rollbackErr ) {
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED, rollbackErr );
        }

        // Broker exceptions pass through untouched
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        // SQL errors are wrapped together with the statement text
        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException(
                "[" + selectAllTransactionAcksSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_LOAD_TXNACK_FAILED ), cause );
    } finally {
        // The connection is passed on only if it was obtained here
        Util.close( rows, stmt, ownsConn ? conn : null, failure );
    }

    // Swap each List value for a TransactionAcknowledgement[]. Only values
    // of existing keys are replaced, so the key set being iterated is not
    // structurally modified.
    for ( Iterator keys = acksByTxn.keySet().iterator(); keys.hasNext(); ) {
        TransactionUID txn = (TransactionUID)keys.next();
        List bucket = (List)acksByTxn.get( txn );
        acksByTxn.put( txn,
            bucket.toArray( new TransactionAcknowledgement[0] ) );
    }

    return acksByTxn;
}
/**
 * Get debug information about the store.
 * <p>
 * The result holds a single entry: the key names this DAO's table and the
 * value is the row count reported by getRowCount, as a String. If the
 * count cannot be obtained the failure is logged and "-1" is reported.
 * @param conn database connection; not used by this method, the row
 *      count is requested with a null connection
 * @return A HashMap of name value pair of information
 */
public HashMap getDebugInfo( Connection conn ) {

    int rows;
    try {
        // Get row count
        rows = getRowCount( null, null );
    } catch ( Exception e ) {
        logger.log( Logger.ERROR, e.getMessage(), e.getCause() );
        rows = -1;  // marker for "count unavailable"
    }

    HashMap info = new HashMap();
    info.put( "Message/Consumer states(" + tableName + ")", String.valueOf( rows ) );
    return info;
}
/**
 * Get the number of consumers (e.g. interest) for the specified message.
 * <p>
 * Executes selectCountByMsgSQL with the message ID as its only parameter
 * and reads the first column of the first row returned as an int.
 * @param conn database connection; when null, a connection is obtained
 *      from DBManager for this call and handed to Util.close afterwards
 * @param msgID the message ID
 * @return number of consumers, or -1 if the query returns no row
 * @throws BrokerException if the query fails; a BrokerException raised
 *      during the query is rethrown as is, any other exception is wrapped
 */
public int getConsumerCount( Connection conn, String msgID )
    throws BrokerException {

    int total = -1;

    boolean ownsConn = false;
    PreparedStatement stmt = null;
    ResultSet rows = null;
    Exception failure = null;
    try {
        // Borrow a connection only when the caller did not supply one
        if ( conn == null ) {
            conn = DBManager.getDBManager().getConnection( false );
            ownsConn = true;
        }

        stmt = conn.prepareStatement( selectCountByMsgSQL );
        stmt.setString( 1, msgID );
        rows = stmt.executeQuery();
        if ( rows.next() ) {
            total = rows.getInt( 1 );
        }
    } catch ( Exception e ) {
        // Remembered so that Util.close is told about the failure
        failure = e;

        // Roll back only when the connection is not in auto-commit mode
        try {
            if ( conn != null ) {
                if ( !conn.getAutoCommit() ) {
                    conn.rollback();
                }
            }
        } catch ( SQLException rollbackErr ) {
            // NOTE(review): the SQL text is appended to the resource key
            // here, so the logger does not receive the bare key - confirm
            // this is intended.
            logger.log( Logger.ERROR, BrokerResources.X_DB_ROLLBACK_FAILED+"["+selectCountByMsgSQL+"]", rollbackErr );
        }

        // Broker exceptions pass through untouched
        if ( e instanceof BrokerException ) {
            throw (BrokerException)e;
        }

        // SQL errors are wrapped together with the statement text
        Exception cause = e;
        if ( e instanceof SQLException ) {
            cause = DBManager.wrapSQLException(
                "[" + selectCountByMsgSQL + "]", (SQLException)e );
        }

        throw new BrokerException(
            br.getKString( BrokerResources.X_LOAD_INT_STATES_FOR_MSG_FAILED,
                msgID ), cause );
    } finally {
        // The connection is passed on only if it was obtained here
        Util.close( rows, stmt, ownsConn ? conn : null, failure );
    }

    return total;
}
}