package openjms.examples.nuix;
import java.util.Properties;
import java.sql.SQLException;
import javax.jms.MessageListener;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.Queue;
import javax.jms.QueueSession;
import javax.jms.QueueReceiver;
import javax.jms.QueueSender;
import javax.jms.QueueConnectionFactory;
import javax.jms.Session;
import javax.jms.ObjectMessage;
import javax.jms.DeliveryMode;
import javax.jms.Message;
import javax.naming.NamingException;
import javax.naming.Context;
import javax.naming.InitialContext;
/**
 * Base class for components that consume messages from one JMS queue and
 * publish messages to another.
 * <p>
 * The queue names, the JNDI location of the JMS provider and a few tuning
 * options are read from {@link #configuration}, which the constructor
 * initialises to the JVM system properties. Subclasses supply
 * {@code onMessage} (this class is abstract and does not implement it) and are
 * expected to call {@link #init()} before using {@link #sendMessage} or any of
 * the {@code receive} methods, and {@link #exit()} when finished.
 */
public abstract class BaseJmsSenderReceiver implements MessageListener,
        ExceptionListener {

    //////////////////////////////////////////////////////////////////////////////////////
    // Constants

    /**
     * Default value for {@link #connectionTimeOut}, used when
     * {@code jms.connection.timeout} is missing or not a valid integer.
     */
    public static final int DEFAULT_TIMEOUT = 5000;

    /**
     * Configuration key deciding whether this instance is registered as an
     * asynchronous message listener (value {@link #TRUE}) or whether messages
     * are pulled through the {@code receive} methods.
     */
    public static final String LISTEN_SETTING = "listen.message";

    /**
     * Value of {@link #LISTEN_SETTING} that enables the message listener
     * (compared case-insensitively).
     */
    public static final String TRUE = "true";

    //////////////////////////////////////////////////////////////////////////////////////
    // State

    /**
     * Connection timeout read from {@code jms.connection.timeout} by
     * {@link #init()}. It is only stored here; this class does not use it.
     */
    protected int connectionTimeOut = 0;

    /**
     * The receiver for the JMS queue to listen to.
     */
    protected QueueReceiver receiver = null;

    /**
     * The sender for the JMS queue to output to.
     */
    protected QueueSender sender = null;

    /**
     * The connection factory for JMS queues.
     */
    protected QueueConnectionFactory factory = null;

    /**
     * The connection opened by {@link #init()}. It is kept so that
     * {@link #exit()} can close it; {@code null} until {@link #init()} succeeds.
     */
    protected QueueConnection connection = null;

    /**
     * The configuration.
     */
    protected Properties configuration;

    /**
     * Queue session.
     */
    protected QueueSession session;

    /**
     * The acknowledge mode handed to the session on creation.
     */
    protected int ack = Session.CLIENT_ACKNOWLEDGE;

    //////////////////////////////////////////////////////////////////////////////////////
    // Constructor

    /**
     * Construct this sender/receiver using the JVM system properties as its
     * configuration. Nothing is connected until {@link #init()} is called.
     */
    public BaseJmsSenderReceiver()
    {
        configuration = System.getProperties();
    }

    //////////////////////////////////////////////////////////////////////////////////////
    // Methods

    /**
     * Connect to the JMS provider. Sets up the following:
     * <ul>
     * <li>The connection timeout ({@code jms.connection.timeout}).</li>
     * <li>The acknowledge mode ({@code jms.ackmode}: {@code auto},
     *     {@code dups} or {@code client}; any other value leaves
     *     {@link #ack} unchanged).</li>
     * <li>The connection, with this object registered as its exception
     *     listener, and the session.</li>
     * <li>The receiver and the sender.</li>
     * </ul>
     * The connection is started only after the receiver and sender exist, so
     * that no message can be delivered to a half-initialised object. If any
     * step fails the connection is closed again before the exception
     * propagates.
     *
     * @throws NamingException when a JNDI lookup fails or a required
     *         configuration property is missing.
     * @throws JMSException when the JMS provider reports an error.
     */
    protected void init() throws NamingException, JMSException
    {
        connectionTimeOut = parseTimeout(configuration.getProperty("jms.connection.timeout"));

        // Unknown or missing values deliberately keep the current ack mode.
        String ackMode = configuration.getProperty("jms.ackmode");
        if ("auto".equals(ackMode)) {
            ack = Session.AUTO_ACKNOWLEDGE;
        } else if ("dups".equals(ackMode)) {
            ack = Session.DUPS_OK_ACKNOWLEDGE;
        } else if ("client".equals(ackMode)) {
            ack = Session.CLIENT_ACKNOWLEDGE;
        }

        // Get the connection factory from JNDI.
        Context jndiContext = getContext();
        factory = (QueueConnectionFactory)
                jndiContext.lookup("JmsQueueConnectionFactory");
        if (factory == null) {
            throw new RuntimeException(
                    "Failed to locate connection factory");
        }

        connection = factory.createQueueConnection();
        boolean ready = false;
        try {
            // Route asynchronous connection failures to onException().
            connection.setExceptionListener(this);

            // NOTE(review): the session is created as transacted. Per the JMS
            // API the acknowledge mode argument is ignored for transacted
            // sessions, and this class never calls session.commit() -
            // presumably subclasses do; confirm before changing this flag.
            session = connection.createQueueSession(true, ack);

            setupListener(jndiContext);
            setupSender(jndiContext);

            // Begin delivery only now that everything is wired up.
            connection.start();
            ready = true;
        } finally {
            if (!ready) {
                abandonConnection();
            }
        }
    }

    /**
     * Set up the sender to send outgoing messages to the JMS queue named by
     * the {@code jms.queue.send} property.
     *
     * @param jndiContext the context to lookup resources at.
     * @throws NamingException when the property is missing or the resource
     *         lookup fails.
     * @throws JMSException when exceptions are thrown in JMS.
     */
    protected void setupSender(Context jndiContext) throws NamingException, JMSException
    {
        System.out.println("Setting up sender ...");
        String queueName = requireProperty("jms.queue.send");
        Queue queue = (Queue) jndiContext.lookup(queueName);
        sender = session.createSender(queue);
    }

    /**
     * Set up the receiver for the JMS queue named by the
     * {@code jms.queue.listen} property, and register this object as its
     * message listener when {@link #LISTEN_SETTING} is {@link #TRUE}.
     *
     * @param jndiContext the context to look for the queue.
     * @throws NamingException when the property is missing or the JNDI
     *         resource cannot be found.
     * @throws JMSException when JMS errors occur.
     */
    protected void setupListener(Context jndiContext) throws NamingException, JMSException
    {
        String queueName = requireProperty("jms.queue.listen");
        Queue queue = (Queue) jndiContext.lookup(queueName);
        receiver = session.createReceiver(queue);

        String listenSetting = configuration.getProperty(LISTEN_SETTING);
        if (listenSetting != null && listenSetting.equalsIgnoreCase(TRUE)) {
            receiver.setMessageListener(this);
        }
    }

    /**
     * Build the JNDI context from the {@code jndi.jms.host},
     * {@code jndi.jms.port}, {@code jndi.jms.name} and
     * {@code jndi.jms.initialcontextfactory} properties. The provider URL has
     * the form {@code rmi://host:port/name}.
     *
     * @return the JNDI Context
     * @throws NamingException when one of the properties is missing or the
     *         context cannot be created.
     */
    protected Context getContext() throws NamingException
    {
        String host = requireProperty("jndi.jms.host");
        String port = requireProperty("jndi.jms.port");
        String jndiname = requireProperty("jndi.jms.name");
        String modeType = requireProperty("jndi.jms.initialcontextfactory");

        Properties props = new Properties();
        props.put(Context.PROVIDER_URL,
                "rmi://" + host + ":" + port + "/" + jndiname);
        props.put(Context.INITIAL_CONTEXT_FACTORY, modeType);
        return new InitialContext(props);
    }

    /**
     * Any exception, then die: report the failure on standard error and
     * terminate the JVM with exit status -1. {@link #init()} registers this
     * object as the connection's exception listener.
     *
     * @param e exception.
     */
    public void onException(JMSException e)
    {
        System.err.println("Got exception in server: " + e.getMessage());
        e.printStackTrace();
        System.exit(-1);
    }

    /**
     * Wrap the object in an {@code ObjectMessage} and send it to the outgoing
     * queue as a persistent message with priority 1 and no expiry.
     *
     * @param sme the object to send.
     * @throws JMSException when the message cannot be created or sent.
     */
    protected void sendMessage(java.io.Serializable sme)
            throws JMSException
    {
        System.out.println("Sending message " + sme);
        // Create the message.
        ObjectMessage msg = session.createObjectMessage(sme);
        // Send it off.
        sender.send(msg, DeliveryMode.PERSISTENT, 1, 0);
        System.out.println("Message has been sent " + sender.getQueue().getQueueName());
    }

    /**
     * Exit this JMS sender and receiver gracefully. The session, the sender
     * and the connection are closed independently, so a failure on one does
     * not prevent the others from being closed; failures are reported on
     * standard error and not rethrown. Resources that were never created are
     * skipped.
     */
    protected void exit()
    {
        System.out.println("\n Clossing all connections in BaseJmsSenderReceiver ...");
        if (session != null) {
            try {
                System.out.print("\n Closing session ................... ");
                session.close();
                System.out.print(" [ok]");
            } catch (Exception e) {
                System.err.print("Error while closing the final queue");
                e.printStackTrace();
            }
        }
        if (sender != null) {
            try {
                System.out.print("\n Closing sender .................... ");
                sender.close();
                System.out.print(" [ok]\n");
            } catch (Exception e) {
                System.err.print("Error while closing the final queue");
                e.printStackTrace();
            }
        }
        if (connection != null) {
            try {
                System.out.print(" Closing connection ................ ");
                connection.close();
                System.out.print(" [ok]\n");
            } catch (Exception e) {
                System.err.print("Error while closing the final queue");
                e.printStackTrace();
            }
            connection = null;
        }
    }

    /**
     * Receive the next message from the listen queue, blocking until one is
     * available.
     *
     * @return the message returned by the underlying receiver.
     * @throws JMSException when the receive fails.
     */
    public Message receive()
            throws JMSException
    {
        return receiver.receive();
    }

    /**
     * Receive the next message from the listen queue, waiting at most the
     * given time.
     *
     * @param wait the timeout passed to the underlying receiver (milliseconds
     *        per the JMS API).
     * @return the message returned by the underlying receiver.
     * @throws JMSException when the receive fails.
     */
    public Message receive(long wait)
            throws JMSException
    {
        return receiver.receive(wait);
    }

    /**
     * Receive the next message from the listen queue if one is immediately
     * available.
     *
     * @return the message returned by the underlying receiver.
     * @throws JMSException when the receive fails.
     */
    public Message receiveNoWait()
            throws JMSException
    {
        return receiver.receiveNoWait();
    }

    //////////////////////////////////////////////////////////////////////////////////////
    // Helpers

    /**
     * Parse the configured timeout, falling back to {@link #DEFAULT_TIMEOUT}
     * when the value is absent or not an integer.
     */
    private static int parseTimeout(String timeoutStr)
    {
        if (timeoutStr == null) {
            return DEFAULT_TIMEOUT;
        }
        try {
            return Integer.parseInt(timeoutStr);
        } catch (NumberFormatException e) {
            return DEFAULT_TIMEOUT;
        }
    }

    /**
     * Read a configuration property that must be present, failing with a
     * message naming the key instead of a later NullPointerException.
     */
    private String requireProperty(String key) throws NamingException
    {
        String value = configuration.getProperty(key);
        if (value == null) {
            throw new NamingException(
                    "Missing required configuration property: " + key);
        }
        return value;
    }

    /**
     * Close the connection after a failed {@link #init()} and clear the
     * partially initialised JMS fields. A failure to close is only reported,
     * so that the exception which aborted {@link #init()} is the one that
     * reaches the caller.
     */
    private void abandonConnection()
    {
        try {
            connection.close();
        } catch (Exception e) {
            System.err.println("Error while closing connection after failed init: "
                    + e.getMessage());
        }
        connection = null;
        session = null;
        receiver = null;
        sender = null;
    }
}