Package org.openeai.jms.producer

Source Code of org.openeai.jms.producer.QueueRequestor

/*******************************************************************************
$Source: /cvs/repositories/openii3/project/java/source/org/openeai/jms/producer/QueueRequestor.java,v $
$Revision: 1.18 $
*******************************************************************************/

/**********************************************************************
This file is part of the OpenEAI Application Foundation or
OpenEAI Message Object API created by Tod Jackson
(tod@openeai.org) and Steve Wheat (steve@openeai.org) at
the University of Illinois Urbana-Champaign.

Copyright (C) 2002 The OpenEAI Software Foundation

This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.

This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
Lesser General Public License for more details.

You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA

For specific licensing details and examples of how this software
can be used to build commercial integration software or to implement
integrations for your enterprise, visit http://www.OpenEai.org/licensing.
*/

package org.openeai.jms.producer;

import java.io.ByteArrayInputStream;

import javax.jms.*;
import javax.jms.Queue;

import org.openeai.*;

import java.util.*;

import org.jdom.output.XMLOutputter;
import org.jdom.Document;
import org.jdom.Element;

import org.openeai.xml.*;

/**
* JMS provides a QueueRequestor helper class to simplify making service
* requests.  This is the OpenEAI version of that class which leverages several
* other OpenEAI foundation components like TempQueuePool to increase
* the performance of request production.  It is thread safe so multiple threads
* can make requests using the same producer and queue requestor.
* <P>
* @author      Tod Jackson (tod@openeai.org)
* @author      Steve Wheat (steve@openeai.org)
* @version     3.0  - 28 January 2003
* @see TempQueuePool
* @see PointToPointProducer
**/
public class QueueRequestor extends OpenEaiObject {

//  private Object _lock = new Object();
  private TempQueuePool m_tempQueuePool = null;
  private QueueSession m_session =
    null; // The queue session the queue belongs to.
  private Queue m_queue = null; // The queue to perform the request/reply on.
  private QueueSender m_sender = null;
  private int m_timeoutInterval =
    10000; // Default (10 seconds), eventually, this will be
  // configurable via the Producer creating this requestor
  // and it's ProducerConfig.

  /**
    * Constructor for the QueueRequestor class.
    * <P>
    * This implementation assumes the session parameter to be non-transacted
    * and either AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE.
    * <P>
    * @param session the queue session the queue belongs to.
    * @param queue the queue to perform the request/reply call on.
    * @exception JMSException if a JMS error occurs.
    **/
  public QueueRequestor(QueueSession session,
                        Queue queue) throws JMSException {
    m_session = session;
    m_queue = queue;
    m_sender = session.createSender(queue);
    m_sender.setPriority(9); // This is the priority that will be used unless overridden
  }

  /**
    * Returns the Temporary Queue Pool associated to this QueueRequestor.
    * <P>
    * @return TempQueuePool
    * @see TempQueuePool
    **/
  public TempQueuePool getTempQueuePool() {
    return m_tempQueuePool;
  }

  /**
    * Sets the Temporary Queue Pool associated to this QueueRequestor.  The PointToPointProducer
    * creates a TempQueuePool and calls this method when it's being initialized based
    * it's configuration information found in the ProducerConfig object.
    * <P>
    * @return TempQueuePool
    * @see TempQueuePool
    * @see org.openeai.config.ProducerConfig
    **/
  public void setTempQueuePool(TempQueuePool tempQueuePool) {
    m_tempQueuePool = tempQueuePool;
  }

  /**
    * Returns the request timeout interval assoicated to this QueueRequestor.  This
    * value is used by the QueueRequestor when it waits for a response to a request.
    * If the response takes longer than this interval, a JMSTimeoutException will be
    * thrown by the request method.
    * <P>
    * @return int the currently assigned timeout interval for requests.
    **/
  public int getTimeoutInterval() {
    return m_timeoutInterval;
  }

  /**
    * Sets the request timeout interval assoicated to this QueueRequestor.  This
    * value is used by the QueueRequestor when it waits for a response to a request.
    * If the response takes longer than this interval, a JMSTimeoutException will be
    * thrown by the request method.
    * <P>
    * This method is called by the PointToPointProducer when it uses the QueueRequestor
    * to send a request in the produceRequest method.
    * <P>
    * @param timeout int the currently assigned timeout interval for requests.
    * @see PointToPointProducer#produceRequest(Message)
    **/
  public void setTimeoutInterval(int timeout) {
    m_timeoutInterval = timeout;
  }

  private void setQueue(Queue queue) {
    m_queue = queue;
  }

  /**
    * Send a request and wait for a reply.  The temporary queue is used for
    * replyTo, and only one reply per request is expected.
    * <P>
    * This method retrieves a PooledTempQueue object from the TempQueuePool associated
    * to this QueueRequestor and uses the TemporaryQueue and QueueReceiver associated
    * to that PooledTempQueue for the request and subsequent response.
    * <P>
    * @param message the message to send.
    * @return the reply message.
    * @exception JMSException if a JMS error occurs.
    * @exception JMSTimeoutException if the response isn't returned in the allowed time.
    **/
  public Message request(Message message) throws JMSTimeoutException,
                                                 JMSException {

    // Get a PooledTempQueue fro our TempQueuePool
    PooledTempQueue pq = getTempQueuePool().getPooledTempQueue();
    TemporaryQueue tempQueue = pq.getTemporaryQueue();
    QueueReceiver receiver = pq.getQueueReceiver();

    if (tempQueue == null) {
      String errMessage =
        "Error creating TemporaryQueue in QueueRequestor.  TemporaryQueue is Null.";
      logger.fatal(errMessage);
      throw new JMSException(errMessage);
    }

    if (receiver == null) {
      String errMessage =
        "Error creating QueueReceiver in QueueRequestor.  QueueReceiver is Null.";
      logger.fatal(errMessage);
      throw new JMSException(errMessage);
    }

    message.setJMSReplyTo(tempQueue);
    message.setJMSPriority(m_sender.getPriority());
    logger.debug("[QueueRequestor] Sending request");
    String messageIdSent =
      message.getStringProperty(MessageProducer.MESSAGE_ID);
    if (messageIdSent == null) {
      // got to get the message id out of the message
      logger.warn("JMS property 'MESSAGE_ID' does not exist on " +
                  "message, getting it from XML Document.");
      messageIdSent = getMessageIdFromXmlDocument(message);
      String commandName =
        message.getStringProperty(MessageProducer.COMMAND_NAME);
      String requestName = message.getStringProperty("REQUEST_NAME");
      message.clearProperties();
      message.setStringProperty(MessageProducer.COMMAND_NAME, commandName);
      message.setStringProperty(MessageProducer.MESSAGE_ID, messageIdSent);
      if (requestName != null) {
        message.setStringProperty("REQUEST_NAME", requestName);
      }
    }

    String correlationIdSent = messageIdSent;
    message.setJMSCorrelationID(correlationIdSent);
    logger.debug("Correlation id from the message I'm sending: " +
                 message.getJMSCorrelationID());

    logger.debug("QueueRequestor:  MessageId I'm sending: " + messageIdSent);
    m_sender.send(message);

    Message retMessage = null;
//    synchronized (_lock) {
      retMessage = receiver.receive(getTimeoutInterval());
//    }

    if (retMessage == null) {
      receiver.close();
      getTempQueuePool().reinitializePooledTempQueue(pq);
      throw new JMSTimeoutException("Timed out waiting for response to request.");
    }
    else {
      getTempQueuePool().releasePooledTempQueue(pq);
    }

    String messageIdReturned =
      retMessage.getStringProperty(MessageProducer.MESSAGE_ID) == null ? "" :
      retMessage.getStringProperty(MessageProducer.MESSAGE_ID);
    String correlationIdReturned = retMessage.getJMSCorrelationID();

    logger.debug("QueueRequestor:  MessageId I got back: " +
                 messageIdReturned);
    if (messageIdSent.equalsIgnoreCase(messageIdReturned) == false) {
      logger.info("No MESSAGE_ID returned, checking JMS Correlation ID.");
      if (correlationIdSent.equalsIgnoreCase(correlationIdReturned) == false) {
        logger.fatal("Failed to correlate message id sent to message id returned.");
        logger.fatal("  Message Id sent:     " + messageIdSent);
        logger.fatal("  Message Id returned: " + messageIdReturned);
        logger.fatal("  Correlation Id sent:     " + correlationIdSent);
        logger.fatal("  Correlation Id returned: " + correlationIdReturned);
        throw new JMSException("Unable to correlate response returned to request sent.");
      }
    }
    logger.debug("[QueueRequestor] Got response");

    return retMessage;
  }


  /**
  * Since a provider may allocate some resources on behalf of a
  * QueueRequestor outside the JVM, clients should close them when they
  * are not needed. Relying on garbage collection to eventually reclaim
  * these resources may not be timely enough.
  * <P>
  * Note that this method closes the Session object passed to the
  * QueueRequestor constructor and all objects in the TempQueuePool.
  * <P>
  * @exception JMSException if a JMS error occurs.
  **/
  public void close() throws JMSException {
    // publisher and consumer created by constructor are implicitly closed.
    if (m_session != null) {
      m_session.close();
    }
    getTempQueuePool().close();
  }

  private String getMessageIdFromXmlDocument(Message aMessage) throws JMSException {

    // Handle Text Message
    TextMessage textMsg = null;
    try {
      textMsg = (TextMessage)aMessage;
    }
    catch (ClassCastException e) {
      logger.fatal(e.getMessage(), e);
      throw new JMSException(e.getMessage());
    }

    // Build an XML Document out of the contents of the message passed in...
    Document inDoc = null;
    try {
      XmlDocumentReader xmlReader = new XmlDocumentReader();
      inDoc =
          xmlReader.initializeDocument(new ByteArrayInputStream(textMsg.getText().getBytes()),
                                       false);
      Element eControlArea = getControlArea(inDoc.getRootElement());
      Element eMessageId =
        eControlArea.getChild("Sender").getChild("MessageId");
      String senderAppId = eMessageId.getChild("SenderAppId").getText();
      String producerId = eMessageId.getChild("ProducerId").getText();
      String messageSeq = eMessageId.getChild("MessageSeq").getText();
      return senderAppId + "-" + producerId + "-" + messageSeq;
    }
    catch (XmlDocumentReaderException e) {
      logger.fatal("Error creating document from message passed in");
      logger.fatal(e.getMessage(), e);
      throw new JMSException(e.getMessage());
    }
  }

  /**
   * This method looks at the document and returns the appropriate ControlArea.
   * Since there can be three different control areas based on the message
   * (ControlAreaRequest, ControlAreaReply and ControlAreaSync) we need to have
   * some intelligence built in when retrieving the element from the document.
   * Therefore, command implementations can just call this method and get a ControlArea
   * element based on the type of message being processed without having to make
   * assumptions regarding the type of message it's processing and without performing
   * this logic themselves.
   *
   * @param root  org.jdom.Element the root element of the document
   *
   * @return          Element the ControlArea element (may be ControlAreaRequest,
   *                  ControlAreaReply or ControlAreaSync depending on the doc)
   */
  private Element getControlArea(Element root) {
    java.util.List cList = root.getChildren();
    Element retElem = null;
    for (int i = 0; i < cList.size(); i++) {
      Element current = (Element)cList.get(i);
      if (current.getName().indexOf("ControlArea") != -1) {
        retElem = current;
      }
    }
    return retElem;
  }
}
TOP

Related Classes of org.openeai.jms.producer.QueueRequestor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.