Package org.apache.qpid.client.prefetch

Source Code of org.apache.qpid.client.prefetch.PrefetchBehaviourTest

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.client.prefetch;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.qpid.configuration.ClientProperties;
import org.apache.qpid.test.utils.QpidBrokerTestCase;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.TextMessage;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;


public class PrefetchBehaviourTest extends QpidBrokerTestCase
{
    protected static final Logger _logger = LoggerFactory.getLogger(PrefetchBehaviourTest.class);
    private Connection _normalConnection;
    private AtomicBoolean _exceptionCaught;
    private CountDownLatch _processingStarted;
    private CountDownLatch _processingCompleted;

    protected void setUp() throws Exception
    {
        super.setUp();
        _normalConnection = getConnection();
        _exceptionCaught = new AtomicBoolean();
        _processingStarted = new CountDownLatch(1);
        _processingCompleted = new CountDownLatch(1);
    }

    /**
     * Verifies that a slow processing asynchronous transacted consumer with prefetch=1 only
     * gets 1 of the messages sent, with the second consumer picking up the others while the
     * slow consumer is processing, i.e that prefetch=1 actually does what it says on the tin.
     */
    public void testPrefetchOneWithAsynchronousTransactedConsumer() throws Exception
    {
        final long processingTime = 5000;
       
        //create a second connection with prefetch set to 1
        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(1).toString());
        Connection prefetch1Connection = getConnection();

        prefetch1Connection.start();
        _normalConnection.start();

        //create an asynchronous consumer with simulated slow processing
        final Session prefetch1session = prefetch1Connection.createSession(true, Session.SESSION_TRANSACTED);
        Queue queue = prefetch1session.createQueue(getTestQueueName());
        MessageConsumer prefetch1consumer = prefetch1session.createConsumer(queue);
        prefetch1consumer.setMessageListener(new MessageListener()
        {
            public void onMessage(Message message)
            {
                try
                {
                    _logger.debug("starting processing");
                    _processingStarted.countDown();
                    _logger.debug("processing started");

                    //simulate message processing
                    Thread.sleep(processingTime);

                    prefetch1session.commit();

                    _processingCompleted.countDown();
                }
                catch(Exception e)
                {
                    _logger.error("Exception caught in message listener");
                    _exceptionCaught.set(true);
                }
            }
        });

        //create producer and send 5 messages
        Session producerSession = _normalConnection.createSession(true, Session.SESSION_TRANSACTED);
        MessageProducer producer = producerSession.createProducer(queue);

        for (int i = 0; i < 5; i++)
        {
            producer.send(producerSession.createTextMessage("test"));
        }
        producerSession.commit();

        //wait for the first message to start being processed by the async consumer
        assertTrue("Async processing failed to start in allowed timeframe", _processingStarted.await(2000, TimeUnit.MILLISECONDS));
        _logger.debug("proceeding with test");

        //try to consumer the other messages with another consumer while the async procesisng occurs
        Session normalSession = _normalConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer normalConsumer = normalSession.createConsumer(queue);       
       
        Message msg;
        // Check that other consumer gets the other 4 messages
        for (int i = 0; i < 4; i++)
        {
            msg = normalConsumer.receive(1500);
            assertNotNull("Consumer should receive 4 messages",msg);               
        }
        msg = normalConsumer.receive(250);
        assertNull("Consumer should not have received a 5th message",msg);

        //wait for the other consumer to finish to ensure it completes ok
        _logger.debug("waiting for async consumer to complete");
        assertTrue("Async processing failed to complete in allowed timeframe", _processingStarted.await(processingTime + 2000, TimeUnit.MILLISECONDS));
        assertFalse("Unexpected exception during async message processing",_exceptionCaught.get());
    }

    /**
     * This test was originally known as AMQConnectionTest#testPrefetchSystemProperty.
     *
     */
    public void testMessagesAreDistributedBetweenConsumersWithLowPrefetch() throws Exception
    {
        Queue queue = getTestQueue();

        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, new Integer(2).toString());

        Connection connection = getConnection();
        connection.start();
        // Create Consumer A
        Session consSessA = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumerA = consSessA.createConsumer(queue);

        // ensure message delivery to consumer A is started (required for 0-8..0-9-1)
        final Message msg = consumerA.receiveNoWait();
        assertNull(msg);

        Session producerSession = connection.createSession(true, Session.SESSION_TRANSACTED);
        sendMessage(producerSession, queue, 3);

        // Create Consumer B
        MessageConsumer consumerB = null;
        if (isBroker010())
        {
            // 0-10 prefetch is per consumer so we create Consumer B on the same session as Consumer A
            consumerB = consSessA.createConsumer(queue);
        }
        else
        {
            // 0-8..0-9-1 prefetch is per session so we create Consumer B on a separate session
            Session consSessB = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            consumerB = consSessB.createConsumer(queue);
        }

        // As message delivery to consumer A is already started, the first two messages should
        // now be with consumer A.  The last message will still be on the Broker as consumer A's
        // credit is exhausted and message delivery for consumer B is not yet running.

        // As described by QPID-3747, for 0-10 we *must* check Consumer B before Consumer A.
        // If we were to reverse the order, the SessionComplete will restore Consumer A's credit,
        // and the third message could be delivered to either Consumer A or Consumer B.

        // Check that consumer B gets the last (third) message.
        final Message msgConsumerB = consumerB.receive(1500);
        assertNotNull("Consumer B should have received a message", msgConsumerB);
        assertEquals("Consumer B received message with unexpected index", 2, msgConsumerB.getIntProperty(INDEX));

        // Now check that consumer A has indeed got the first two messages.
        for (int i = 0; i < 2; i++)
        {
            final Message msgConsumerA = consumerA.receive(1500);
            assertNotNull("Consumer A should have received a message " + i, msgConsumerA);
            assertEquals("Consumer A received message with unexpected index", i, msgConsumerA.getIntProperty(INDEX));
        }
    }

    /**
     * Test Goal: Verify if connection stop releases all messages in it's prefetch buffer.
     * Test Strategy: Send 10 messages to a queue. Create a consumer with maxprefetch of 5, but never consume them.
     *                Stop the connection. Create a new connection and a consumer with maxprefetch 10 on the same queue.
     *                Try to receive all 10 messages.
     */
    public void testConnectionStop() throws Exception
    {
        setTestClientSystemProperty(ClientProperties.MAX_PREFETCH_PROP_NAME, "10");
        Connection con = getConnection();
        con.start();
        Session ssn = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination queue = ssn.createQueue("ADDR:my-queue;{create: always}");

        MessageProducer prod = ssn.createProducer(queue);
        for (int i=0; i<10;i++)
        {
           prod.send(ssn.createTextMessage("Msg" + i));
        }

        MessageConsumer consumer = ssn.createConsumer(queue);
        // This is to ensure we get the first client to prefetch.
        Message msg = consumer.receive(1000);
        assertNotNull("The first consumer should get one message",msg);
        con.stop();

        Connection con2 = getConnection();
        con2.start();
        Session ssn2 = con2.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer2 = ssn2.createConsumer(queue);
        for (int i=0; i<9;i++)
        {
           TextMessage m = (TextMessage)consumer2.receive(1000);
           assertNotNull("The second consumer should get 9 messages, but received only " + i,m);
        }
    }
}
TOP

Related Classes of org.apache.qpid.client.prefetch.PrefetchBehaviourTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.