Package org.apache.qpid.requestreply1

Source Code of org.apache.qpid.requestreply1.ServiceRequestingClient$CallbackHandler

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.requestreply1;

import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.jms.MessageConsumer;
import org.apache.qpid.jms.MessageProducer;
import org.apache.qpid.jms.Session;

import javax.jms.*;
import java.net.InetAddress;
import java.net.UnknownHostException;

/**
* A client that behaves as follows:
* <ul><li>Connects to a queue, whose name is specified as a cmd-line argument</li>
* <li>Creates a temporary queue</li>
* <li>Creates messages containing a property that is the name of the temporary queue</li>
* <li>Fires off a message on the original queue and waits for a response on the temporary queue</li>
* </ul>
*
*/
public class ServiceRequestingClient implements ExceptionListener
{
    private static final Logger _log = Logger.getLogger(ServiceRequestingClient.class);

    private static final String MESSAGE_DATA_BYTES = "jfd ghljgl hjvhlj cvhvjf ldhfsj lhfdsjf hldsjfk hdslkfj hsdflk  ";

    private String MESSAGE_DATA;

    private AMQConnection _connection;

    private Session _session;

    private long _averageLatency;

    private int _messageCount;

    private volatile boolean _completed;

    private AMQDestination _tempDestination;

    private MessageProducer _producer;

    private Object _waiter;

    private static String createMessagePayload(int size)
    {
        _log.info("Message size set to " + size + " bytes");
        StringBuffer buf = new StringBuffer(size);
        int count = 0;
        while (count < size + MESSAGE_DATA_BYTES.length())
        {
            buf.append(MESSAGE_DATA_BYTES);
            count += MESSAGE_DATA_BYTES.length();
        }
        if (count < size)
        {
            buf.append(MESSAGE_DATA_BYTES, 0, size - count);
        }

        return buf.toString();
    }

    private class CallbackHandler implements MessageListener
    {
        private int _expectedMessageCount;

        private int _actualMessageCount;

        private long _startTime;

        public CallbackHandler(int expectedMessageCount, long startTime)
        {
            _expectedMessageCount = expectedMessageCount;
            _startTime = startTime;
        }

        public void onMessage(Message m)
        {
            if (_log.isDebugEnabled())
            {
                _log.debug("Message received: " + m);
            }
            try
            {
                if (m.propertyExists("timeSent"))
                {
                    long timeSent = Long.parseLong(m.getStringProperty("timeSent"));
                    long now = System.currentTimeMillis();
                    if (_averageLatency == 0)
                    {
                        _averageLatency = now - timeSent;
                        _log.info("Latency " + _averageLatency);
                    }
                    else
                    {
                        _log.info("Individual latency: " + (now - timeSent));
                        _averageLatency = (_averageLatency + (now - timeSent)) / 2;
                        _log.info("Average latency now: " + _averageLatency);
                    }
                }
            }
            catch (JMSException e)
            {
                _log.error("Error getting latency data: " + e, e);
            }
            _actualMessageCount++;
            if (_actualMessageCount % 1000 == 0)
            {
                _log.info("Received message count: " + _actualMessageCount);
            }

            if (_actualMessageCount == _expectedMessageCount)
            {
                _completed = true;
                notifyWaiter();
                long timeTaken = System.currentTimeMillis() - _startTime;
                _log.info("Total time taken to receive " + _expectedMessageCount + " messages was " +
                          timeTaken + "ms, equivalent to " +
                          (_expectedMessageCount / (timeTaken / 1000.0)) + " messages per second");

                try
                {
                    _connection.close();
                    _log.info("Connection closed");
                }
                catch (JMSException e)
                {
                    _log.error("Error closing connection");
                }
            }
        }
    }

    private void notifyWaiter()
    {
        if (_waiter != null)
        {
            synchronized (_waiter)
            {
                _waiter.notify();
            }
        }
    }
    public ServiceRequestingClient(String brokerHosts, String clientID, String username, String password,
                                   String vpath, String commandQueueName,
                                   final int messageCount, final int messageDataLength) throws AMQException, URLSyntaxException
    {
        _messageCount = messageCount;
        MESSAGE_DATA = createMessagePayload(messageDataLength);
        try
        {
            createConnection(brokerHosts, clientID, username, password, vpath);
            _session = (Session) _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);


            _connection.setExceptionListener(this);


            AMQQueue destination = new AMQQueue(commandQueueName);
            _producer = (MessageProducer) _session.createProducer(destination);
            _producer.setDisableMessageTimestamp(true);
            _producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

            _tempDestination = new AMQQueue("TempResponse" +
                                            Long.toString(System.currentTimeMillis()), true);
            MessageConsumer messageConsumer = (MessageConsumer) _session.createConsumer(_tempDestination, 100, true,
                                                                                        true, null);

            //Send first message, then wait a bit to allow the provider to get initialised
            TextMessage first = _session.createTextMessage(MESSAGE_DATA);
            first.setJMSReplyTo(_tempDestination);
            _producer.send(first);
            try
            {
                Thread.sleep(1000);
            }
            catch (InterruptedException ignore)
            {
            }

            //now start the clock and the test...
            final long startTime = System.currentTimeMillis();

            messageConsumer.setMessageListener(new CallbackHandler(messageCount, startTime));
        }
        catch (JMSException e)
        {
            e.printStackTrace()//To change body of catch statement use File | Settings | File Templates.
        }
    }

    /**
     * Run the test and notify an object upon receipt of all responses.
     * @param waiter the object that will be notified
     * @throws JMSException
     */
    public void run(Object waiter) throws JMSException
    {
        _waiter = waiter;
        _connection.start();
        for (int i = 1; i < _messageCount; i++)
        {
            TextMessage msg = _session.createTextMessage(MESSAGE_DATA + i);
            msg.setJMSReplyTo(_tempDestination);
            if (i % 1000 == 0)
            {
                long timeNow = System.currentTimeMillis();
                msg.setStringProperty("timeSent", String.valueOf(timeNow));
            }
            _producer.send(msg);
        }
        _log.info("Finished sending " + _messageCount + " messages");
    }

    public boolean isCompleted()
    {
        return _completed;
    }

    private void createConnection(String brokerHosts, String clientID, String username, String password,
                                  String vpath) throws AMQException, URLSyntaxException
    {
        _connection = new AMQConnection(brokerHosts, username, password,
                                        clientID, vpath);
    }

    /**
     * @param args argument 1 if present specifies the name of the temporary queue to create. Leaving it blank
     *             means the server will allocate a name.
     */
    public static void main(String[] args)
    {
        if (args.length < 6)
        {
            System.err.println(
                    "Usage: ServiceRequestingClient <brokerDetails - semicolon separated host:port list> <username> <password> <vpath> <command queue name> <number of messages> <message size>");
        }
        try
        {
            int messageDataLength = args.length > 6 ? Integer.parseInt(args[6]) : 4096;

            InetAddress address = InetAddress.getLocalHost();
            String clientID = address.getHostName() + System.currentTimeMillis();
            ServiceRequestingClient client = new ServiceRequestingClient(args[0], clientID, args[1], args[2], args[3],
                                                                         args[4], Integer.parseInt(args[5]),
                                                                         messageDataLength);
            Object waiter = new Object();
            client.run(waiter);
            synchronized (waiter)
            {
                while (!client.isCompleted())
                {
                    waiter.wait();
                }
            }

        }
        catch (UnknownHostException e)
        {
            e.printStackTrace()//To change body of catch statement use File | Settings | File Templates.
        }
        catch (Exception e)
        {
            System.err.println("Error in client: " + e);
            e.printStackTrace();
        }
    }

     /**
     * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
     */
    public void onException(JMSException e)
    {
        System.err.println(e.getMessage());
        e.printStackTrace(System.err);
    }
}
TOP

Related Classes of org.apache.qpid.requestreply1.ServiceRequestingClient$CallbackHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.