/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/
package org.codehaus.activemq.web;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.ActiveMQSession;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionActivationListener;
import javax.servlet.http.HttpSessionEvent;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashMap;
import java.util.Map;
/**
 * Represents a messaging client used from inside a web container,
 * typically stored inside a HttpSession.
 * <p>
 * The JMS connection, session and producer are created lazily on first use
 * and are discarded by {@link #stop()}. None of the JMS resources are
 * serialized: {@link #writeExternal(ObjectOutput)} writes nothing, and the
 * connection factory is looked up again from the servlet context in
 * {@link #sessionDidActivate(HttpSessionEvent)}.
 *
 * @version $Revision: 1.11 $
 */
public class WebClient implements HttpSessionActivationListener, Externalizable {
    /** Name of the HTTP session attribute under which a WebClient is stored. */
    public static final String webClientAttribute = "org.codehaus.activemq.webclient";
    /** Name of the servlet context attribute holding the shared ConnectionFactory. */
    public static final String connectionFactoryAttribute = "org.codehaus.activemq.connectionFactory";
    /** Name of the servlet context init parameter giving the broker URL. */
    public static final String brokerUrlInitParam = "org.codehaus.activemq.brokerURL";
    /** Name of the servlet context init parameter selecting an embedded broker. */
    public static final String embeddedBrokerInitParam = "org.codehaus.activemq.embeddedBroker";

    private static final Log log = LogFactory.getLog(WebClient.class);

    // Shared by every WebClient instance; assigned in setupConnectionFactory().
    private static ConnectionFactory factory;

    private transient ServletContext context;
    private transient ActiveMQConnection connection;
    private transient ActiveMQSession session;
    private transient MessageProducer producer;
    // Destination -> MessageConsumer, filled lazily by getConsumer().
    private transient Map consumers = new HashMap();

    /**
     * @return the web client for the current HTTP session or null if there is not a web client created yet
     */
    public static WebClient getWebClient(HttpSession session) {
        return (WebClient) session.getAttribute(webClientAttribute);
    }

    /**
     * Only called by serialization
     */
    public WebClient() {
    }

    /**
     * Creates a client bound to the given servlet context and initialises the
     * shared connection factory from it.
     *
     * @param context the servlet context used to look up or create the connection factory
     */
    public WebClient(ServletContext context) {
        this.context = context;
        setupConnectionFactory(context);
    }

    /**
     * Does nothing; the connection is created and started lazily by {@link #getConnection()}.
     *
     * @throws JMSException never thrown by this implementation
     */
    public void start() throws JMSException {
    }

    /**
     * Closes the connection, if one was opened, and forgets the session,
     * producer and all cached consumers so that they are recreated on next use.
     *
     * @throws JMSException if closing the connection fails; the cached state is cleared regardless
     */
    public void stop() throws JMSException {
        try {
            // The connection is created lazily, so there may be nothing to close
            // (e.g. the HTTP session is passivated before any message was sent).
            if (connection != null) {
                connection.close();
            }
        }
        finally {
            producer = null;
            session = null;
            connection = null;
            consumers.clear();
        }
    }

    /**
     * Writes nothing: every instance field is transient.
     */
    public void writeExternal(ObjectOutput out) throws IOException {
    }

    /**
     * Reads nothing from the stream and resets the consumer cache to an empty map.
     */
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        consumers = new HashMap();
    }

    /**
     * Sends the message to the given destination, creating the producer on first use.
     *
     * @param destination where to send the message
     * @param message     the message to send
     * @throws JMSException if the session or producer cannot be created or the send fails
     */
    public void send(Destination destination, Message message) throws JMSException {
        if (producer == null) {
            // Created with a null destination so the one producer can send to any destination.
            producer = getSession().createProducer(null);
        }
        log.info("Sending to destination: " + destination);
        producer.send(destination, message);
        log.info("Sent! message: " + message);
    }

    /**
     * Returns the session, creating a non-transacted, auto-acknowledge one on first use.
     *
     * @return the lazily created session
     * @throws JMSException if the connection or session cannot be created
     */
    public Session getSession() throws JMSException {
        if (session == null) {
            session = (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
        return session;
    }

    /**
     * Returns the connection, creating it from the shared factory and starting it on first use.
     *
     * @return the lazily created, started connection
     * @throws JMSException if the connection cannot be created or started
     */
    public ActiveMQConnection getConnection() throws JMSException {
        if (connection == null) {
            connection = (ActiveMQConnection) factory.createConnection();
            connection.start();
        }
        return connection;
    }

    /**
     * Stops this client before the HTTP session is passivated. A failure to
     * close the connection is logged as a warning and not rethrown.
     */
    public void sessionWillPassivate(HttpSessionEvent event) {
        try {
            stop();
        }
        catch (JMSException e) {
            log.warn("Could not close connection: " + e, e);
        }
    }

    /**
     * Re-attaches this client to the servlet context of the activated session.
     */
    public void sessionDidActivate(HttpSessionEvent event) {
        // lets update the connection factory from the servlet context
        context = event.getSession().getServletContext();
        setupConnectionFactory(context);
    }

    /**
     * Assigns the shared static factory from the servlet context.
     * initConnectionFactory() always returns a factory (either the existing
     * context attribute or a newly created one), so no fallback is needed here.
     */
    private static synchronized void setupConnectionFactory(ServletContext context) {
        factory = initConnectionFactory(context);
    }

    /**
     * Returns the ConnectionFactory stored in the servlet context under
     * {@link #connectionFactoryAttribute}. If none is stored, creates an
     * ActiveMQConnectionFactory from the {@link #brokerUrlInitParam} and
     * {@link #embeddedBrokerInitParam} init parameters (the broker URL falls
     * back to "vm://localhost" when the parameter is absent) and stores it in
     * the context.
     *
     * @param servletContext the context to read the attribute and init parameters from
     * @return the existing or newly created connection factory
     */
    public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
        ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
        if (connectionFactory == null) {
            String brokerURL = servletContext.getInitParameter(brokerUrlInitParam);
            servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
            if (brokerURL == null) {
                brokerURL = "vm://localhost";
            }
            boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam));
            servletContext.log("Use embedded broker: " + embeddedBroker);
            ActiveMQConnectionFactory activeMQFactory = new ActiveMQConnectionFactory(brokerURL);
            activeMQFactory.setUseEmbeddedBroker(embeddedBroker);
            connectionFactory = activeMQFactory;
            servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
        }
        return connectionFactory;
    }

    /**
     * Returns the consumer cached for the destination, creating and caching one on first request.
     *
     * @param destination the destination to consume from
     * @return the consumer for that destination
     * @throws JMSException if the session or consumer cannot be created
     */
    public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
        MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
        if (consumer == null) {
            // Go through getSession() so the session is created on demand; the
            // field is still null if no message has been sent yet.
            consumer = getSession().createConsumer(destination);
            consumers.put(destination, consumer);
        }
        return consumer;
    }
}