Package org.codehaus.activemq.web

Source Code of org.codehaus.activemq.web.WebClient

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.codehaus.activemq.web;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnection;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.ActiveMQSession;

import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpSession;
import javax.servlet.http.HttpSessionActivationListener;
import javax.servlet.http.HttpSessionEvent;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.HashMap;
import java.util.Map;

/**
* Represents a messaging client used from inside a web container,
* typically stored inside an HttpSession.
*
* @version $Revision: 1.11 $
*/
public class WebClient implements HttpSessionActivationListener, Externalizable {
    public static final String webClientAttribute = "org.codehaus.activemq.webclient";
    public static final String connectionFactoryAttribute = "org.codehaus.activemq.connectionFactory";
    public static final String brokerUrlInitParam = "org.codehaus.activemq.brokerURL";
    public static final String embeddedBrokerInitParam = "org.codehaus.activemq.embeddedBroker";

    private static final Log log = LogFactory.getLog(WebClient.class);


    private transient ServletContext context;
    private static transient ConnectionFactory factory;
    private transient ActiveMQConnection connection;
    private transient ActiveMQSession session;
    private transient MessageProducer producer;
    private transient Map consumers = new HashMap();


    /**
     * @return the web client for the current HTTP session or null if there is not a web client created yet
     */
    public static WebClient getWebClient(HttpSession session) {
        return (WebClient) session.getAttribute(webClientAttribute);
    }

    /**
     * Only called by serialization
     */
    public WebClient() {
    }

    public WebClient(ServletContext context) {
        this.context = context;
        setupConnectionFactory(context);
    }

    public void start() throws JMSException {
    }

    public void stop() throws JMSException {
        try {
            connection.close();
        }
        finally {
            producer = null;
            session = null;
            connection = null;
            consumers.clear();
        }
    }

    public void writeExternal(ObjectOutput out) throws IOException {
    }

    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        consumers = new HashMap();
    }

    public void send(Destination destination, Message message) throws JMSException {
        if (producer == null) {
            producer = getSession().createProducer(null);
        }
        log.info("Sending to destination: " + destination);
        producer.send(destination, message);
        log.info("Sent! message: " + message);
    }

    public Session getSession() throws JMSException {
        if (session == null) {
            session = (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
        }
        return session;
    }

    public ActiveMQConnection getConnection() throws JMSException {
        if (connection == null) {
            connection = (ActiveMQConnection) factory.createConnection();
            connection.start();
        }
        return connection;
    }

    public void sessionWillPassivate(HttpSessionEvent event) {
        try {
            stop();
        }
        catch (JMSException e) {
            log.warn("Could not close connection: " + e, e);
        }
    }

    public void sessionDidActivate(HttpSessionEvent event) {
        // lets update the connection factory from the servlet context
        context = event.getSession().getServletContext();
        setupConnectionFactory(context);
    }

    private static synchronized void setupConnectionFactory(ServletContext context) {
        factory = initConnectionFactory(context);
        if (factory == null) {
            log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute);
            factory = new ActiveMQConnectionFactory("vm://localhost");
            context.setAttribute(connectionFactoryAttribute, factory);
        }
    }

    public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
        ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
        if (connectionFactory == null) {
            String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam);

            servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);

            if (brokerURL == null) {
                brokerURL = "vm://localhost";
            }

            boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam));
            servletContext.log("Use embedded broker: " + embeddedBroker);

            ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
            factory.setUseEmbeddedBroker(embeddedBroker);

            connectionFactory = factory;
            servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
        }
        return connectionFactory;
    }

    public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
        MessageConsumer consumer = (MessageConsumer) consumers.get(destination);
        if (consumer == null) {
            consumer = session.createConsumer(destination);
            consumers.put(destination, consumer);
        }
        return consumer;
    }

}
TOP

Related Classes of org.codehaus.activemq.web.WebClient

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.