Package org.apache.airavata.wsmg.matching.XPath

Source Code of org.apache.airavata.wsmg.matching.XPath.TestAddtionalWseXpathAndTopicScenarios$NotificationReciever

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.airavata.wsmg.matching.XPath;

import java.net.URL;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import javax.xml.stream.XMLStreamException;

import junit.framework.TestCase;

import org.apache.airavata.wsmg.client.ConsumerNotificationHandler;
import org.apache.airavata.wsmg.client.WseMsgBrokerClient;
import org.apache.airavata.wsmg.util.ConfigKeys;
import org.apache.airavata.wsmg.util.TestUtilServer;
import org.apache.axiom.om.impl.llom.util.AXIOMUtil;
import org.apache.axiom.soap.SOAPEnvelope;
import org.apache.axis2.AxisFault;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

public class TestAddtionalWseXpathAndTopicScenarios extends TestCase {

    static Properties configs = new Properties();

    class NotificationReciever implements ConsumerNotificationHandler {

        private BlockingQueue<SOAPEnvelope> queue = new LinkedBlockingQueue<SOAPEnvelope>();

        private String id;

        public NotificationReciever(String id) {
            this.id = id;
        }

        public void handleNotification(SOAPEnvelope msgEnvelope) {
            queue.add(msgEnvelope);
            System.out.println(String.format("[reciever id: %s] %s", id, msgEnvelope));
        }

        public BlockingQueue<SOAPEnvelope> getMsgQueue() {
            return queue;
        }
    }

    @Before
    public void setUp() throws Exception {
        URL configURL = ClassLoader.getSystemResource(ConfigKeys.CONFIG_FILE_NAME);
        configs.load(configURL.openStream());

        TestUtilServer.start(null, null);
    }

    @After
    public void tearDown() throws Exception {
    }

    @Test
    public final void testXpathAndTopicOnlyRoundTrip() {

        try {

            String topic = "RoundTripTestXpathAndTopicWse";

            String xpathExpression = "/c/b/a[text()=1]";

            String msgFormat = "<c><b><a>%d</a></b></c>";

            long value = 1;
            String matchingMsg = String.format(msgFormat, value);
            String unmatchingMsg = String.format(msgFormat, value + 1);

            int consumerPort = TestUtilServer.getAvailablePort();

            String brokerEpr = "http://localhost:" + TestUtilServer.TESTING_PORT + "/axis2/services/EventingService";

            WseMsgBrokerClient topicOnlyReceiverApi = new WseMsgBrokerClient();
            topicOnlyReceiverApi.init(brokerEpr);
            NotificationReciever topicOnlyMsgReceiver = new NotificationReciever("Topic Only");
            String[] topicConsumerEPRs = topicOnlyReceiverApi.startConsumerService(consumerPort, topicOnlyMsgReceiver);
            assertTrue("invalid consumer eprs returned", topicConsumerEPRs.length > 0);
            String topicOnlySubId = topicOnlyReceiverApi.subscribe(topicConsumerEPRs[0], topic, null);
            System.out.println("Topic only subscription ID: " + topicOnlySubId);

            WseMsgBrokerClient xpathAndTopicReceiverApi = new WseMsgBrokerClient();
            xpathAndTopicReceiverApi.init(brokerEpr);
            NotificationReciever topicAndXpathMsgReceiver = new NotificationReciever("Topic And Xpath");
            String[] topicAndXpathConsumerEPRs = xpathAndTopicReceiverApi.startConsumerService(consumerPort + 1,
                    topicAndXpathMsgReceiver);
            assertTrue("invalid consumer eprs returned", topicAndXpathConsumerEPRs.length > 0);
            String topicAndXpathSubId = xpathAndTopicReceiverApi.subscribe(topicAndXpathConsumerEPRs[0], topic,
                    xpathExpression);
            System.out.println("Xpath and Topic subscription ID: " + topicAndXpathSubId);

            WseMsgBrokerClient senderApi = new WseMsgBrokerClient();
            senderApi.init(brokerEpr);

            try {

                senderApi.publish(topic, AXIOMUtil.stringToOM(matchingMsg));
                senderApi.publish(topic, AXIOMUtil.stringToOM(unmatchingMsg));

                Thread.sleep(5000);

                assertTrue("topic only reciever should get all messages" + topicOnlyMsgReceiver.getMsgQueue().size(),
                        topicOnlyMsgReceiver.getMsgQueue().size() == 2);

                assertTrue("xpath and topic reciever should only get one message"
                        + topicAndXpathMsgReceiver.getMsgQueue().size(),
                        topicAndXpathMsgReceiver.getMsgQueue().size() == 1);
            } catch (XMLStreamException x) {
                fail("Error while creating OMElement");
            } catch (InterruptedException e) {
                fail("interrupted while waiting for message");
            }

            topicOnlyReceiverApi.unSubscribe(topicOnlySubId);
            topicOnlyReceiverApi.shutdownConsumerService();

            xpathAndTopicReceiverApi.unSubscribe(topicAndXpathSubId);
            xpathAndTopicReceiverApi.shutdownConsumerService();

        } catch (AxisFault e) {
            e.printStackTrace();
            fail("unexpected exception occured");
        }

    }
}
TOP

Related Classes of org.apache.airavata.wsmg.matching.XPath.TestAddtionalWseXpathAndTopicScenarios$NotificationReciever

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.