Package org.apache.airavata.wsmg.broker

Source Code of org.apache.airavata.wsmg.broker.NotificationProcessor

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.airavata.wsmg.broker;

import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;

import javax.xml.namespace.QName;
import javax.xml.stream.XMLStreamException;

import org.apache.airavata.wsmg.broker.context.ContextParameters;
import org.apache.airavata.wsmg.broker.context.ProcessingContext;
import org.apache.airavata.wsmg.commons.NameSpaceConstants;
import org.apache.airavata.wsmg.commons.OutGoingMessage;
import org.apache.airavata.wsmg.config.WSMGParameter;
import org.apache.airavata.wsmg.config.WsmgConfigurationContext;
import org.apache.airavata.wsmg.matching.AbstractMessageMatcher;
import org.apache.airavata.wsmg.messenger.OutGoingQueue;
import org.apache.airavata.wsmg.util.BrokerUtil;
import org.apache.airavata.wsmg.util.RunTimeStatistics;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.om.OMAttribute;
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMException;
import org.apache.axiom.om.OMFactory;
import org.apache.axiom.om.OMNamespace;
import org.apache.axis2.AxisFault;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class NotificationProcessor {

    private static final Logger logger = LoggerFactory.getLogger(NotificationProcessor.class);

    private WsmgConfigurationContext wsmgConfigContext;

    protected long messageCounter = 0;
    protected long messageId = 0;

    OMFactory factory = OMAbstractFactory.getOMFactory();

    private OutGoingQueue outgoingQueue;

    public NotificationProcessor(WsmgConfigurationContext config) {
        init(config);
    }

    private void init(WsmgConfigurationContext config) {
        this.wsmgConfigContext = config;
        outgoingQueue = config.getOutgoingQueue();
    }

    private synchronized long getNextTrackId() {
        messageCounter++;
        return messageCounter;
    }

    private synchronized long getNextMsgId() {
        messageId++;
        return messageId;
    }

    public void processMsg(ProcessingContext ctx, OMNamespace protocolNs) throws OMException, AxisFault {

        String trackId = "trackId_A_" + getNextTrackId();
        if (WSMGParameter.showTrackId) {
            logger.debug(trackId + ": received.");
        }

        AdditionalMessageContent additionalMessageContent = new AdditionalMessageContent(ctx.getMessageContext()
                .getSoapAction(), ctx.getMessageContext().getMessageID());
        additionalMessageContent.setTrackId(trackId);

        if (NameSpaceConstants.WSNT_NS.equals(protocolNs)) {

            onWSNTMsg(ctx, additionalMessageContent);
            setResponseMsg(ctx, trackId, protocolNs);
        } else { // WSE Notifications No specific namespace

            onWSEMsg(ctx, trackId, additionalMessageContent);
            setResponseMsg(ctx, trackId, protocolNs);
        }

    }

    /**
     * @param ctx
     * @param topicElString
     * @param trackId
     * @param additionalMessageContent
     * @throws OMException
     * @throws XMLStreamException
     */
    private void onWSEMsg(ProcessingContext ctx, String trackId, AdditionalMessageContent additionalMessageContent)
            throws OMException, AxisFault {

        String topicElString = null;
        String topicLocalString = null;

        QName qName = new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Topic");

        OMElement topicEl = ctx.getMessageContext().getEnvelope().getHeader().getFirstChildWithName(qName);

        if (topicEl == null) {

            topicLocalString = ctx.getContextParameter(ContextParameters.TOPIC_FROM_URL);

            if (topicLocalString != null) {

                topicElString = "<wsnt:Topic "
                        + "Dialect=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple\" "
                        + "xmlns:ns2=\"http://tutorial.globus.org/auction\" "
                        + "xmlns:wsnt=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification\">" + "ns2:"
                        + topicLocalString + "</wsnt:Topic>";
                // / }
                additionalMessageContent.setTopicElement(topicElString);
            } else {

                topicLocalString = "wseTopic";
                topicElString = "<wsnt:Topic "
                        + "Dialect=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-Topics/TopicExpression/simple\" "
                        + "xmlns:ns2=\"http://tutorial.globus.org/auction\" "
                        + "xmlns:wsnt=\"http://www.ibm.com/xmlns/stdwip/web-services/WS-BaseNotification\">"
                        + "ns2:wseTopic</wsnt:Topic>";
                // / }
                additionalMessageContent.setTopicElement(topicElString);
            }
        } else {

            topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText());
            try {
                topicElString = topicEl.toStringWithConsume();
            } catch (XMLStreamException e) {
                logger.error("exceptions occured at WSE eventing notification creating", e);
            }
            additionalMessageContent.setTopicElement(topicElString);
        }

        OMElement messageEl = ctx.getSoapBody().getFirstElement();
        if (messageEl == null) {
            throw new AxisFault("no message found");
        }

        String message = null;
        try {
            message = messageEl.toStringWithConsume();
        } catch (XMLStreamException e) {
            logger.error("unable to serialize the message", e);
            throw new AxisFault("unable to serialize the message", e);
        }

        matchAndSave(message, topicLocalString, additionalMessageContent);
    }

    /**
     * @param ctx
     * @param trackId
     * @throws OMException
     */
    private void setResponseMsg(ProcessingContext ctx, String trackId, OMNamespace responseNS) throws OMException {
        // set response message

        ctx.addResponseMsgNameSpaces(responseNS);

        OMAttribute trackIdAttribute = factory.createOMAttribute("trackId", null, trackId);
        OMElement messageElement = ctx.getMessageContext().getEnvelope().getBody().getFirstElement();
        OMElement responseMsgElement = factory.createOMElement(messageElement.getLocalName() + "Response", responseNS);
        responseMsgElement.addAttribute(trackIdAttribute);
        ctx.setRespMessage(responseMsgElement);

    }

    /**
     * @param ctx
     * @param topicLocalString
     * @param topicElString
     * @param producerReferenceElString
     * @param additionalMessageContent
     * @throws OMException
     * @throws XMLStreamException
     * @throws AxisFault
     */
    private void onWSNTMsg(ProcessingContext ctx, AdditionalMessageContent additionalMessageContent)
            throws OMException, AxisFault {

        String producerReferenceElString = null;
        String topicElString = null;

        boolean noElements = true;

        // TODO: set nicely with a processing context
        OMElement notifyEl = ctx.getSoapBody().getFirstElement();
        for (Iterator<OMElement> iter = notifyEl.getChildrenWithLocalName("NotificationMessage"); iter.hasNext();) {
            noElements = false;
            OMElement wrappedMessageEl = iter.next();

            String topicLocalString = null;

            OMElement topicEl = wrappedMessageEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
                    .getNamespaceURI(), "Topic"));
            if (topicEl != null) {

                topicLocalString = BrokerUtil.getTopicLocalString(topicEl.getText()); // get what ever inside this
                                                                                      // element

                try {
                    topicElString = topicEl.toStringWithConsume();
                } catch (XMLStreamException e) {
                    logger.error("exception occured while creating NotificationConsumer", e);
                }
                additionalMessageContent.setTopicElement(topicElString);
            }
            OMElement producerReferenceEl = wrappedMessageEl.getFirstChildWithName(new QName(NameSpaceConstants.WSNT_NS
                    .getNamespaceURI(), "ProducerReference"));

            if (producerReferenceEl != null) {
                try {
                    producerReferenceElString = producerReferenceEl.toStringWithConsume();
                } catch (XMLStreamException e) {
                    logger.error("exception occured while creating notification consumer", e);

                }
                additionalMessageContent.setProducerReference(producerReferenceElString);
            }

            OMElement notificationMessageEl = wrappedMessageEl.getFirstChildWithName(
                    new QName(NameSpaceConstants.WSNT_NS.getNamespaceURI(), "Message")).getFirstElement();

            String message = null;
            try {
                message = notificationMessageEl.toStringWithConsume();
            } catch (XMLStreamException e) {
                logger.error("exception occured while creating notification consumer", e);
                throw new AxisFault("unable to serialize the message", e);
            }

            matchAndSave(message, topicLocalString, additionalMessageContent);

        }
        if (noElements) {
            throw new AxisFault("at least one element is required");
        }
    }

    private void matchAndSave(String notificationMessage, String topicLocalString,
            AdditionalMessageContent additionalMessageContent) {

        List<ConsumerInfo> matchedConsumers = new LinkedList<ConsumerInfo>();

        // not use incoming queue
        // This is a fix for the bug seen in yfilter.
        try {

            for (AbstractMessageMatcher matcher : wsmgConfigContext.getMessageMatchers()) {
                matcher.populateMatches(null, additionalMessageContent, notificationMessage, topicLocalString,
                        matchedConsumers);
            }

            save(matchedConsumers, notificationMessage, additionalMessageContent);

        } catch (RuntimeException e) {
            logger.error("Caught RuntimeException", e);
        }

    }

    public void save(List<ConsumerInfo> consumerInfoList, String message,
            AdditionalMessageContent additionalMessageContent) {

        if (consumerInfoList.size() == 0) // No subscription
            return;

        RunTimeStatistics.addNewNotificationMessageSize(message.length());
        OutGoingMessage outGoingMessage = new OutGoingMessage();
        outGoingMessage.setTextMessage(message);
        outGoingMessage.setConsumerInfoList(consumerInfoList);
        outGoingMessage.setAdditionalMessageContent(additionalMessageContent);

        outgoingQueue.storeNotification(outGoingMessage, getNextMsgId());

        if (WSMGParameter.showTrackId)
            logger.info(additionalMessageContent.getTrackId() + ": putIn Outgoing queue.");
    }

}
TOP

Related Classes of org.apache.airavata.wsmg.broker.NotificationProcessor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.