Package org.codehaus.activemq.usecases

Source Code of org.codehaus.activemq.usecases.TopicClusterTest

/**
*
* Copyright 2004 Protique Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
**/

package org.codehaus.activemq.usecases;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.activemq.ActiveMQConnectionFactory;
import org.codehaus.activemq.broker.BrokerContainer;
import org.codehaus.activemq.broker.impl.BrokerContainerImpl;
import org.codehaus.activemq.message.ActiveMQQueue;
import org.codehaus.activemq.message.ActiveMQTextMessage;
import org.codehaus.activemq.message.ActiveMQTopic;
import org.codehaus.activemq.transport.DiscoveryNetworkConnector;
import org.codehaus.activemq.transport.zeroconf.ZeroconfDiscoveryAgent;
import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;

/**
* @version $Revision: 1.3 $
*/
public class TopicClusterTest extends TestCase implements MessageListener {
    protected Log log = LogFactory.getLog(getClass());
    protected Destination destination;
    protected boolean topic = true;
    protected SynchronizedInt receivedMessageCount = new SynchronizedInt(0);
    protected static int MESSAGE_COUNT = 50;
    protected static int NUMBER_IN_CLUSTER = 3;
    protected int deliveryMode = DeliveryMode.NON_PERSISTENT;
    protected MessageProducer[] producers;
    protected Connection[] connections;

    protected void setUp() throws Exception {
        connections = new Connection[NUMBER_IN_CLUSTER];
        producers = new MessageProducer[NUMBER_IN_CLUSTER];
        Destination destination = createDestination();
        int portStart = 50000;
        for (int i = 0;i < NUMBER_IN_CLUSTER;i++) {
            connections[i] = createConnection("broker(" + i + ")");
            connections[i].setClientID("ClusterTest" + i);
            connections[i].start();
            Session session = connections[i].createSession(false, Session.AUTO_ACKNOWLEDGE);
            producers[i] = session.createProducer(destination);
            producers[i].setDeliveryMode(deliveryMode);
            MessageConsumer consumer = createMessageConsumer(session,destination);
            consumer.setMessageListener(this);
           
        }
        System.out.println("Sleeping to ensure cluster is fully connected");
        Thread.sleep(5000);
    }

    protected void tearDown() throws Exception {
        if (connections != null) {
            for (int i = 0;i < connections.length;i++) {
                connections[i].close();
            }
        }
    }
   
    protected MessageConsumer createMessageConsumer(Session session, Destination destination) throws JMSException{
        return session.createConsumer(destination);
    }

    protected ActiveMQConnectionFactory createGenericClusterFactory(String brokerName) throws JMSException {
        BrokerContainer container = new BrokerContainerImpl(brokerName);
        ZeroconfDiscoveryAgent agent = new ZeroconfDiscoveryAgent();
        agent.setType(getClass().getName() + ".");
        container.setDiscoveryAgent(agent);
        String url = "tcp://localhost:0";
        container.addConnector(url);
        container.addNetworkConnector(new DiscoveryNetworkConnector(container));
        container.start();
        //embedded brokers are resolved by url - so make url unique
        //this confused me tests for a while :-)
        return new ActiveMQConnectionFactory(container,"vm://"+brokerName);
    }

    protected int expectedReceiveCount() {
        return MESSAGE_COUNT * NUMBER_IN_CLUSTER * NUMBER_IN_CLUSTER;
    }

    protected Connection createConnection(String name) throws JMSException {
        return createGenericClusterFactory(name).createConnection();
    }

    protected Destination createDestination() {
        return createDestination(getClass().getName());
    }

    protected Destination createDestination(String name) {
        if (topic) {
            return new ActiveMQTopic(name);
        }
        else {
            return new ActiveMQQueue(name);
        }
    }


    /**
     * @param msg
     */
    public void onMessage(Message msg) {
        //System.out.println("GOT: " + msg);
        receivedMessageCount.increment();
        synchronized (receivedMessageCount) {
            if (receivedMessageCount.get() >= expectedReceiveCount()) {
                receivedMessageCount.notify();
            }
        }
    }

    /**
     * @throws Exception
     */
    public void testSendReceive() throws Exception {
        for (int i = 0;i < MESSAGE_COUNT;i++) {
            TextMessage textMessage = new ActiveMQTextMessage();
            textMessage.setText("MSG-NO:" + i);
            for (int x = 0;x < producers.length;x++) {
                producers[x].send(textMessage);
            }
        }
        synchronized (receivedMessageCount) {
            if (receivedMessageCount.get() < expectedReceiveCount()) {
                receivedMessageCount.wait(20000);
            }
        }
        //sleep a little - to check we don't get too many messages
        Thread.sleep(2000);
        System.err.println("GOT: " + receivedMessageCount.get());
        assertEquals("Expected message count not correct", expectedReceiveCount(), receivedMessageCount.get());
    }

}
TOP

Related Classes of org.codehaus.activemq.usecases.TopicClusterTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.