Package org.apache.activemq.usecases

Source Code of org.apache.activemq.usecases.AMQStackOverFlowTest

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements.  See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.activemq.usecases;

import java.net.URI;
import java.util.ArrayList;
import java.util.List;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

import junit.framework.Assert;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.usage.SystemUsage;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;

public class AMQStackOverFlowTest extends TestCase {

    private static final String URL1 = "tcp://localhost:61616";

    private static final String URL2 = "tcp://localhost:61617";

    public void testStackOverflow() throws Exception {
        BrokerService brokerService1 = null;
        BrokerService brokerService2 = null;

        try {
            brokerService1 = createBrokerService("broker1", URL1, URL2);
            brokerService1.start();
            brokerService2 = createBrokerService("broker2", URL2, URL1);
            brokerService2.start();

            final ActiveMQConnectionFactory cf1 = new ActiveMQConnectionFactory(URL1);
            cf1.setUseAsyncSend(false);

            final ActiveMQConnectionFactory cf2 = new ActiveMQConnectionFactory(URL2);
            cf2.setUseAsyncSend(false);

            final JmsTemplate template1 = new JmsTemplate(cf1);
            template1.setReceiveTimeout(10000);

            template1.send("test.q", new MessageCreator() {

                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("test");
                }

            });

            final JmsTemplate template2 = new JmsTemplate(cf2);
            template2.setReceiveTimeout(10000);

            final Message m = template2.receive("test.q");
            assertTrue(m instanceof TextMessage);

            final TextMessage tm = (TextMessage)m;

            Assert.assertEquals("test", tm.getText());

            template2.send("test2.q", new MessageCreator() {

                public Message createMessage(Session session) throws JMSException {
                    return session.createTextMessage("test2");
                }

            });

            final Message m2 = template1.receive("test2.q");
            assertNotNull(m2);
            assertTrue(m2 instanceof TextMessage);

            final TextMessage tm2 = (TextMessage)m2;

            Assert.assertEquals("test2", tm2.getText());

        } finally {

            brokerService1.stop();
            brokerService1 = null;
            brokerService2.stop();
            brokerService2 = null;

        }

    }

    private BrokerService createBrokerService(final String brokerName, final String uri1, final String uri2)
        throws Exception {
        final BrokerService brokerService = new BrokerService();

        brokerService.setBrokerName(brokerName);
        brokerService.setPersistent(false);
        brokerService.setUseJmx(true);

        final SystemUsage memoryManager = new SystemUsage();
        //memoryManager.getMemoryUsage().setLimit(10);
        brokerService.setSystemUsage(memoryManager);

        final List<PolicyEntry> policyEntries = new ArrayList<PolicyEntry>();

        final PolicyEntry entry = new PolicyEntry();
        entry.setQueue(">");
        //entry.setMemoryLimit(1);
        policyEntries.add(entry);

        final PolicyMap policyMap = new PolicyMap();
        policyMap.setPolicyEntries(policyEntries);
        brokerService.setDestinationPolicy(policyMap);

        final TransportConnector tConnector = new TransportConnector();
        tConnector.setUri(new URI(uri1));
        tConnector.setName(brokerName + ".transportConnector");
        brokerService.addConnector(tConnector);

        if (uri2 != null) {
            final NetworkConnector nc = new DiscoveryNetworkConnector(new URI("static:" + uri2));
            nc.setBridgeTempDestinations(true);
            nc.setBrokerName(brokerName);
            //nc.setPrefetchSize(1);
            brokerService.addNetworkConnector(nc);
        }

        return brokerService;

    }
}
TOP

Related Classes of org.apache.activemq.usecases.AMQStackOverFlowTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by Oracle Inc. Contact coftware#gmail.com.