Package net.joshdevins.rabbitmq.client.ha.it

Source Code of net.joshdevins.rabbitmq.client.ha.it.RabbitConsistencyIntegrationTest

package net.joshdevins.rabbitmq.client.ha.it;

import java.util.Collections;
import java.util.Date;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Set;

import org.apache.log4j.Logger;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.rabbitmq.client.AMQP.Queue.BindOk;

@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration("/META-INF/spring/applicationContext.xml")
public class RabbitConsistencyIntegrationTest {

    public class PojoHandler {

        public void handleMessage(final String message) {

            if (LOG.isInfoEnabled()) {
                LOG.info("Received message: " + message);
            }

            // check for message
            Integer integer = Integer.valueOf(message);

            if (!messages.contains(integer)) {
                messagesReceivedNotDelivered.add(integer);
                return;
            }

            messages.remove(integer);
        }
    }

    private static final Logger LOG = Logger.getLogger(RabbitConsistencyIntegrationTest.class);

    private static final int NUM_MESSAGES = 1000;

    private static final int NUM_CONSUMERS = 1;

    private Set<Integer> messages;

    private List<Integer> messagesReceivedNotDelivered;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private RabbitTemplate template;

    @Before
    public void before() {
        messages = Collections.synchronizedSet(new HashSet<Integer>());
        messagesReceivedNotDelivered = Collections.synchronizedList(new LinkedList<Integer>());
    }

    @Test
    public void test() throws InterruptedException {

        // ensure queue already created
        BindOk bindOk = template.execute(new TestChannelCallback());
        Assert.assertNotNull(bindOk);

        // setup async container
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames("testQueue");
        container.setConcurrentConsumers(NUM_CONSUMERS);
        // container.setChannelTransacted(true);

        MessageListenerAdapter adapter = new MessageListenerAdapter();
        adapter.setDelegate(new PojoHandler());
        container.setMessageListener(adapter);
        container.afterPropertiesSet();

        // empty out queue
        while (template.receive("testQueue") != null) {
            // do
        }

        container.start();

        // template.setChannelTransacted(true);

        // send all the messages
        long start = new Date().getTime();

        for (int i = 0; i < NUM_MESSAGES; i++) {

            messages.add(i);
            template.convertAndSend(Integer.toString(i));
            Thread.sleep(10);
        }

        long end = new Date().getTime();
        float time = (end - start) / (float) 1000;

        Thread.sleep(1000);

        LOG.debug("Time (secs.): " + time);
        LOG.debug("Avg. rate (msgs/sec): " + NUM_MESSAGES / time);

        // everything should be empty
        Assert.assertEquals(0, messagesReceivedNotDelivered.size());
        Assert.assertEquals(0, messages.size());
    }
}
TOP

Related Classes of net.joshdevins.rabbitmq.client.ha.it.RabbitConsistencyIntegrationTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.