/**
* Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
* <p/>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p/>
* http://www.apache.org/licenses/LICENSE-2.0
* <p/>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wso2.carbon.broker.core.internal.jms;
import org.apache.axiom.om.OMElement;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.wso2.carbon.CarbonConstants;
import org.wso2.carbon.broker.core.BrokerConfiguration;
import org.wso2.carbon.broker.core.BrokerListener;
import org.wso2.carbon.broker.core.BrokerTypeDto;
import org.wso2.carbon.broker.core.Property;
import org.wso2.carbon.broker.core.exception.BrokerEventProcessingException;
import org.wso2.carbon.broker.core.internal.BrokerType;
import org.wso2.carbon.broker.core.internal.util.BrokerConstants;
import javax.jms.JMSException;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.ResourceBundle;
/**
* JMS implementation of BrokerType
*/
public class JMSBrokerType implements BrokerType {
private static final Log log = LogFactory.getLog(JMSBrokerType.class);
private static JMSBrokerType instance = new JMSBrokerType();
private BrokerTypeDto brokerTypeDto = null;
private JMSBrokerType() {
this.brokerTypeDto = new BrokerTypeDto();
this.brokerTypeDto.setName(BrokerConstants.BROKER_TYPE_JMS_QPID);
ResourceBundle resourceBundle = ResourceBundle.getBundle(
"org.wso2.carbon.broker.core.i18n.Resources", Locale.getDefault());
// set initial factory as a property
Property factoryInitialProperty = new Property(BrokerConstants.BROKER_CONF_JMS_PROP_JNDI_NAME);
factoryInitialProperty.setRequired(true);
factoryInitialProperty.setDisplayName(
resourceBundle.getString(BrokerConstants.BROKER_CONF_JMS_PROP_JNDI_NAME));
this.brokerTypeDto.addProperty(factoryInitialProperty);
// set connection user name as property
Property principalProperty = new Property(BrokerConstants.BROKER_CONF_JMS_PROP_PRINCIPAL);
principalProperty.setRequired(true);
principalProperty.setDisplayName(
resourceBundle.getString(BrokerConstants.BROKER_CONF_JMS_PROP_PRINCIPAL));
this.brokerTypeDto.addProperty(principalProperty);
// set connection password as property
Property credentialProperty = new Property(BrokerConstants.BROKER_CONF_JMS_PROP_CREDENTIALS);
credentialProperty.setRequired(true);
credentialProperty.setSecured(true);
credentialProperty.setDisplayName(
resourceBundle.getString(BrokerConstants.BROKER_CONF_JMS_PROP_CREDENTIALS));
this.brokerTypeDto.addProperty(credentialProperty);
// set ip of broker
Property ipProperty = new Property(BrokerConstants.BROKER_CONF_JMS_PROP_IP_ADDRESS);
ipProperty.setDisplayName(
resourceBundle.getString(BrokerConstants.BROKER_CONF_JMS_PROP_IP_ADDRESS));
ipProperty.setRequired(true);
this.brokerTypeDto.addProperty(ipProperty);
// set broker port listening
Property portProperty = new Property(BrokerConstants.BROKER_CONF_JMS_PROP_PORT);
portProperty.setDisplayName(
resourceBundle.getString(BrokerConstants.BROKER_CONF_JMS_PROP_PORT));
portProperty.setRequired(true);
this.brokerTypeDto.addProperty(portProperty);
// set virtual host name as property
Property virtualHostNameProperty =
new Property(BrokerConstants.BROKER_CONF_JMS_PROP_VIRTURAL_HOST_NAME);
virtualHostNameProperty.setRequired(true);
virtualHostNameProperty.setDisplayName(
resourceBundle.getString(BrokerConstants.BROKER_CONF_JMS_PROP_VIRTURAL_HOST_NAME));
this.brokerTypeDto.addProperty(virtualHostNameProperty);
}
public static JMSBrokerType getInstance() {
return instance;
}
public BrokerTypeDto getBrokerTypeDto() {
return brokerTypeDto;
}
/**
* Subscribe to given topic
*
* @param topicName - topic name to subscribe
* @param brokerListener - broker type will invoke this when it receive events
* @param brokerConfiguration - broker configuration details
* @throws BrokerEventProcessingException
*/
public void subscribe(String topicName, BrokerListener brokerListener,
BrokerConfiguration brokerConfiguration,
AxisConfiguration axisConfiguration)
throws BrokerEventProcessingException {
// create connection
TopicConnection topicConnection = getTopicConnection(brokerConfiguration);
// create session, subscriber, message listener and listen on that topic
try {
TopicSession session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
TopicSubscriber subscriber = session.createSubscriber(topic);
MessageListener messageListener = new JMSMessageListener(brokerListener);
subscriber.setMessageListener(messageListener);
topicConnection.start();
} catch (JMSException e) {
String error = "Failed to subscribe to topic:" + topicName;
log.error(error, e);
throw new BrokerEventProcessingException(error, e);
}
}
/**
* Create Connection factory with initial context
*
* @param brokerConfiguration broker - configuration details to create a broker
* @return Topic connection
* @throws BrokerEventProcessingException - jndi look up failed
*/
private TopicConnection getTopicConnection(BrokerConfiguration brokerConfiguration)
throws BrokerEventProcessingException {
// create connection factory
try {
TopicConnectionFactory topicConnectionFactory =
(TopicConnectionFactory) createInitialContext(brokerConfiguration).
lookup(BrokerConstants.BROKER_CONF_JMS_QPID_PROP_CONNECTION_FACTORY_LOOK_UP_NAME);
return topicConnectionFactory.createTopicConnection();
} catch (NamingException e) {
throw new BrokerEventProcessingException("Can not create topic connection factory.", e);
} catch (JMSException e) {
throw new BrokerEventProcessingException("Can not create topic connection .", e);
}
}
/**
* Create Initial Context with given configuration
*
* @param brokerConfiguration - configuration details
* @return Initial context created with configuration details
* @throws BrokerEventProcessingException - invalid parameters
*/
private InitialContext createInitialContext(BrokerConfiguration brokerConfiguration)
throws BrokerEventProcessingException {
// get configuration details from brokerConfiguration provided
Map<String, String> properties = brokerConfiguration.getProperties();
String factoryInitial = properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_JNDI_NAME);
String principal = properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_PRINCIPAL);
String credentials = properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_CREDENTIALS);
String virtualHostName = properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_VIRTURAL_HOST_NAME);
String ipAddress = properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_IP_ADDRESS);
String port = properties.get(BrokerConstants.BROKER_CONF_JMS_PROP_PORT);
// create connection url
String providerUrl = createConnectionUrl(principal, credentials, virtualHostName, ipAddress, port);
// create qpid connection factory lookup name
String qpidConnectionFactoryName = BrokerConstants.BROKER_CONF_JMS_QPID_PROP_JNDI_OBJECT_NAME_PREfIX +
BrokerConstants.BROKER_CONF_JMS_QPID_PROP_CONNECTION_FACTORY_LOOK_UP_NAME;
// use property file to create context
Properties property = new Properties();
property.put(Context.INITIAL_CONTEXT_FACTORY, factoryInitial);
property.put(qpidConnectionFactoryName, providerUrl);
property.put(CarbonConstants.REQUEST_BASE_CONTEXT,"true");
try {
return new InitialContext(property);
} catch (NamingException e) {
throw new BrokerEventProcessingException("Can not create initial context with given parameters.", e);
}
}
/**
* Get the amqp provider url with given parameters
*
* @param userName - user name who have permission to make connection to qpid broker
* @param password - password to authenticate to qpid broker
* @param virtualHostName - qpid configuration details as in config.xml in qpid, eg: test, development
* @param ipAddress - broker running machines ip address
* @param port - qpid port listening
* @return connection url as string
*/
private String createConnectionUrl(String userName, String password, String virtualHostName,
String ipAddress,
String port) {
StringBuffer buffer = new StringBuffer("amqp://");
buffer.append(userName)
.append(":")
.append(password)
.append("@")
.append("clientId")
.append("/")
.append(virtualHostName)
.append("?brokerlist='tcp://")
.append(ipAddress)
.append(":")
.append(port)
.append("'");
return buffer.toString();
}
/**
* Publish message to given topic
*
* @param topicName - topic name to publish messages
* @param message - message to send
* @param brokerConfiguration - broker configuration to be used
* @throws BrokerEventProcessingException
*/
public void publish(String topicName, OMElement message,
BrokerConfiguration brokerConfiguration)
throws BrokerEventProcessingException {
// create topic connection
TopicConnection topicConnection = getTopicConnection(brokerConfiguration);
// create session, producer, message and send message to given destination(topic)
// OMElement message text is published here.
Session session = null;
try {
session = topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(topicName);
MessageProducer producer = session.createProducer(topic);
TextMessage jmsMessage = session.createTextMessage(message.toString());
producer.send(jmsMessage);
} catch (JMSException e) {
String error = "Failed to publish to topic:" + topicName;
log.error(error, e);
throw new BrokerEventProcessingException(error, e);
} finally {
// close used resources.
try {
if (session != null) {
session.close();
}
if (topicConnection != null) {
topicConnection.close();
}
} catch (JMSException e) {
log.warn("Failed to reallocate resources.", e);
}
}
}
}