Package org.apache.airavata.wsmg.messenger.protocol.impl

Source Code of org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.airavata.wsmg.messenger.protocol.impl;

import java.io.StringReader;
import java.util.LinkedList;
import java.util.List;

import javax.xml.stream.XMLStreamException;

import org.apache.airavata.wsmg.broker.AdditionalMessageContent;
import org.apache.airavata.wsmg.broker.ConsumerInfo;
import org.apache.airavata.wsmg.commons.CommonRoutines;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
import org.apache.airavata.wsmg.messenger.protocol.DeliveryProtocol;
import org.apache.airavata.wsmg.messenger.protocol.SendingException;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.util.ElementHelper;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axiom.soap.SOAPHeaderBlock;
import org.apache.axis2.AxisFault;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.transport.http.HTTPConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class Axis2Protocol implements DeliveryProtocol {

    private static final Logger logger = LoggerFactory.getLogger(Axis2Protocol.class);

    private static SOAPFactory soapfactory = OMAbstractFactory.getSOAP11Factory();

    private ServiceClient nonThreadLocalServiceClient;

    long tcpConnectionTimeout;

    public void deliver(ConsumerInfo consumerInfo, OMElement message, AdditionalMessageContent additionalMessageContent)
            throws SendingException {
        EndpointReference consumerReference = new EndpointReference(consumerInfo.getConsumerEprStr());

        /*
         * Extract information
         */
        String actionString = null;
        List<OMElement> soapHeaders = new LinkedList<OMElement>();
        if (consumerInfo.getType().compareTo("wsnt") == 0) {
            actionString = NameSpaceConstants.WSNT_NS.getNamespaceURI() + "/Notify";
        } else { // wse
            actionString = additionalMessageContent.getAction();
            String topicElString = additionalMessageContent.getTopicElement();
            if (topicElString != null) {
                OMElement topicEl = null;
                try {
                    topicEl = CommonRoutines.reader2OMElement(new StringReader(topicElString));
                    soapHeaders.add(topicEl);
                } catch (XMLStreamException e) {
                    logger.error("exception at topicEl xmlStreamException", e);
                }
            }
        }

        ServiceClient client = null;
        try {

            client = configureServiceClient(actionString, consumerReference, additionalMessageContent.getMessageID(),
                    soapHeaders);

            client.sendRobust(message);

        } catch (AxisFault ex) {
            throw new SendingException(ex.getCause());
        } finally {
            if (client != null) {
                try {
                    client.cleanup();
                    client.cleanupTransport();
                } catch (AxisFault ex) {
                    logger.error(ex.getMessage(), ex);
                }
            }
        }
    }

    public void setTimeout(long timeout) {
        this.tcpConnectionTimeout = timeout;
    }

    private ServiceClient configureServiceClient(String action, EndpointReference consumerLocation, String msgId,
            List<OMElement> soapHeaders) throws AxisFault {

        // not engaging addressing modules
        ServiceClient client = getServiceClient();

        SOAPHeaderBlock msgIdEl = soapfactory.createSOAPHeaderBlock("MessageID", NameSpaceConstants.WSA_NS);
        msgIdEl.setText(msgId);
        SOAPHeaderBlock actionEl = soapfactory.createSOAPHeaderBlock("Action", NameSpaceConstants.WSA_NS);
        actionEl.setText(action);

        SOAPHeaderBlock to = soapfactory.createSOAPHeaderBlock("To", NameSpaceConstants.WSA_NS);
        to.setText(consumerLocation.getAddress());

        client.addHeader(actionEl);
        client.addHeader(msgIdEl);
        client.addHeader(to);

        for (OMElement omHeader : soapHeaders) {
            try {
                SOAPHeaderBlock headerBlock = ElementHelper.toSOAPHeaderBlock(omHeader, soapfactory);
                client.addHeader(headerBlock);
            } catch (Exception e) {
                throw AxisFault.makeFault(e);
            }
        }

        Options opts = new Options();
        opts.setTimeOutInMilliSeconds(tcpConnectionTimeout);
        opts.setMessageId(msgId);
        opts.setTo(consumerLocation);
        opts.setAction(action);
        opts.setProperty(HTTPConstants.CHUNKED, Boolean.FALSE);
        opts.setProperty(HTTPConstants.HTTP_PROTOCOL_VERSION, HTTPConstants.HEADER_PROTOCOL_10);
        client.setOptions(opts);

        return client;
    }

    private ServiceClient getServiceClient() throws AxisFault {
        if (nonThreadLocalServiceClient == null) {
            nonThreadLocalServiceClient = new ServiceClient();
        }
        nonThreadLocalServiceClient.removeHeaders();
        return nonThreadLocalServiceClient;
    }
}
TOP

Related Classes of org.apache.airavata.wsmg.messenger.protocol.impl.Axis2Protocol

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.