/*******************************************************************************
$Source: /cvs/repositories/openii3/project/java/source/org/openeai/jms/producer/QueueRequestor.java,v $
$Revision: 1.18 $
*******************************************************************************/
/**********************************************************************
This file is part of the OpenEAI Application Foundation or
OpenEAI Message Object API created by Tod Jackson
(tod@openeai.org) and Steve Wheat (steve@openeai.org) at
the University of Illinois Urbana-Champaign.
Copyright (C) 2002 The OpenEAI Software Foundation
This library is free software; you can redistribute it and/or
modify it under the terms of the GNU Lesser General Public
License as published by the Free Software Foundation; either
version 2.1 of the License, or (at your option) any later version.
This library is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
Lesser General Public License for more details.
You should have received a copy of the GNU Lesser General Public
License along with this library; if not, write to the Free Software
Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
For specific licensing details and examples of how this software
can be used to build commercial integration software or to implement
integrations for your enterprise, visit http://www.OpenEai.org/licensing.
*/
package org.openeai.jms.producer;
import java.io.ByteArrayInputStream;
import javax.jms.*;
import javax.jms.Queue;
import org.openeai.*;
import java.util.*;
import org.jdom.output.XMLOutputter;
import org.jdom.Document;
import org.jdom.Element;
import org.openeai.xml.*;
/**
* JMS provides a QueueRequestor helper class to simplify making service
* requests. This is the OpenEAI version of that class which leverages several
* other OpenEAI foundation components like TempQueuePool to increase
* the performance of request production. It is thread safe so multiple threads
* can make requests using the same producer and queue requestor.
* <P>
* @author Tod Jackson (tod@openeai.org)
* @author Steve Wheat (steve@openeai.org)
* @version 3.0 - 28 January 2003
* @see TempQueuePool
* @see PointToPointProducer
**/
public class QueueRequestor extends OpenEaiObject {
// private Object _lock = new Object();
private TempQueuePool m_tempQueuePool = null;
private QueueSession m_session =
null; // The queue session the queue belongs to.
private Queue m_queue = null; // The queue to perform the request/reply on.
private QueueSender m_sender = null;
private int m_timeoutInterval =
10000; // Default (10 seconds), eventually, this will be
// configurable via the Producer creating this requestor
// and it's ProducerConfig.
/**
* Constructor for the QueueRequestor class.
* <P>
* This implementation assumes the session parameter to be non-transacted
* and either AUTO_ACKNOWLEDGE or DUPS_OK_ACKNOWLEDGE.
* <P>
* @param session the queue session the queue belongs to.
* @param queue the queue to perform the request/reply call on.
* @exception JMSException if a JMS error occurs.
**/
public QueueRequestor(QueueSession session,
Queue queue) throws JMSException {
m_session = session;
m_queue = queue;
m_sender = session.createSender(queue);
m_sender.setPriority(9); // This is the priority that will be used unless overridden
}
/**
* Returns the Temporary Queue Pool associated to this QueueRequestor.
* <P>
* @return TempQueuePool
* @see TempQueuePool
**/
public TempQueuePool getTempQueuePool() {
return m_tempQueuePool;
}
/**
* Sets the Temporary Queue Pool associated to this QueueRequestor. The PointToPointProducer
* creates a TempQueuePool and calls this method when it's being initialized based
* it's configuration information found in the ProducerConfig object.
* <P>
* @return TempQueuePool
* @see TempQueuePool
* @see org.openeai.config.ProducerConfig
**/
public void setTempQueuePool(TempQueuePool tempQueuePool) {
m_tempQueuePool = tempQueuePool;
}
/**
* Returns the request timeout interval assoicated to this QueueRequestor. This
* value is used by the QueueRequestor when it waits for a response to a request.
* If the response takes longer than this interval, a JMSTimeoutException will be
* thrown by the request method.
* <P>
* @return int the currently assigned timeout interval for requests.
**/
public int getTimeoutInterval() {
return m_timeoutInterval;
}
/**
* Sets the request timeout interval assoicated to this QueueRequestor. This
* value is used by the QueueRequestor when it waits for a response to a request.
* If the response takes longer than this interval, a JMSTimeoutException will be
* thrown by the request method.
* <P>
* This method is called by the PointToPointProducer when it uses the QueueRequestor
* to send a request in the produceRequest method.
* <P>
* @param timeout int the currently assigned timeout interval for requests.
* @see PointToPointProducer#produceRequest(Message)
**/
public void setTimeoutInterval(int timeout) {
m_timeoutInterval = timeout;
}
private void setQueue(Queue queue) {
m_queue = queue;
}
/**
* Send a request and wait for a reply. The temporary queue is used for
* replyTo, and only one reply per request is expected.
* <P>
* This method retrieves a PooledTempQueue object from the TempQueuePool associated
* to this QueueRequestor and uses the TemporaryQueue and QueueReceiver associated
* to that PooledTempQueue for the request and subsequent response.
* <P>
* @param message the message to send.
* @return the reply message.
* @exception JMSException if a JMS error occurs.
* @exception JMSTimeoutException if the response isn't returned in the allowed time.
**/
public Message request(Message message) throws JMSTimeoutException,
JMSException {
// Get a PooledTempQueue fro our TempQueuePool
PooledTempQueue pq = getTempQueuePool().getPooledTempQueue();
TemporaryQueue tempQueue = pq.getTemporaryQueue();
QueueReceiver receiver = pq.getQueueReceiver();
if (tempQueue == null) {
String errMessage =
"Error creating TemporaryQueue in QueueRequestor. TemporaryQueue is Null.";
logger.fatal(errMessage);
throw new JMSException(errMessage);
}
if (receiver == null) {
String errMessage =
"Error creating QueueReceiver in QueueRequestor. QueueReceiver is Null.";
logger.fatal(errMessage);
throw new JMSException(errMessage);
}
message.setJMSReplyTo(tempQueue);
message.setJMSPriority(m_sender.getPriority());
logger.debug("[QueueRequestor] Sending request");
String messageIdSent =
message.getStringProperty(MessageProducer.MESSAGE_ID);
if (messageIdSent == null) {
// got to get the message id out of the message
logger.warn("JMS property 'MESSAGE_ID' does not exist on " +
"message, getting it from XML Document.");
messageIdSent = getMessageIdFromXmlDocument(message);
String commandName =
message.getStringProperty(MessageProducer.COMMAND_NAME);
String requestName = message.getStringProperty("REQUEST_NAME");
message.clearProperties();
message.setStringProperty(MessageProducer.COMMAND_NAME, commandName);
message.setStringProperty(MessageProducer.MESSAGE_ID, messageIdSent);
if (requestName != null) {
message.setStringProperty("REQUEST_NAME", requestName);
}
}
String correlationIdSent = messageIdSent;
message.setJMSCorrelationID(correlationIdSent);
logger.debug("Correlation id from the message I'm sending: " +
message.getJMSCorrelationID());
logger.debug("QueueRequestor: MessageId I'm sending: " + messageIdSent);
m_sender.send(message);
Message retMessage = null;
// synchronized (_lock) {
retMessage = receiver.receive(getTimeoutInterval());
// }
if (retMessage == null) {
receiver.close();
getTempQueuePool().reinitializePooledTempQueue(pq);
throw new JMSTimeoutException("Timed out waiting for response to request.");
}
else {
getTempQueuePool().releasePooledTempQueue(pq);
}
String messageIdReturned =
retMessage.getStringProperty(MessageProducer.MESSAGE_ID) == null ? "" :
retMessage.getStringProperty(MessageProducer.MESSAGE_ID);
String correlationIdReturned = retMessage.getJMSCorrelationID();
logger.debug("QueueRequestor: MessageId I got back: " +
messageIdReturned);
if (messageIdSent.equalsIgnoreCase(messageIdReturned) == false) {
logger.info("No MESSAGE_ID returned, checking JMS Correlation ID.");
if (correlationIdSent.equalsIgnoreCase(correlationIdReturned) == false) {
logger.fatal("Failed to correlate message id sent to message id returned.");
logger.fatal(" Message Id sent: " + messageIdSent);
logger.fatal(" Message Id returned: " + messageIdReturned);
logger.fatal(" Correlation Id sent: " + correlationIdSent);
logger.fatal(" Correlation Id returned: " + correlationIdReturned);
throw new JMSException("Unable to correlate response returned to request sent.");
}
}
logger.debug("[QueueRequestor] Got response");
return retMessage;
}
/**
* Since a provider may allocate some resources on behalf of a
* QueueRequestor outside the JVM, clients should close them when they
* are not needed. Relying on garbage collection to eventually reclaim
* these resources may not be timely enough.
* <P>
* Note that this method closes the Session object passed to the
* QueueRequestor constructor and all objects in the TempQueuePool.
* <P>
* @exception JMSException if a JMS error occurs.
**/
public void close() throws JMSException {
// publisher and consumer created by constructor are implicitly closed.
if (m_session != null) {
m_session.close();
}
getTempQueuePool().close();
}
private String getMessageIdFromXmlDocument(Message aMessage) throws JMSException {
// Handle Text Message
TextMessage textMsg = null;
try {
textMsg = (TextMessage)aMessage;
}
catch (ClassCastException e) {
logger.fatal(e.getMessage(), e);
throw new JMSException(e.getMessage());
}
// Build an XML Document out of the contents of the message passed in...
Document inDoc = null;
try {
XmlDocumentReader xmlReader = new XmlDocumentReader();
inDoc =
xmlReader.initializeDocument(new ByteArrayInputStream(textMsg.getText().getBytes()),
false);
Element eControlArea = getControlArea(inDoc.getRootElement());
Element eMessageId =
eControlArea.getChild("Sender").getChild("MessageId");
String senderAppId = eMessageId.getChild("SenderAppId").getText();
String producerId = eMessageId.getChild("ProducerId").getText();
String messageSeq = eMessageId.getChild("MessageSeq").getText();
return senderAppId + "-" + producerId + "-" + messageSeq;
}
catch (XmlDocumentReaderException e) {
logger.fatal("Error creating document from message passed in");
logger.fatal(e.getMessage(), e);
throw new JMSException(e.getMessage());
}
}
/**
* This method looks at the document and returns the appropriate ControlArea.
* Since there can be three different control areas based on the message
* (ControlAreaRequest, ControlAreaReply and ControlAreaSync) we need to have
* some intelligence built in when retrieving the element from the document.
* Therefore, command implementations can just call this method and get a ControlArea
* element based on the type of message being processed without having to make
* assumptions regarding the type of message it's processing and without performing
* this logic themselves.
*
* @param root org.jdom.Element the root element of the document
*
* @return Element the ControlArea element (may be ControlAreaRequest,
* ControlAreaReply or ControlAreaSync depending on the doc)
*/
private Element getControlArea(Element root) {
java.util.List cList = root.getChildren();
Element retElem = null;
for (int i = 0; i < cList.size(); i++) {
Element current = (Element)cList.get(i);
if (current.getName().indexOf("ControlArea") != -1) {
retElem = current;
}
}
return retElem;
}
}