Package org.apache.qpid.test.client

Source Code of org.apache.qpid.test.client.QueueBrowserAutoAckTest

/*
*  Licensed to the Apache Software Foundation (ASF) under one
*  or more contributor license agreements.  See the NOTICE file
*  distributed with this work for additional information
*  regarding copyright ownership.  The ASF licenses this file
*  to you under the Apache License, Version 2.0 (the
*  "License"); you may not use this file except in compliance
*  with the License.  You may obtain a copy of the License at
*
*    http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing,
*  software distributed under the License is distributed on an
*  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
*  KIND, either express or implied.  See the License for the
*  specific language governing permissions and limitations
*  under the License.
*
*
*/
package org.apache.qpid.test.client;

import org.apache.qpid.AMQException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
import org.apache.qpid.test.utils.FailoverBaseCase;

import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.QueueBrowser;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
import java.util.Enumeration;
import java.util.Random;

public class QueueBrowserAutoAckTest extends FailoverBaseCase
{
    protected Connection _clientConnection;
    protected Session _clientSession;
    protected Queue _queue;
    protected static final String MESSAGE_ID_PROPERTY = "MessageIDProperty";
    protected boolean CLUSTERED = Boolean.getBoolean("profile.clustered");

    public void setUp() throws Exception
    {
        super.setUp();

        //Create Client
        _clientConnection = getConnection();
        _clientConnection.start();

        setupSession();

        _queue = _clientSession.createQueue(getTestQueueName());
        _clientSession.createConsumer(_queue).close();
       
        //Ensure there are no messages on the queue to start with.
        checkQueueDepth(0);
    }

    protected void setupSession() throws Exception
    {
        _clientSession = _clientConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
    }

    public void tearDown() throws Exception
    {
        if (_clientConnection != null)
        {
            _clientConnection.close();
        }

        super.tearDown();
    }

    protected void sendMessages(int num) throws JMSException
    {
        Connection producerConnection = null;
        try
        {
            producerConnection = getConnection();
        }
        catch (Exception e)
        {
            fail("Unable to lookup connection in JNDI.");
        }

        sendMessages(producerConnection, num);
    }

    protected void sendMessages(String connection, int num) throws JMSException
    {
        Connection producerConnection = null;
        try
        {
            producerConnection = getConnectionFactory(connection).createConnection("guest", "guest");
        }
        catch (Exception e)
        {
            e.printStackTrace();
            fail("Unable to lookup connection in JNDI.");
        }
        sendMessages(producerConnection, num);
    }


    protected void sendMessages(Connection producerConnection, int messageSendCount) throws JMSException
    {
        producerConnection.start();

        Session producerSession = producerConnection.createSession(true, Session.AUTO_ACKNOWLEDGE);

        //Ensure _queue is created
        producerSession.createConsumer(_queue).close();

        MessageProducer producer = producerSession.createProducer(_queue);

        for (int messsageID = 0; messsageID < messageSendCount; messsageID++)
        {
            TextMessage textMsg = producerSession.createTextMessage("Message " + messsageID);
            textMsg.setIntProperty(MESSAGE_ID_PROPERTY, messsageID);
            producer.send(textMsg);
        }
        producerSession.commit();

        producerConnection.close();
    }

    /**
     * Using the Protocol getQueueDepth method ensure that the correct number of messages are on the queue.
     *
     * Also uses a QueueBrowser as a second method of validating the message count on the queue.
     *
     * @param expectedDepth The expected Queue depth
     * @throws JMSException on error
     */
    protected void checkQueueDepth(int expectedDepth) throws JMSException
    {

        // create QueueBrowser
        _logger.info("Creating Queue Browser");

        QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);

        // check for messages
        if (_logger.isDebugEnabled())
        {
            _logger.debug("Checking for " + expectedDepth + " messages with QueueBrowser");
        }

        //Check what the session believes the queue count to be.
        long queueDepth = 0;

        try
        {
            queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
        }
        catch (AMQException e)
        {
        }

        assertEquals("Session reports Queue expectedDepth not as expected", expectedDepth, queueDepth);



        // Browse the queue to get a second opinion
        int msgCount = 0;
        Enumeration msgs = queueBrowser.getEnumeration();

        while (msgs.hasMoreElements())
        {
            msgs.nextElement();
            msgCount++;
        }

        if (_logger.isDebugEnabled())
        {
            _logger.debug("Found " + msgCount + " messages total in browser");
        }

        // check to see if all messages found
        assertEquals("Browser did not find all messages", expectedDepth, msgCount);

        //Close browser
        queueBrowser.close();
    }

    protected void closeBrowserBeforeAfterGetNext(int messageCount) throws JMSException
    {
        QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);

        Enumeration msgs = queueBrowser.getEnumeration();

        int msgCount = 0;

        while (msgs.hasMoreElements() && msgCount < messageCount)
        {
            msgs.nextElement();
            msgCount++;
        }

        try
        {
            queueBrowser.close();
        }
        catch (JMSException e)
        {
            fail("Close should happen without error:" + e.getMessage());
        }
    }

    /**
     * This method checks that multiple calls to getEnumeration() on a queueBrowser provide the same behaviour.
     *
     * @param sentMessages The number of messages sent
     * @param browserEnumerationCount The number of times to call getEnumeration()
     * @throws JMSException
     */
    protected void checkMultipleGetEnum(int sentMessages, int browserEnumerationCount) throws JMSException
    {
        QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);

        for (int count = 0; count < browserEnumerationCount; count++)
        {
            _logger.info("Checking getEnumeration:" + count);
            Enumeration msgs = queueBrowser.getEnumeration();

            int msgCount = 0;

            while (msgs.hasMoreElements())
            {
                msgs.nextElement();
                msgCount++;
            }

            // Verify that the browser can see all the messages sent.
            assertEquals(sentMessages, msgCount);
        }

        try
        {
            queueBrowser.close();
        }
        catch (JMSException e)
        {
            fail("Close should happen without error:" + e.getMessage());
        }
    }

    protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount) throws JMSException
    {
        checkOverlappingMultipleGetEnum(expectedMessages, browserEnumerationCount, null);
    }

    protected void checkOverlappingMultipleGetEnum(int expectedMessages, int browserEnumerationCount, String selector) throws JMSException
    {
        QueueBrowser queueBrowser = selector == null ?
                                _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue);
//                _clientSession.createBrowser(_queue) : _clientSession.createBrowser(_queue, selector);

        Enumeration[] msgs = new Enumeration[browserEnumerationCount];
        int[] msgCount = new int[browserEnumerationCount];

        //create Enums
        for (int count = 0; count < browserEnumerationCount; count++)
        {
            msgs[count] = queueBrowser.getEnumeration();
        }

        //interleave reads
        for (int cnt = 0; cnt < expectedMessages; cnt++)
        {
            for (int count = 0; count < browserEnumerationCount; count++)
            {
                if (msgs[count].hasMoreElements())
                {
                    msgs[count].nextElement();
                    msgCount[count]++;
                }
            }
        }

        //validate all browsers get right message count.
        for (int count = 0; count < browserEnumerationCount; count++)
        {
            assertEquals(msgCount[count], expectedMessages);
        }

        try
        {
            queueBrowser.close();
        }
        catch (JMSException e)
        {
            fail("Close should happen without error:" + e.getMessage());
        }
    }

    protected void validate(int messages) throws JMSException
    {
        //Create a new connection to validate message content
        Connection connection = null;

        try
        {
            connection = getConnection();
        }
        catch (Exception e)
        {
            fail("Unable to make validation connection");
        }

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        connection.start();

        MessageConsumer consumer = session.createConsumer(_queue);

        _logger.info("Verify messages are still on the queue");

        Message tempMsg;

        for (int msgCount = 0; msgCount < messages; msgCount++)
        {
            tempMsg = (TextMessage) consumer.receive(RECEIVE_TIMEOUT);
            if (tempMsg == null)
            {
                fail("Message " + msgCount + " not retrieved from queue");
            }
        }

        //Close this new connection
        connection.close();

        _logger.info("All messages recevied from queue");

        //ensure no message left.
        checkQueueDepth(0);
    }

    protected void checkQueueDepthWithSelectors(int totalMessages, int clients) throws JMSException
    {

        String selector = MESSAGE_ID_PROPERTY + " % " + clients;

        checkOverlappingMultipleGetEnum(totalMessages / clients, clients, selector);
    }


    /**
     * This tests you can browse an empty queue, see QPID-785
     *
     * @throws Exception
     */
    public void testBrowsingEmptyQueue() throws Exception
    {
        checkQueueDepth(0);
    }

    /*
    * Test Messages Remain on Queue
    * Create a queu and send messages to it. Browse them and then receive them all to verify they were still there
    *
    */
    public void testQueueBrowserMsgsRemainOnQueue() throws Exception
    {
        int messages = 10;

        sendMessages(messages);

        checkQueueDepth(messages);

        validate(messages);
    }


    public void testClosingBrowserMidReceiving() throws NamingException, JMSException
    {
        int messages = 100;

        sendMessages(messages);

        checkQueueDepth(messages);

        closeBrowserBeforeAfterGetNext(10);

        validate(messages);
    }

    /**
     * This tests that multiple getEnumerations on a QueueBrowser return the required number of messages.
     * @throws NamingException
     * @throws JMSException
     */
    public void testMultipleGetEnum() throws NamingException, JMSException
    {
        int messages = 10;

        sendMessages(messages);

        checkQueueDepth(messages);

        checkMultipleGetEnum(messages, 5);

        validate(messages);
    }

    public void testMultipleOverlappingGetEnum() throws NamingException, JMSException
    {
        int messages = 25;

        sendMessages(messages);

        checkQueueDepth(messages);

        checkOverlappingMultipleGetEnum(messages, 5);

        validate(messages);
    }


    public void testBrowsingWithSelector() throws JMSException
    {
        int messages = 40;

        sendMessages(messages);

        checkQueueDepth(messages);

        for (int clients = 2; clients <= 10; clients++)
        {
            checkQueueDepthWithSelectors(messages, clients);
        }

        validate(messages);
    }

    /**
     * Testing that a QueueBrowser doesn't actually consume messages from a broker when it fails over.
     * @throws JMSException
     */
    public void testFailoverWithQueueBrowser() throws JMSException
    {
        int messages = 5;

        sendMessages("connection1", messages);
        if (!CLUSTERED)
        {
            sendMessages("connection2", messages);
        }

        checkQueueDepth(messages);

        _logger.info("Creating Queue Browser");
        QueueBrowser queueBrowser = _clientSession.createBrowser(_queue);

        long queueDepth = 0;

        try
        {
            queueDepth = ((AMQSession) _clientSession).getQueueDepth((AMQDestination) _queue);
        }
        catch (AMQException e)
        {
            fail("Caught exception getting queue depth: " + e.getMessage());
        }

        assertEquals("Session reports Queue depth not as expected", messages, queueDepth);

        int msgCount = 0;
        int failPoint = 0;

        failPoint = new Random().nextInt(messages) + 1;

        Enumeration msgs = queueBrowser.getEnumeration();
        while (msgs.hasMoreElements())
        {
            msgs.nextElement();
            msgCount++;

            if (msgCount == failPoint)
            {
                failBroker(getFailingPort());
            }
        }

        assertTrue("We should get atleast " + messages + " msgs.", msgCount >= messages);

        if (_logger.isDebugEnabled())
        {
            _logger.debug("QBAAT Found " + msgCount + " messages total in browser");
        }

        //Close browser
        queueBrowser.close();

        _logger.info("Closed Queue Browser, validating messages on broker.");

        //Validate all messages still on Broker
        validate(messages);
    }

    public void testFailoverAsQueueBrowserCreated() throws JMSException
    {
        // The IoServiceListenerSupport seems to get stuck in with a managedSession that isn't closing when requested.
        // So it hangs waiting for the session.
        int messages = 50;

        sendMessages("connection1", messages);
        if (!CLUSTERED)
        {
            sendMessages("connection2", messages);
        }

        failBroker(getFailingPort());

        checkQueueDepth(messages);

        //Validate all messages still on Broker 1
        validate(messages);
    }
}
TOP

Related Classes of org.apache.qpid.test.client.QueueBrowserAutoAckTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.