Package org.apache.qpid.fragmentation

Source Code of org.apache.qpid.fragmentation.TestLargePublisher$CallbackHandler

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.fragmentation;

import org.apache.qpid.AMQException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQTopic;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;
import org.apache.log4j.Logger;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* A client that behaves as follows:
* <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
* <li>Creates a temporary queue</li>
* <li>Creates messages containing a property that is the name of the temporary queue</li>
* <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
* </ul>
*/
public class TestLargePublisher
{
    private static final Logger _log = Logger.getLogger(TestLargePublisher.class);

    private AMQConnection _connection;

    private Session _session;

    private class CallbackHandler implements MessageListener
    {
        private int _expectedMessageCount;

        private int _actualMessageCount;

        private long _startTime;

        public CallbackHandler(int expectedMessageCount, long startTime)
        {
            _expectedMessageCount = expectedMessageCount;
            _startTime = startTime;
        }

        public void onMessage(Message m)
        {
            if (_log.isDebugEnabled())
            {
                _log.debug("Message received: " + m);
            }
            _actualMessageCount++;
            if (_actualMessageCount%1000 == 0)
            {
                _log.info("Received message count: " + _actualMessageCount);
            }
            /*if (!"henson".equals(m.toString()))
           {
               _log.error("AbstractJMSMessage response not correct: expected 'henson' but got " + m.toString());
           }
           else
           {
               if (_log.isDebugEnabled())
               {
                   _log.debug("AbstractJMSMessage " + m + " received");
               }
               else
               {
                   _log.info("AbstractJMSMessage received");
               }
           } */

            if (_actualMessageCount == _expectedMessageCount)
            {
                long timeTaken = System.currentTimeMillis() - _startTime;
                System.out.println("Total time taken to receive " + _expectedMessageCount+ " messages was " +
                                   timeTaken + "ms, equivalent to " +
                                   (_expectedMessageCount/(timeTaken/1000.0)) + " messages per second");
            }
        }
    }

    public TestLargePublisher(String host, int port, String clientID,
                              final int messageCount) throws AMQException,URLSyntaxException
    {
        try
        {
            createConnection(host, port, clientID);
           
            _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            AMQTopic destination = new AMQTopic("large");
            MessageProducer producer = (MessageProducer) _session.createProducer(destination);

            _connection.start();
            //TextMessage msg = _session.createTextMessage(tempDestination.getQueueName() + "/Presented to in conjunction with Mahnah Mahnah and the Snowths");
            final long startTime = System.currentTimeMillis();

            for (int i = 0; i < messageCount; i++)
            {
                BytesMessage msg = _session.createBytesMessage();
                populateMessage(msg);
                producer.send(msg);
            }
            _log.info("Finished sending " + messageCount + " messages");
        }
        catch (JMSException e)
        {
            e.printStackTrace();
        }
    }

    private void createConnection(String host, int port, String clientID) throws AMQException , URLSyntaxException
    {
        _connection = new AMQConnection(host, port, "guest", "guest",
                                        clientID, "/test");
    }

    private void populateMessage(BytesMessage msg) throws JMSException
    {
        int size = 1024 * 187; // 187k
        byte[] data = new byte[size];
        for (int i = 0; i < data.length; i++)
        {
            data[i] = (byte)(i%25);
        }
        msg.writeBytes(data);
    }

    /**
     *
     * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
     * means the server will allocate a name.
     */
    public static void main(String[] args) throws URLSyntaxException
    {
        final String host;
        final int port;
        final int numMessages;
        if (args.length == 0)
        {
            host = "localhost";
            port = 5672;
            numMessages = 100;
//            System.err.println("Usage: TestLargePublisher <host> <port> <number of messages>");
        }
        else
        {
            host = args[0];
            port = Integer.parseInt(args[1]);
            numMessages = Integer.parseInt(args[2]);
        }

        try
        {
            InetAddress address = InetAddress.getLocalHost();
            String clientID = address.getHostName() + System.currentTimeMillis();
            TestLargePublisher client = new TestLargePublisher(host, port, clientID, numMessages);
        }
        catch (UnknownHostException e)
        {
            e.printStackTrace()//To change body of catch statement use File | Settings | File Templates.
        }
        catch (AMQException e)
        {
            System.err.println("Error in client: " + e);
            e.printStackTrace();
        }

        //System.exit(0);
    }
}
TOP

Related Classes of org.apache.qpid.fragmentation.TestLargePublisher$CallbackHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.