Package org.objectweb.joram.client.jms

Source Code of org.objectweb.joram.client.jms.XAContext

/*
* JORAM: Java(TM) Open Reliable Asynchronous Messaging
* Copyright (C) 2004 - 2009 ScalAgent Distributed Technologies
* Copyright (C) 2004 - 2000 Bull SA
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307
* USA.
*
* Initial developer(s): Frederic Maistre (INRIA)
* Contributor(s): ScalAgent Distributed Technologies
*/
package org.objectweb.joram.client.jms;

import java.util.Enumeration;
import java.util.Hashtable;
import java.util.Vector;

import javax.jms.JMSException;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
import javax.transaction.xa.Xid;

import org.objectweb.joram.shared.client.ProducerMessages;
import org.objectweb.joram.shared.client.SessAckRequest;
import org.objectweb.joram.shared.client.XACnxCommit;
import org.objectweb.joram.shared.client.XACnxPrepare;
import org.objectweb.joram.shared.client.XACnxRecoverReply;
import org.objectweb.joram.shared.client.XACnxRecoverRequest;
import org.objectweb.joram.shared.client.XACnxRollback;
import org.objectweb.util.monolog.api.BasicLevel;
import org.objectweb.util.monolog.api.Logger;

import fr.dyade.aaa.common.Debug;

/**
* Utility class used by XA connections for managing XA resources.
*/
public class XAResourceMngr {
  /** Transaction active. */
  public static final int STARTED = 0;
  /** Transaction suspended. */
  public static final int SUSPENDED = 1;
  /** Transaction successful. */
  public static final int SUCCESS = 2;
  /** Failed transaction. */
  public static final int ROLLBACK_ONLY = 3;
  /** Prepared transaction. */
  public static final int PREPARED = 4;

  private static Logger logger = Debug.getLogger(XAResourceMngr.class.getName());

  /**
   * The table of known transactions.
   * <p>
   * <b>Key:</b> transaction identifier<br>
   * <b>Object:</b> <code>XAContext</code> instance
   */
  private Hashtable transactions;

  /** The connection this manager belongs to. */
  Connection cnx;

  /** table of Session (key Xid). */
  Hashtable sessionTable;

  /**
   * Creates a <code>XAResourceMngr</code> instance.
   *
   * @param cnx   The connection this manager belongs to.
   */
  public XAResourceMngr(Connection cnx) {
    this.cnx = cnx;
    transactions = new Hashtable();
    sessionTable = new Hashtable();

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 " XAResourceMngr cnx = " + cnx);
  }

  /**
   * Notifies the RM that a transaction is starting.
   *
   * @exception XAException  If the specified transaction is already known by
   *                         the RM in an incompatible state with the start
   *                         request.
   */
  synchronized void start(Xid xid, int flag,
                          Session sess) throws XAException {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 " XAResourceMngr start(" + xid + ", " + flag + ", " + sess +")");

    sess.setTransacted(true); // for XAResource.TMRESUME
    sessionTable.put(xid,sess);

    // New transaction.
    if (flag == XAResource.TMNOFLAGS) {
      if (transactions.containsKey(xid))
        throw new XAException("Can't start transaction already known by RM.");

      transactions.put(xid, new XAContext());

      if (logger.isLoggable(BasicLevel.DEBUG))
        logger.log(BasicLevel.DEBUG,
                                   "--- " + this + ": involved in transaction " + xid.toString());
    } else if (flag == XAResource.TMRESUME) {
      // Resumed transaction.
      if (! transactions.containsKey(xid))
        throw new XAException("Can't resume unknown transaction.");

      if (logger.isLoggable(BasicLevel.DEBUG))
        logger.log(BasicLevel.DEBUG,
                                   "--- " + this + ": resumes transaction " + xid.toString());
    } else if (flag == XAResource.TMJOIN) {
      // Already known transaction.
      if (! transactions.containsKey(xid))
        throw new XAException("Can't join unknown transaction.");

      if (logger.isLoggable(BasicLevel.DEBUG))
        logger.log(BasicLevel.DEBUG,
                                   "--- " + this + ": joins transaction " + xid.toString());
    } else
      throw new XAException("Invalid flag: " + flag);

    setStatus(xid, STARTED);
  }

  /**
   * Notifies the RM that a transaction is ended.
   *
   * @exception XAException  If the specified transaction is in an
   *                         incompatible state with the end request.
   */
  synchronized void end(Xid xid, int flag,
                        Session sess) throws XAException {
    boolean saveResourceState = true;

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 "--- " + this + ": end(" + xid + ", " + flag + ", " + sess + ")");
   
    if (flag == XAResource.TMSUSPEND) {
      if (getStatus(xid) != STARTED)
        throw new XAException("Can't suspend non started transaction.");

      setStatus(xid, SUSPENDED);
    } else {
      if (getStatus(xid) != STARTED && getStatus(xid) != SUSPENDED)
        throw new XAException("Can't end non active or non "
                              + "suspended transaction.");

      // No need to save the resource's state as it has already been done
      // when suspending it.
      if (getStatus(xid) == SUSPENDED)
        saveResourceState = false;

      if (flag == XAResource.TMSUCCESS)
        setStatus(xid, SUCCESS);
      else if (flag == XAResource.TMFAIL)
        setStatus(xid, ROLLBACK_ONLY);
      else
        throw new XAException("Invalid flag: " + flag);
    }

    if (saveResourceState) {
      XAContext xaC = (XAContext) transactions.get(xid);
      xaC.addSendings(sess.sendings);
      xaC.addDeliveries(sess.deliveries);
    }

    Session session = (Session) sessionTable.get(xid);
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 "--- " + this + ": end(...) session="  + session);

    if (session != null) {
      session.setTransacted(false);
      sessionTable.remove(xid);
    }
  }

  /**
   * Notifies the RM that a transaction is prepared.
   *
   * @exception XAException  If the specified transaction is in an
   *                         incompatible state with the prepare request,
   *                         or if the request fails.
   */
  synchronized void prepare(Xid xid) throws XAException {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 "--- " + this  + ": prepare(" + xid + ")");
   
    try {
      if (getStatus(xid) == ROLLBACK_ONLY)
        throw new XAException("Can't prepare resource in ROLLBACK_ONLY state.");

      XAContext xaC = (XAContext) transactions.get(xid);

      if (logger.isLoggable(BasicLevel.DEBUG))
        logger.log(BasicLevel.DEBUG,
                                   "--- "
                                   + this
                                   + ": prepares transaction "
                                   + xid.toString());

      Enumeration targets;   
      String target;
      Vector pMs = new Vector();
      MessageAcks acks;
      Vector sessAcks = new Vector();

      // Getting all the ProducerMessages to send:
      targets = xaC.sendings.keys();
      while (targets.hasMoreElements()) {
        target = (String) targets.nextElement();
        pMs.add(xaC.sendings.remove(target));
      }

      // Getting all the SessAckRequest to send:
      targets = xaC.deliveries.keys();
      while (targets.hasMoreElements()) {
        target = (String) targets.nextElement();
        acks = (MessageAcks) xaC.deliveries.remove(target);
        sessAcks.add(new SessAckRequest(target, acks.getIds(),
                                        acks.getQueueMode()));
      }

      // Sending to the proxy:
      cnx.syncRequest(new XACnxPrepare(xid.getBranchQualifier(),
                                        xid.getFormatId(),
                                        xid.getGlobalTransactionId(),
                                        pMs,
                                        sessAcks));

      setStatus(xid, PREPARED);
    } catch (JMSException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw new XAException("Prepare request failed: " + exc);
    } catch (XAException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw exc;
    }
  }

  /**
   * Notifies the RM that a transaction is commited.
   *
   * @exception XAException  If the specified transaction is in an
   *                         incompatible state with the commit request,
   *                         or if the request fails.
   */
  synchronized void commit(Xid xid) throws XAException {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 "--- " + this + ": commit(" + xid + ")");

    try {
      if (getStatus(xid) != PREPARED)
        throw new XAException("Can't commit non prepared transaction.");

      if (logger.isLoggable(BasicLevel.DEBUG))
        logger.log(BasicLevel.DEBUG,
                                   "--- " + this + ": commits transaction " + xid.toString());

      cnx.syncRequest(new XACnxCommit(xid.getBranchQualifier(),
                                      xid.getFormatId(),
                                      xid.getGlobalTransactionId()));

      transactions.remove(xid);
      Session session = (Session) sessionTable.get(xid);
      if (session != null)
        session.setTransacted(false);

    } catch (JMSException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw new XAException("Commit request failed: " + exc);
    } catch (XAException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw exc;
    }
  }

  /**
   * Notifies the RM that a transaction is rolled back.
   *
   * @exception XAException  If the specified transaction is in an
   *                         incompatible state with the rollback request,
   *                         or if the request fails.
   */
  synchronized void rollback(Xid xid) throws XAException {

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 "--- " + this + ": rollback(" + xid + ")");

    try {
      XAContext xaC = (XAContext) transactions.get(xid);

      if (xaC == null)
        throw new XAException("Unknown transaction.");

      if (logger.isLoggable(BasicLevel.DEBUG))
        logger.log(BasicLevel.DEBUG,
                                   "--- " + this + ": rolls back transaction " + xid.toString());

      Enumeration targets;
      String target;
      MessageAcks acks;

      XACnxRollback rollbackRequest;
 
      targets = xaC.deliveries.keys();
 
      rollbackRequest = new XACnxRollback(xid.getBranchQualifier(),
                                           xid.getFormatId(),
                                           xid.getGlobalTransactionId());

      while (targets.hasMoreElements()) {
        target = (String) targets.nextElement();
        acks = (MessageAcks) xaC.deliveries.remove(target);
        rollbackRequest.add(target, acks.getIds(), acks.getQueueMode());
      }

      // Sending to the proxy:
      cnx.syncRequest(rollbackRequest);

      transactions.remove(xid);
      Session session = (Session) sessionTable.get(xid);
      if (session != null) {
        session.setTransacted(false);
        sessionTable.remove(xid);
      }
    } catch (JMSException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw new XAException("Rollback request failed: " + exc);
    } catch (XAException exc) {
      setStatus(xid, ROLLBACK_ONLY);
      throw exc;
    }
  }

  /**
   * Notifies the RM to recover the prepared transactions.
   *
   * @exception XAException  If the specified flag is invalid, or if the
   *                         request fails.
   */
  synchronized Xid[] recover(int flag) throws XAException {
    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 "--- "
                                 + this
                                 + ": recovers transactions.");

    if (flag == XAResource.TMSTARTRSCAN || flag == XAResource.TMENDRSCAN)
      throw new XAException("Non supported recovery flag: " + flag);

    try {
      XACnxRecoverReply reply =
        (XACnxRecoverReply) cnx.syncRequest(new XACnxRecoverRequest());
    
      if (reply == null) {
        return new Xid[0]
      }
     
      Xid[] xids = new Xid[reply.getSize()];

      for (int i = 0; i < reply.getSize(); i++) {
        xids[i] = new XidImpl(reply.getBranchQualifier(i),
                              reply.getFormatId(i),
                              reply.getGlobalTransactionId(i));
        transactions.put(xids[i], new XAContext());
        setStatus(xids[i], PREPARED);
      }
      return xids;
    } catch (Exception exc) {
      throw new XAException("Recovery request failed: " + exc.getMessage());
    }
  }

  /**
   * Sets the status of a transaction.
   *
   * @exception XAException  If the transaction is unknown.
   */
  private void setStatus(Xid xid, int status) throws XAException {
    XAContext xac = (XAContext) transactions.get(xid);

    if (xac == null)
      throw new XAException("Unknown transaction.");

    xac.status = status;
  }

  /**
   * Gets the status of a transaction.
   *
   * @exception XAException  If the transaction is unknown.
   */
  private int getStatus(Xid xid) throws XAException  {
    XAContext xac = (XAContext) transactions.get(xid);

    if (xac == null)
      throw new XAException("Unknown transaction.");

    return xac.status;
  }

  /** Resource managers are equal if they belong to the same connection. */
  public boolean equals(Object o) {
    if (! (o instanceof XAResourceMngr))
      return false;

    XAResourceMngr other = (XAResourceMngr) o;

    if (logger.isLoggable(BasicLevel.DEBUG))
      logger.log(BasicLevel.DEBUG,
                                 this + ": equals other = " + other.cnx +
                                 ", this.cnx = " + cnx +
                                 ", equals = " + cnx.equals(other.cnx));
   
    return cnx.equals(other.cnx);
  }

  public int hashCode() {
    return cnx.hashCode();
  }
}

/**
* Utility class holding a resource's state during transaction progress.
*/
class XAContext {
  /** The transaction status. */
  int status;
  /**
   * Table holding the <code>ProducerMessages</code> produced in the
   * transaction.
   * <p>
   * <b>Key:</b> destination name<br>
   * <b>Object:</b> <code>ProducerMessages</code>
   */
  Hashtable sendings;
  /**
   * Table holding the identifiers of the messages delivered per
   * destination or subscription, in the transaction.
   * <p>
   * <b>Key:</b> destination or subscription name<br>
   * <b>Object:</b> corresponding <code>MessageAcks</code> instance
   */
  Hashtable deliveries;


  /**
   * Constructs an <code>XAContext</code> instance.
   */
  XAContext() {
    sendings = new Hashtable();
    deliveries = new Hashtable();
  }


  /**
   * Adds new sendings performed by the resumed transaction.
   */
  void addSendings(Hashtable newSendings) {
    String newDest;
    ProducerMessages newPM;
    ProducerMessages storedPM;
    Vector msgs;

    // Browsing the destinations for which messages have been produced:
    Enumeration newDests = newSendings.keys();
    while (newDests.hasMoreElements()) {
      newDest = (String) newDests.nextElement();
      newPM = (ProducerMessages) newSendings.remove(newDest);
      storedPM = (ProducerMessages) sendings.get(newDest);
      // If messages haven't already been produced for this destination,
      // storing the new ProducerMessages object:
      if (storedPM == null)
        sendings.put(newDest, newPM);
      // Else, adding the newly produced messages to the existing
      // ProducerMessages:
      else {
        msgs = newPM.getMessages();
        for (int i = 0; i < msgs.size(); i++)
          storedPM.addMessage((org.objectweb.joram.shared.messages.Message) msgs.get(i));
      }
    }
  }

  /**
   * Adds new deliveries occured within the resumed transaction.
   */
  void addDeliveries(Hashtable newDeliveries) {
    String newName;
    MessageAcks newAcks;
    MessageAcks storedAcks;

    // Browsing the destinations or subscriptions to which messages will have
    // to be acknowledged:
    Enumeration newNames = newDeliveries.keys();
    while (newNames.hasMoreElements()) {
      newName = (String) newNames.nextElement();
      newAcks = (MessageAcks) newDeliveries.remove(newName);
      storedAcks = (MessageAcks) deliveries.get(newName);
      // If there are no messages to acknowledge for this destination or
      // subscription, storing the new vector:
      if (storedAcks == null)
        deliveries.put(newName, newAcks);
      // Else, adding the new ids to the stored ones:
      else
        storedAcks.addIds(newAcks.getIds());
    }
  }
}
TOP

Related Classes of org.objectweb.joram.client.jms.XAContext

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.