Package org.apache.qpid.requestreply

Source Code of org.apache.qpid.requestreply.PingPongProducer$PerCorrelationId

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.requestreply;

import org.apache.log4j.Logger;
import org.apache.log4j.NDC;

import org.apache.qpid.test.framework.TestUtils;

import org.apache.qpid.junit.extensions.BatchedThrottle;
import org.apache.qpid.junit.extensions.Throttle;
import org.apache.qpid.junit.extensions.util.CommandLineParser;
import org.apache.qpid.junit.extensions.util.ParsedProperties;

import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;

import java.io.*;
import java.net.InetAddress;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

/**
* PingPongProducer is a client that sends test messages, and waits for replies to these messages. The replies may
* either be generated by another client (see {@link PingPongBouncer}), or an extension of it may be used that listens
* to its own messages and does not send replies (see {@link org.apache.qpid.ping.PingClient}). The intention of ping
* pong producer is that it is a swiss-army knife test client that makes almost every aspect of its behaviour
* configurable.
*
* <p/>The pings are sent with a reply-to field set to a single temporary queue, which is the same for all pings. This
* means that this class has to do some work to correlate pings with pongs; it expects the original message correlation
* id in the ping to be bounced back in the reply correlation id.
*
* <p/>This ping tool accepts a vast number of configuration options, all of which are passed in to the constructor. It
* can ping topics or queues; ping multiple destinations; do persistent pings; send messages of any size; do pings within
* transactions; control the number of pings to send in each transaction; limit its sending rate; and perform failover
* testing. A complete list of accepted parameters, default values and comments on their usage is provided here:
*
* <p/><table><caption>Parameters</caption>
* <tr><th> Parameter        <th> Default  <th> Comments
* <tr><td> messageSize      <td> 0        <td> Message size in bytes. Not including any headers.
* <tr><td> destinationName  <td> ping     <td> The root name to use to generate destination names to ping.
* <tr><td> persistent       <td> false    <td> Determines whether persistent delivery is used.
* <tr><td> transacted       <td> false    <td> Determines whether messages are sent/received in transactions.
* <tr><td> broker           <td> tcp://localhost:5672 <td> Determines the broker to connect to.
* <tr><td> virtualHost      <td> (empty)  <td> Determines the virtual host to send all pings over.
* <tr><td> rate             <td> 0        <td> The maximum rate (in hertz) to send messages at. 0 means no limit.
* <tr><td> verbose          <td> false    <td> The verbose flag for debugging. Prints to console on every message.
* <tr><td> pubsub           <td> false    <td> Whether to ping topics or queues. Uses p2p by default.
* <tr><td> failAfterCommit  <td> false    <td> Whether to prompt user to kill broker after a commit batch.
* <tr><td> failBeforeCommit <td> false    <td> Whether to prompt user to kill broker before a commit batch.
* <tr><td> failAfterSend    <td> false    <td> Whether to prompt user to kill broker after a send.
* <tr><td> failBeforeSend   <td> false    <td> Whether to prompt user to kill broker before a send.
* <tr><td> failOnce         <td> true     <td> Whether to prompt for failover only once.
* <tr><td> username         <td> guest    <td> The username to access the broker with.
* <tr><td> password         <td> guest    <td> The password to access the broker with.
* <tr><td> selector         <td> (empty)  <td> Not used. Defines a message selector to filter pings with.
* <tr><td> destinationCount <td> 1        <td> The number of destinations to send pings to.
* <tr><td> numConsumers     <td> 1        <td> The number of consumers on each destination.
* <tr><td> timeout          <td> 30000    <td> In milliseconds. The timeout to stop waiting for replies.
* <tr><td> commitBatchSize  <td> 1        <td> The number of messages per transaction in transactional mode.
* <tr><td> uniqueDests      <td> true     <td> Whether each receivers only listens to one ping destination or all.
* <tr><td> durableDests     <td> false    <td> Whether or not durable destinations are used.
* <tr><td> ackMode          <td> AUTO_ACK <td> The message acknowledgement mode. Possible values are:
*                                               0 - SESSION_TRANSACTED
*                                               1 - AUTO_ACKNOWLEDGE
*                                               2 - CLIENT_ACKNOWLEDGE
*                                               3 - DUPS_OK_ACKNOWLEDGE
*                                               257 - NO_ACKNOWLEDGE
*                                               258 - PRE_ACKNOWLEDGE
* <tr><td> consTransacted   <td> false    <td> Whether or not consumers use transactions. Defaults to the same value
*                                              as the 'transacted' option if not separately defined.
* <tr><td> consAckMode      <td> AUTO_ACK <td> The message acknowledgement mode for consumers. Defaults to the same
*                                              value as 'ackMode' if not separately defined.
* <tr><td> maxPending       <td> 0        <td> The maximum size in bytes, of messages sent but not yet received.
*                                              Limits the volume of messages currently buffered on the client
*                                              or broker. Can help scale test clients by limiting amount of buffered
*                                              data to avoid out of memory errors.
* </table>
*
* <p/>This implements the Runnable interface with a run method that implements an infinite ping loop. The ping loop
* does all its work through helper methods, so that code wishing to run a ping-pong cycle is not forced to do so by
* starting a new thread. The command line invocation does take advantage of this ping loop. A shutdown hook is also
* registered to terminate the ping-pong loop cleanly.
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations
* <tr><td> Provide a ping and wait for all responses cycle.
* <tr><td> Provide command line invocation to loop the ping cycle on a configurable broker url.
* </table>
*
* @todo Use read/write lock in the onmessage, not for reading/writing but to make use of a shared and exclusive lock pair.
*       Obtain read lock on all messages, before decrementing the message count. At the end of the on message method add a
*       block that obtains the write lock for the very last message, releases any waiting producer. Means that the last
*       message waits until all other messages have been handled before releasing producers but allows messages to be
*       processed concurrently, unlike the current synchronized block.
*/
public class PingPongProducer implements Runnable, ExceptionListener
{
    /** Used for debugging. */
    private static final Logger log = Logger.getLogger(PingPongProducer.class);

    /** Holds the name of the property to determine whether or not the client id is overridden at connection time. */
    public static final String OVERRIDE_CLIENT_ID_PROPNAME = "overrideClientId";

    /** Holds the default value of the override client id flag. */
    public static final String OVERRIDE_CLIENT_ID_DEAFULT = "false";

    /** Holds the name of the property to define the JNDI factory name with. */
    public static final String FACTORY_NAME_PROPNAME = "factoryName";

    /** Holds the default JNDI name of the connection factory. */
    public static final String FACTORY_NAME_DEAFULT = "local";

    /** Holds the name of the property to set the JNDI initial context properties with. */
    public static final String FILE_PROPERTIES_PROPNAME = "properties";

    /** Holds the default file name of the JNDI initial context properties. */
    public static final String FILE_PROPERTIES_DEAFULT = "perftests.properties";

    /** Holds the name of the property to get the test message size from. */
    public static final String MESSAGE_SIZE_PROPNAME = "messageSize";

    /** Used to set up a default message size. */
    public static final int MESSAGE_SIZE_DEAFULT = 0;

    /** Holds the name of the property to get the ping queue name from. */
    public static final String PING_QUEUE_NAME_PROPNAME = "destinationName";

    /** Holds the name of the default destination to send pings on. */
    public static final String PING_QUEUE_NAME_DEFAULT = "ping";

    /** Holds the name of the property to get the queue name postfix from. */
    public static final String QUEUE_NAME_POSTFIX_PROPNAME = "queueNamePostfix";

    /** Holds the default queue name postfix value. */
    public static final String QUEUE_NAME_POSTFIX_DEFAULT = "";

    /** Holds the name of the property to get the test delivery mode from. */
    public static final String PERSISTENT_MODE_PROPNAME = "persistent";

    /** Holds the message delivery mode to use for the test. */
    public static final boolean PERSISTENT_MODE_DEFAULT = false;

    /** Holds the name of the property to get the test transactional mode from. */
    public static final String TRANSACTED_PROPNAME = "transacted";

    /** Holds the transactional mode to use for the test. */
    public static final boolean TRANSACTED_DEFAULT = false;

    /** Holds the name of the property to get the test consumer transacted mode from. */
    public static final String CONSUMER_TRANSACTED_PROPNAME = "consTransacted";

    /** Holds the consumer transactional mode default setting. */
    public static final boolean CONSUMER_TRANSACTED_DEFAULT = false;

    /** Holds the name of the property to get the test broker url from. */
    public static final String BROKER_PROPNAME = "broker";

    /** Holds the default broker url for the test. */
    public static final String BROKER_DEFAULT = "tcp://localhost:5672";

    /** Holds the name of the property to get the test broker virtual path. */
    public static final String VIRTUAL_HOST_PROPNAME = "virtualHost";

    /** Holds the default virtual path for the test. */
    public static final String VIRTUAL_HOST_DEFAULT = "";

    /** Holds the name of the property to get the message rate from. */
    public static final String RATE_PROPNAME = "rate";

    /** Defines the default rate (in pings per second) to send pings at. 0 means as fast as possible, no restriction. */
    public static final int RATE_DEFAULT = 0;

    /** Holds the name of the property to get the verbose mode property from. */
    public static final String VERBOSE_PROPNAME = "verbose";

    /** Holds the default verbose mode. */
    public static final boolean VERBOSE_DEFAULT = false;

    /** Holds the name of the property to get the p2p or pub/sub messaging mode from. */
    public static final String PUBSUB_PROPNAME = "pubsub";

    /** Holds the pub/sub mode default, true means ping a topic, false means ping a queue. */
    public static final boolean PUBSUB_DEFAULT = false;

    /** Holds the name of the property to get the fail after commit flag from. */
    public static final String FAIL_AFTER_COMMIT_PROPNAME = "failAfterCommit";

    /** Holds the default failover after commit test flag. */
    public static final boolean FAIL_AFTER_COMMIT_DEFAULT = false;

    /** Holds the name of the property to get the fail before commit flag from. */
    public static final String FAIL_BEFORE_COMMIT_PROPNAME = "failBeforeCommit";

    /** Holds the default failover before commit test flag. */
    public static final boolean FAIL_BEFORE_COMMIT_DEFAULT = false;

    /** Holds the name of the property to get the fail after send flag from. */
    public static final String FAIL_AFTER_SEND_PROPNAME = "failAfterSend";

    /** Holds the default failover after send test flag. */
    public static final boolean FAIL_AFTER_SEND_DEFAULT = false;

    /** Holds the name of the property to get the fail before send flag from. */
    public static final String FAIL_BEFORE_SEND_PROPNAME = "failBeforeSend";

    /** Holds the default failover before send test flag. */
    public static final boolean FAIL_BEFORE_SEND_DEFAULT = false;

    /** Holds the name of the property to get the fail once flag from. */
    public static final String FAIL_ONCE_PROPNAME = "failOnce";

    /** The default failover once flag, true means only do one failover, false means failover on every commit cycle. */
    public static final boolean FAIL_ONCE_DEFAULT = true;

    /** Holds the name of the property to get the broker access username from. */
    public static final String USERNAME_PROPNAME = "username";

    /** Holds the default broker log on username. */
    public static final String USERNAME_DEFAULT = "guest";

    /** Holds the name of the property to get the broker access password from. */
    public static final String PASSWORD_PROPNAME = "password";

    /** Holds the default broker log on password. */
    public static final String PASSWORD_DEFAULT = "guest";

    /** Holds the name of the property to get the message selector from. */
    public static final String SELECTOR_PROPNAME = "selector";

    /** Holds the default message selector. */
    public static final String SELECTOR_DEFAULT = "";

    /** Holds the name of the property to get the destination count from. */
    public static final String DESTINATION_COUNT_PROPNAME = "destinationCount";

    /** Defines the default number of destinations to ping. */
    public static final int DESTINATION_COUNT_DEFAULT = 1;

    /** Holds the name of the property to get the number of consumers per destination from. */
    public static final String NUM_CONSUMERS_PROPNAME = "numConsumers";

    /** Defines the default number consumers per destination. */
    public static final int NUM_CONSUMERS_DEFAULT = 1;

    /** Holds the name of the property to get the waiting timeout for response messages. */
    public static final String TIMEOUT_PROPNAME = "timeout";

    /** Default time to wait before assuming that a ping has timed out. */
    public static final long TIMEOUT_DEFAULT = 30000;

    /** Holds the name of the property to get the commit batch size from. */
    public static final String TX_BATCH_SIZE_PROPNAME = "commitBatchSize";

    /** Defines the default number of pings to send in each transaction when running transactionally. */
    public static final int TX_BATCH_SIZE_DEFAULT = 1;

    /** Holds the name of the property to get the unique destinations flag from. */
    public static final String UNIQUE_DESTS_PROPNAME = "uniqueDests";

    /** Defines the default value for the unique destinations property. */
    public static final boolean UNIQUE_DESTS_DEFAULT = true;

    /** Holds the name of the property to get the durable destinations flag from. */
    public static final String DURABLE_DESTS_PROPNAME = "durableDests";

    /** Defines the default value of the durable destinations flag. */
    public static final boolean DURABLE_DESTS_DEFAULT = false;

    /** Holds the name of the property to get the message acknowledgement mode from. */
    public static final String ACK_MODE_PROPNAME = "ackMode";

    /** Defines the default message acknowledgement mode. */
    public static final int ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;

    /** Holds the name of the property to get the consumers message acknowledgement mode from. */
    public static final String CONSUMER_ACK_MODE_PROPNAME = "consAckMode";

    /** Defines the default consumers message acknowledgement mode. */
    public static final int CONSUMER_ACK_MODE_DEFAULT = Session.AUTO_ACKNOWLEDGE;

    /** Holds the name of the property to get the maximum pending message size setting from. */
    public static final String MAX_PENDING_PROPNAME = "maxPending";

    /** Defines the default value for the maximum pending message size setting. 0 means no limit. */
    public static final int MAX_PENDING_DEFAULT = 0;

    /** Defines the default prefetch size to use when consuming messages. */
    public static final int PREFETCH_DEFAULT = 100;

    /** Defines the default value of the no local flag to use when consuming messages. */
    public static final boolean NO_LOCAL_DEFAULT = false;

    /** Defines the default value of the exclusive flag to use when consuming messages. */
    public static final boolean EXCLUSIVE_DEFAULT = false;

    /** Holds the name of the property to store nanosecond timestamps in ping messages with. */
    public static final String MESSAGE_TIMESTAMP_PROPNAME = "timestamp";

    /** Holds the name of the property to get the number of messages to prefill the broker with before starting the main test. */
    public static final String PREFILL_PROPNAME = "preFill";

    /** Defines the default value for the number of messages to prefill. 0 (the default) means no messages. */
    public static final int PREFILL_DEFAULT = 0;

    /** Holds the name of the property to get the delay to wait in ms before starting the main test after having prefilled. */
    public static final String DELAY_BEFORE_CONSUME_PROPNAME = "delayBeforeConsume";

    /** Defines the default value for the delay in ms to wait before starting the test run. 0 (the default) means no delay. */
    public static final long  DELAY_BEFORE_CONSUME = 0;

    /** Holds the name of the property that selects consume-only mode, in which no messages should be sent. */
    public static final String CONSUME_ONLY_PROPNAME = "consumeOnly";

    /** Defines the default value of the consumeOnly flag to use when publishing messages is not desired. */
    public static final boolean CONSUME_ONLY_DEFAULT = false;

    /** Holds the name of the property that selects send-only mode, in which no messages should be consumed. */
    public static final String SEND_ONLY_PROPNAME = "sendOnly";

    /** Defines the default value of the sendOnly flag to use when consuming messages is not desired. */
    public static final boolean SEND_ONLY_DEFAULT = false;

    /**
     * Holds the default configuration properties. The constructor uses this as the base of a new ParsedProperties
     * and then applies the caller's overrides on top of it.
     */
    public static ParsedProperties defaults = new ParsedProperties();

    /*
     * Registers the default value for each configuration property name declared above. The only *_PROPNAME constant
     * without an entry here is MESSAGE_TIMESTAMP_PROPNAME, which has no matching default constant.
     *
     * NOTE(review): setPropertyIfNull presumably leaves a value alone when the key is already set - confirm against
     * ParsedProperties. The calls pass String, boolean, int and long arguments, so they rely on overload resolution.
     */
    static
    {
        // JNDI/connection set up.
        defaults.setPropertyIfNull(OVERRIDE_CLIENT_ID_PROPNAME, OVERRIDE_CLIENT_ID_DEAFULT);
        defaults.setPropertyIfNull(FILE_PROPERTIES_PROPNAME, FILE_PROPERTIES_DEAFULT);
        defaults.setPropertyIfNull(FACTORY_NAME_PROPNAME, FACTORY_NAME_DEAFULT);
        defaults.setPropertyIfNull(BROKER_PROPNAME, BROKER_DEFAULT);
        defaults.setPropertyIfNull(USERNAME_PROPNAME, USERNAME_DEFAULT);
        defaults.setPropertyIfNull(PASSWORD_PROPNAME, PASSWORD_DEFAULT);
        defaults.setPropertyIfNull(VIRTUAL_HOST_PROPNAME, VIRTUAL_HOST_DEFAULT);

        // Destination naming and message selection.
        defaults.setPropertyIfNull(PING_QUEUE_NAME_PROPNAME, PING_QUEUE_NAME_DEFAULT);
        defaults.setPropertyIfNull(QUEUE_NAME_POSTFIX_PROPNAME, QUEUE_NAME_POSTFIX_DEFAULT);
        defaults.setPropertyIfNull(SELECTOR_PROPNAME, SELECTOR_DEFAULT);

        // Transaction, persistence and acknowledgement modes.
        defaults.setPropertyIfNull(TRANSACTED_PROPNAME, TRANSACTED_DEFAULT);
        defaults.setPropertyIfNull(CONSUMER_TRANSACTED_PROPNAME, CONSUMER_TRANSACTED_DEFAULT);
        defaults.setPropertyIfNull(PERSISTENT_MODE_PROPNAME, PERSISTENT_MODE_DEFAULT);
        defaults.setPropertyIfNull(ACK_MODE_PROPNAME, ACK_MODE_DEFAULT);
        defaults.setPropertyIfNull(CONSUMER_ACK_MODE_PROPNAME, CONSUMER_ACK_MODE_DEFAULT);

        // Message size, output verbosity and destination type.
        defaults.setPropertyIfNull(MESSAGE_SIZE_PROPNAME, MESSAGE_SIZE_DEAFULT);
        defaults.setPropertyIfNull(VERBOSE_PROPNAME, VERBOSE_DEFAULT);
        defaults.setPropertyIfNull(PUBSUB_PROPNAME, PUBSUB_DEFAULT);
        defaults.setPropertyIfNull(UNIQUE_DESTS_PROPNAME, UNIQUE_DESTS_DEFAULT);
        defaults.setPropertyIfNull(DURABLE_DESTS_PROPNAME, DURABLE_DESTS_DEFAULT);

        // Failover test prompts.
        defaults.setPropertyIfNull(FAIL_BEFORE_COMMIT_PROPNAME, FAIL_BEFORE_COMMIT_DEFAULT);
        defaults.setPropertyIfNull(FAIL_AFTER_COMMIT_PROPNAME, FAIL_AFTER_COMMIT_DEFAULT);
        defaults.setPropertyIfNull(FAIL_BEFORE_SEND_PROPNAME, FAIL_BEFORE_SEND_DEFAULT);
        defaults.setPropertyIfNull(FAIL_AFTER_SEND_PROPNAME, FAIL_AFTER_SEND_DEFAULT);
        defaults.setPropertyIfNull(FAIL_ONCE_PROPNAME, FAIL_ONCE_DEFAULT);

        // Batch sizes, counts, rate limiting and timeouts.
        defaults.setPropertyIfNull(TX_BATCH_SIZE_PROPNAME, TX_BATCH_SIZE_DEFAULT);
        defaults.setPropertyIfNull(DESTINATION_COUNT_PROPNAME, DESTINATION_COUNT_DEFAULT);
        defaults.setPropertyIfNull(NUM_CONSUMERS_PROPNAME, NUM_CONSUMERS_DEFAULT);
        defaults.setPropertyIfNull(RATE_PROPNAME, RATE_DEFAULT);
        defaults.setPropertyIfNull(TIMEOUT_PROPNAME, TIMEOUT_DEFAULT);
        defaults.setPropertyIfNull(MAX_PENDING_PROPNAME, MAX_PENDING_DEFAULT);

        // Prefill and send-only/consume-only modes.
        defaults.setPropertyIfNull(PREFILL_PROPNAME, PREFILL_DEFAULT);
        defaults.setPropertyIfNull(DELAY_BEFORE_CONSUME_PROPNAME, DELAY_BEFORE_CONSUME);
        defaults.setPropertyIfNull(CONSUME_ONLY_PROPNAME, CONSUME_ONLY_DEFAULT);
        defaults.setPropertyIfNull(SEND_ONLY_PROPNAME, SEND_ONLY_DEFAULT);
    }

    /** Allows setting of client ID on the connection, rather than through the connection URL. */
    protected boolean _overrideClientId;

    /** Holds the JNDI name of the JMS connection factory. */
    protected String _factoryName;

    /** Holds the name of the properties file to configure JNDI with. */
    protected String _fileProperties;

    /** Holds the broker url. */
    protected String _brokerDetails;

    /** Holds the username to access the broker with. */
    protected String _username;

    /** Holds the password to access the broker with. */
    protected String _password;

    /** Holds the virtual host on the broker to run the tests through. */
    protected String _virtualpath;

    /** Holds the root name from which to generate test destination names. */
    protected String _destinationName;

    /** Holds the default queue name postfix value. */
    protected String _queueNamePostfix;

    /** Holds the message selector to filter the pings with. */
    protected String _selector;

    /** Holds the producers transactional mode flag. */
    protected boolean _transacted;

    /** Holds the consumers transactional mode flag. */
    protected boolean _consTransacted;

    /** Determines whether this producer sends persistent messages. */
    protected boolean _persistent;

    /** Holds the acknowledgement mode used for the producers. */
    protected int _ackMode;

    /** Holds the acknowledgement mode setting for the consumers. */
    protected int _consAckMode;

    /** Determines what size of messages this producer sends. */
    protected int _messageSize;

    /** Used to indicate that the ping loop should print out whenever it pings. */
    protected boolean _verbose;

    /** Flag used to indicate if this is a point to point or pub/sub ping client. */
    protected boolean _isPubSub;

    /** Flag used to indicate whether the ping destinations should be unique; set from the 'uniqueDests' property. */
    protected boolean _isUnique;

    /** Flag used to indicate that durable destination should be used. */
    protected boolean _isDurable;

    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a commit. */
    protected boolean _failBeforeCommit;

    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a commit. */
    protected boolean _failAfterCommit;

    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover before a send. */
    protected boolean _failBeforeSend;

    /** Flag used to indicate that the user should be prompted to terminate a broker, to test failover after a send. */
    protected boolean _failAfterSend;

    /** Flag used to indicate that failover prompting should only be done on the first commit, not on every commit. */
    protected boolean _failOnce;

    /** Holds the number of sends that should be performed in every transaction when using transactions. */
    protected int _txBatchSize;

    /** Holds the number of destinations to ping. */
    protected int _noOfDestinations;

    /** Holds the number of consumers per destination. */
    protected int _noOfConsumers;

    /** Holds the maximum send rate in hertz. */
    protected int _rate;

    /**
     * Holds the size of the maximum amount of pending data that the client should buffer, sending is suspended
     * if this limit is breached.
     */
    protected int _maxPendingSize;

    /**
     * Holds the number of messages to send during the setup phase, before the clients start consuming.
     */
    private Integer _preFill;

    /**
     * Holds the time in ms to wait after preFilling before starting the test.
     */
    private Long _delayBeforeConsume;

    /**
     * Holds a boolean value of whether this test should just consume, i.e. skips
     * sending messages, but still expects to receive the specified number.
     */
    private boolean _consumeOnly;

    /**
     * Holds a boolean value of whether this test should just send, i.e. skips
     * consuming messages, but still creates clients just doesn't start them.
     */
    private boolean _sendOnly;


    /** A source for providing sequential unique correlation ids. These will be unique within the same JVM. */
    private static AtomicLong _correlationIdGenerator = new AtomicLong(0L);

    /** A source for providing sequential unique ids for instances of this class to be identified with. */
    private static AtomicInteger _instanceIdGenerator = new AtomicInteger(0);

    /** Holds this instances unique id. */
    private int instanceId;

    /**
     * Holds a map from message ids to latches on which threads wait for replies. This map is shared across multiple
     * ping producers in the same JVM.
     */
    private static Map<String, PerCorrelationId> perCorrelationIds =
        Collections.synchronizedMap(new HashMap<String, PerCorrelationId>());

    /**
     * A convenient formatter to use when time stamping output.
     *
     * <p/>NOTE(review): {@link SimpleDateFormat} instances are not safe for concurrent use and this one is shared
     * statically, so callers on multiple threads need external synchronization. The "hh" pattern letter is the
     * 12-hour clock hour and the pattern has no am/pm marker; confirm that "HH" (24-hour) was not intended.
     */
    protected static final DateFormat timestampFormatter = new SimpleDateFormat("hh:mm:ss:SS");

    /** Holds the connection for the message producer. */
    protected Connection _connection;

    /** Holds the consumer connections. */
    protected Connection[] _consumerConnection;

    /** Holds the controlSession on which ping replies are received. */
    protected Session[] _consumerSession;

    /** Holds the producer controlSession, needed to create ping messages. */
    protected Session _producerSession;

    /** Holds the destination where the response messages will arrive. */
    protected Destination _replyDestination;

    /** Holds the set of destinations that this ping producer pings. */
    protected List<Destination> _pingDestinations;

    /** Used to restrict the sending rate to a specified limit. */
    protected Throttle _rateLimiter;

    /** Holds a message listener that this message listener chains all its messages to. */
    protected ChainedMessageListener _chainedMessageListener = null;

    /**
     * This id generator is used to generate ids to append to the queue name to ensure that queues can be unique when
     * creating multiple ping producers in the same JVM.
     */
    protected static AtomicInteger _queueJVMSequenceID = new AtomicInteger();

    /**
     * This id generator is used to generates ids that are only unique within this pinger. Creating multiple pingers
     * on the same JVM using this id generator will allow them to ping on the same queues.
     */
    protected AtomicInteger _queueSharedID = new AtomicInteger();

    /**
     * Used to tell the ping loop when to terminate; the loop only runs while this is true.
     *
     * <p/>NOTE(review): this flag is not volatile. If it is cleared from a thread other than the one running the ping
     * loop (for example from a shutdown hook), the loop is not guaranteed to see the change; confirm how it is
     * written and read.
     */
    protected boolean _publish = true;

    /** Holds the message producer to send the pings through. */
    protected MessageProducer _producer;

    /** Holds the message consumer to receive the ping replies through. */
    protected MessageConsumer[] _consumer;

    /** The prompt to display when asking the user to kill the broker for failover testing. */
    private static final String KILL_BROKER_PROMPT = "Kill broker now, then press Return.";

    /** Holds the name for this test client to be identified to the broker with. */
    private String _clientID;

    /** Keeps count of the total messages sent purely for debugging purposes. */
    private static AtomicInteger numSent = new AtomicInteger();

    /**
     * Holds a monitor which is used to synchronize sender and receiver threads, where the sender has elected
     * to wait until the number of unreceived messages is reduced before continuing to send. This monitor is a
     * fair SynchronousQueue because that provides fair scheduling, to ensure that all producer threads get an
     * equal chance to produce messages.
     *
     * <p/>NOTE(review): declared with a raw type; parameterizing it (e.g. as SynchronousQueue&lt;Object&gt;) would
     * remove the unchecked warnings, once the element type used by callers is confirmed.
     */
    static final SynchronousQueue _sendPauseMonitor = new SynchronousQueue(true);

    /** Keeps a count of the number of messages currently sent but not received. */
    static AtomicInteger _unreceived = new AtomicInteger(0);

    /**
     * Creates a ping producer with the specified parameters, of which there are many. See the class level comments
     * for details. This constructor creates a connection to the broker and creates producer and consumer sessions on
     * it, to send and receive its pings and replies on.
     *
     * @param overrides Properties containing any desired overrides to the defaults.
     *
     * @throws Exception Any exceptions are allowed to fall through.
     */
    public PingPongProducer(Properties overrides) throws Exception
    {
        // log.debug("public PingPongProducer(Properties overrides = " + overrides + "): called");
        instanceId = _instanceIdGenerator.getAndIncrement();

        // Create a set of parsed properties from the defaults overriden by the passed in values.
        ParsedProperties properties = new ParsedProperties(defaults);
        properties.putAll(overrides);

        // Extract the configuration properties to set the pinger up with.
        _overrideClientId = properties.getPropertyAsBoolean(OVERRIDE_CLIENT_ID_PROPNAME);
        _factoryName = properties.getProperty(FACTORY_NAME_PROPNAME);
        _fileProperties = properties.getProperty(FILE_PROPERTIES_PROPNAME);
        _brokerDetails = properties.getProperty(BROKER_PROPNAME);
        _username = properties.getProperty(USERNAME_PROPNAME);
        _password = properties.getProperty(PASSWORD_PROPNAME);
        _virtualpath = properties.getProperty(VIRTUAL_HOST_PROPNAME);
        _destinationName = properties.getProperty(PING_QUEUE_NAME_PROPNAME);
        _queueNamePostfix = properties.getProperty(QUEUE_NAME_POSTFIX_PROPNAME);
        _selector = properties.getProperty(SELECTOR_PROPNAME);
        _transacted = properties.getPropertyAsBoolean(TRANSACTED_PROPNAME);
        _consTransacted = properties.getPropertyAsBoolean(CONSUMER_TRANSACTED_PROPNAME);
        _persistent = properties.getPropertyAsBoolean(PERSISTENT_MODE_PROPNAME);
        _messageSize = properties.getPropertyAsInteger(MESSAGE_SIZE_PROPNAME);
        _verbose = properties.getPropertyAsBoolean(VERBOSE_PROPNAME);
        _failAfterCommit = properties.getPropertyAsBoolean(FAIL_AFTER_COMMIT_PROPNAME);
        _failBeforeCommit = properties.getPropertyAsBoolean(FAIL_BEFORE_COMMIT_PROPNAME);
        _failAfterSend = properties.getPropertyAsBoolean(FAIL_AFTER_SEND_PROPNAME);
        _failBeforeSend = properties.getPropertyAsBoolean(FAIL_BEFORE_SEND_PROPNAME);
        _failOnce = properties.getPropertyAsBoolean(FAIL_ONCE_PROPNAME);
        _txBatchSize = properties.getPropertyAsInteger(TX_BATCH_SIZE_PROPNAME);
        _noOfDestinations = properties.getPropertyAsInteger(DESTINATION_COUNT_PROPNAME);
        _noOfConsumers = properties.getPropertyAsInteger(NUM_CONSUMERS_PROPNAME);
        _rate = properties.getPropertyAsInteger(RATE_PROPNAME);
        _isPubSub = properties.getPropertyAsBoolean(PUBSUB_PROPNAME);
        _isUnique = properties.getPropertyAsBoolean(UNIQUE_DESTS_PROPNAME);
        _isDurable = properties.getPropertyAsBoolean(DURABLE_DESTS_PROPNAME);
        _ackMode = _transacted ? 0 : properties.getPropertyAsInteger(ACK_MODE_PROPNAME);
        _consAckMode = _consTransacted ? 0 : properties.getPropertyAsInteger(CONSUMER_ACK_MODE_PROPNAME);
        _maxPendingSize = properties.getPropertyAsInteger(MAX_PENDING_PROPNAME);
        _preFill = properties.getPropertyAsInteger(PREFILL_PROPNAME);
        _delayBeforeConsume = properties.getPropertyAsLong(DELAY_BEFORE_CONSUME_PROPNAME);
        _consumeOnly = properties.getPropertyAsBoolean(CONSUME_ONLY_PROPNAME);
        _sendOnly = properties.getPropertyAsBoolean(SEND_ONLY_PROPNAME);

        // Check that one or more destinations were specified.
        if (_noOfDestinations < 1)
        {
            throw new IllegalArgumentException("There must be at least one destination.");
        }

        // Set up a throttle to control the send rate, if a rate > 0 is specified.
        if (_rate > 0)
        {
            _rateLimiter = new BatchedThrottle();
            _rateLimiter.setRate(_rate);
        }

        // Create the connection and message producers/consumers.
        // establishConnection(true, true);
    }

    /**
     * Establishes a connection to the broker and creates message consumers and producers based on the parameters
     * that this ping client was created with.
     *
     * @param producer Flag to indicate whether or not the producer should be set up.
     * @param consumer Flag to indicate whether or not the consumers should be set up.
     *
     * @throws Exception Any exceptions are allowed to fall through.
     */
    public void establishConnection(boolean producer, boolean consumer) throws Exception
    {
        // log.debug("public void establishConnection(): called");

        // Generate a unique identifying name for this client, based on it ip address and the current time.
        InetAddress address = InetAddress.getLocalHost();
        // _clientID = address.getHostName() + System.currentTimeMillis();
        _clientID = "perftest_" + instanceId;

        // Create a connection to the broker.
        createConnection(_clientID);

        // Create transactional or non-transactional sessions, based on the command line arguments.
        _producerSession = _connection.createSession(_transacted, _ackMode);

        _consumerSession = new Session[_noOfConsumers];

        for (int i = 0; i < _noOfConsumers; i++)
        {
            _consumerSession[i] = _consumerConnection[i].createSession(_consTransacted, _consAckMode);
        }

        // Create the destinations to send pings to and receive replies from.
        if (_noOfConsumers > 0)
        {
            _replyDestination = _consumerSession[0].createTemporaryQueue();
        }
        createPingDestinations(_noOfDestinations, _selector, _destinationName, _isUnique, _isDurable);

        // Create the message producer only if instructed to.
        if (producer)
        {
            createProducer();
        }

        // Create the message consumer only if instructed to.
        if (consumer)
        {
            createReplyConsumers(getReplyDestinations(), _selector);
        }
    }

    /**
     * Establishes a connection to the broker, based on the configuration parameters that this ping client was
     * created with.
     *
     * @param clientID The clients identifier.
     *
     * @throws JMSException Underlying exceptions allowed to fall through.
     * @throws NamingException Underlying exceptions allowed to fall through.
     * @throws IOException Underlying exceptions allowed to fall through.
     */
    protected void createConnection(String clientID) throws JMSException, NamingException, IOException
    {
        // _log.debug("protected void createConnection(String clientID = " + clientID + "): called");

        // _log.debug("Creating a connection for the message producer.");
        File propsFile = new File(_fileProperties);
        InputStream is = new FileInputStream(propsFile);
        Properties properties = new Properties();
        properties.load(is);

        Context context = new InitialContext(properties);
        ConnectionFactory factory = (ConnectionFactory) context.lookup(_factoryName);
        _connection = factory.createConnection(_username, _password);

        if (_overrideClientId)
        {
            _connection.setClientID(clientID);
        }

        // _log.debug("Creating " + _noOfConsumers + " connections for the consumers.");

        _consumerConnection = new Connection[_noOfConsumers];

        for (int i = 0; i < _noOfConsumers; i++)
        {
            _consumerConnection[i] = factory.createConnection(_username, _password);
            // _consumerConnection[i].setClientID(clientID);
        }
    }

    /**
     * Starts a ping-pong loop running from the command line. The bounce back client {@link PingPongBouncer} also needs
     * to be started to bounce the pings back again.
     *
     * @param args The command line arguments.
     */
    public static void main(String[] args)
    {
        try
        {
            Properties options =
                CommandLineParser.processCommandLine(args, new CommandLineParser(new String[][] {}), System.getProperties());

            // Create a ping producer overriding its defaults with all options passed on the command line.
            PingPongProducer pingProducer = new PingPongProducer(options);
            pingProducer.establishConnection(true, true);

            // Start the ping producers dispatch thread running.
            pingProducer._connection.start();

            // Create a shutdown hook to terminate the ping-pong producer.
            Runtime.getRuntime().addShutdownHook(pingProducer.getShutdownHook());

            // Ensure that the ping pong producer is registered to listen for exceptions on the connection too.
            pingProducer._connection.setExceptionListener(pingProducer);

            // Create the ping loop thread and run it until it is terminated by the shutdown hook or exception.
            Thread pingThread = new Thread(pingProducer);
            pingThread.run();
            pingThread.join();
        }
        catch (Exception e)
        {
            System.err.println(e.getMessage());
            log.error("Top level handler caught execption.", e);
            System.exit(1);
        }
    }

    /**
     * Convenience method for a short pause.
     *
     * @param sleepTime The time in milliseconds to pause for.
     */
    public static void pause(long sleepTime)
    {
        if (sleepTime > 0)
        {
            try
            {
                Thread.sleep(sleepTime);
            }
            catch (InterruptedException ie)
            { }
        }
    }

    /**
     * Gets all the reply destinations (to listen for replies on). In this case this will just be the single reply to
     * destination of this pinger.
     *
     * @return The single reply to destination of this pinger, wrapped in a list.
     */
    public List<Destination> getReplyDestinations()
    {
        // log.debug("public List<Destination> getReplyDestinations(): called");

        List<Destination> replyDestinations = new ArrayList<Destination>();
        replyDestinations.add(_replyDestination);

        // log.debug("replyDestinations = " + replyDestinations);

        return replyDestinations;
    }

    /**
     * Creates the producer to send the pings on. This is created without a default destination. Its persistent delivery
     * flag is set accoring the ping producer creation options.
     *
     * @throws JMSException Any JMSExceptions are allowed to fall through.
     */
    public void createProducer() throws JMSException
    {
        // log.debug("public void createProducer(): called");

        _producer = (MessageProducer) _producerSession.createProducer(null);
        _producer.setDeliveryMode(_persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);

        // log.debug("Created producer for " + (_persistent ? "persistent" : "non-persistent") + " messages.");
    }

    /**
     * Creates consumers for the specified number of destinations. The destinations themselves are also created by this
     * method.
     *
     * @param noOfDestinations The number of destinations to create consumers for.
     * @param selector         The message selector to filter the consumers with.
     * @param rootName         The root of the name, or actual name if only one is being created.
     * @param unique           <tt>true</tt> to make the destinations unique to this pinger, <tt>false</tt> to share the
     *                         numbering with all pingers on the same JVM.
     * @param durable          If the destinations are durable topics.
     *
     * @throws JMSException Any JMSExceptions are allowed to fall through.
     */
    public void createPingDestinations(int noOfDestinations, String selector, String rootName, boolean unique,
        boolean durable) throws JMSException
    {
        /*log.debug("public void createPingDestinations(int noOfDestinations = " + noOfDestinations + ", String selector = "
            + selector + ", String rootName = " + rootName + ", boolean unique = " + unique + ", boolean durable = "
            + durable + "): called");*/

        _pingDestinations = new ArrayList<Destination>();

        // Create the desired number of ping destinations and consumers for them.
        // log.debug("Creating " + noOfDestinations + " destinations to ping.");

        for (int i = 0; i < noOfDestinations; i++)
        {
            Destination destination;
            String id;

            // Generate an id, unique within this pinger or to the whole JVM depending on the unique flag.
            if (unique)
            {
                // log.debug("Creating unique destinations.");
                id = "_" + _queueJVMSequenceID.incrementAndGet() + "_" + _connection.getClientID();
            }
            else
            {
                // log.debug("Creating shared destinations.");
                id = "_" + _queueSharedID.incrementAndGet();
            }

            // Check if this is a pub/sub pinger, in which case create topics.
            if (_isPubSub)
            {
                destination = _producerSession.createTopic(rootName + id);
                // log.debug("Created non-durable topic " + destination);

                if (durable)
                {
                    _producerSession.createDurableSubscriber((Topic) destination, _connection.getClientID());
                }
            }
            // Otherwise this is a p2p pinger, in which case create queues.
            else
            {
                destination = _producerSession.createQueue(rootName + id + _queueNamePostfix);
                // log.debug("Created queue " + destination);
            }

            // Keep the destination.
            _pingDestinations.add(destination);
        }
    }

    /**
     * Creates consumers for the specified destinations and registers this pinger to listen to their messages.
     *
     * @param destinations The destinations to listen to.
     * @param selector     A selector to filter the messages with.
     *
     * @throws javax.jms.JMSException Any JMSExceptions are allowed to fall through.
     */
    public void createReplyConsumers(Collection<Destination> destinations, String selector) throws JMSException
    {
        /*log.debug("public void createReplyConsumers(Collection<Destination> destinations = " + destinations
            + ", String selector = " + selector + "): called");*/

        log.debug("There are " + destinations.size() + " destinations.");
        log.debug("Creating " + _noOfConsumers + " consumers on each destination.");
        log.debug("Total number of consumers is: " + (destinations.size() * _noOfConsumers));

        for (Destination destination : destinations)
        {
            _consumer = new MessageConsumer[_noOfConsumers];

            // If we don't have consumers then ensure we have created the
            // destination.  
            if (_noOfConsumers == 0)
            {
                _producerSession.createConsumer(destination, selector,
                                                NO_LOCAL_DEFAULT).close();
            }

            for (int i = 0; i < _noOfConsumers; i++)
            {
                // Create a consumer for the destination and set this pinger to listen to its messages.
                _consumer[i] = _consumerSession[i].createConsumer(destination, selector, NO_LOCAL_DEFAULT);

                final int consumerNo = i;

                _consumer[i].setMessageListener(new MessageListener()
                    {
                        public void onMessage(Message message)
                        {
                            onMessageWithConsumerNo(message, consumerNo);
                        }
                    });

                log.debug("Set consumer " + i + " to listen to replies sent to destination: " + destination);
            }
        }
    }

    /**
     * Handles a reply received by one of the consumers of this pinger. Redelivered messages, and messages whose
     * correlation id has no entry in the per correlation id map, are logged at warn level and otherwise ignored.
     *
     * <p/>For an expected reply this method, in order:
     * <ul>
     * <li>restarts the timeout timer of the correlation id,</li>
     * <li>when a max pending size is configured, decrements the count of unreceived messages and polls the send
     *     pause monitor once the estimated unreceived size is below the limit,</li>
     * <li>counts down the latch of the correlation id,</li>
     * <li>acknowledges the message or commits the session of the consumer on transaction batch size boundaries,</li>
     * <li>passes the message, remaining count and ping time to the chained message listener, if there is one,</li>
     * <li>counts the latch down a final time after the last expected reply.</li>
     * </ul>
     *
     * <p/>A JMSException raised while handling the message is logged at warn level and is not rethrown.
     *
     * @param message    The received message.
     * @param consumerNo The consumer number within this test pinger instance.
     */
    public void onMessageWithConsumerNo(Message message, int consumerNo)
    {
        // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo = " + consumerNo + "): called");
        try
        {
            // The ping time is the difference between the current nano time and the timestamp read from the message.
            // Presumably the timestamp is a System.nanoTime() value set by the sender - confirm against getTimestamp.
            long now = System.nanoTime();
            long timestamp = getTimestamp(message);
            long pingTime = now - timestamp;

            // NDC.push("id" + instanceId + "/cons" + consumerNo);

            // Extract the messages correlation id.
            String correlationID = message.getJMSCorrelationID();
            // log.debug("correlationID = " + correlationID);

            // int num = message.getIntProperty("MSG_NUM");
            // log.info("Message " + num + " received.");

            boolean isRedelivered = message.getJMSRedelivered();
            // log.debug("isRedelivered = " + isRedelivered);

            // Redelivered messages are not counted as replies.
            if (!isRedelivered)
            {
                // Countdown on the traffic light if there is one for the matching correlation id.
                PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);

                if (perCorrelationId != null)
                {
                    CountDownLatch trafficLight = perCorrelationId.trafficLight;

                    // Restart the timeout timer on every message. The sender waiting in pingAndWaitForReply measures
                    // its timeout from this value, so the timeout applies to the gap between replies.
                    perCorrelationId.timeOutStart = System.nanoTime();

                    // log.debug("Reply was expected, decrementing the latch for the id, " + correlationID);

                    // Release waiting senders if there are some and using maxPending limit.
                    if ((_maxPendingSize > 0))
                    {
                        // Decrement the count of sent but not yet received messages.
                        // The size estimate is the message count multiplied by the message size (a size of 0 is
                        // counted as 1), divided by the consumers per destination for pub/sub.
                        int unreceived = _unreceived.decrementAndGet();
                        int unreceivedSize =
                            (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
                            / (_isPubSub ? getConsumersPerDestination() : 1);

                        // log.debug("unreceived = " + unreceived);
                        // log.debug("unreceivedSize = " + unreceivedSize);

                        // synchronized (_sendPauseMonitor)
                        // {
                        // sendMessage blocks in a timed offer() on this monitor when over the limit; polling takes
                        // that offer, which presumably lets the blocked sender continue - confirm the monitor type.
                        if (unreceivedSize < _maxPendingSize)
                        {
                            _sendPauseMonitor.poll();
                        }
                        // }
                    }

                    // Decrement the countdown latch. Before this point, it is possible that two threads might enter this
                    // method simultaneously with the same correlation id. Decrementing the latch in a synchronized block
                    // ensures that each thread will get a unique value for the remaining messages.
                    long trueCount;
                    long remainingCount;

                    synchronized (trafficLight)
                    {
                        trafficLight.countDown();

                        // The latch is created one higher than the expected number of replies (see
                        // setupCorrelationID), so the number of replies still outstanding is the latch count less one.
                        trueCount = trafficLight.getCount();
                        remainingCount = trueCount - 1;

                        // NDC.push("/rem" + remainingCount);

                        // log.debug("remainingCount = " + remainingCount);
                        // log.debug("trueCount = " + trueCount);

                        // Commit on transaction batch size boundaries. At this point in time the waiting producer
                        // remains blocked, even on the last message.
                        // Commit count is divided by noOfConsumers in p2p mode, so that each consumer only commits on
                        // each batch boundary. For pub/sub each consumer gets every message so no division is done.
                        // When running in client ack mode, an ack is done instead of a commit, on the commit batch
                        // size boundaries.
                        long commitCount = _isPubSub ? remainingCount : (remainingCount / _noOfConsumers);
                        // _noOfConsumers can be set to 0 on the command line but we will not get here to
                        // divide by 0 as this is executed by the onMessage code when a message is received.
                        // no consumers means no message reception.


                        // log.debug("commitCount = " + commitCount);

                        // NOTE(review): a transaction batch size of 0 would make this modulo throw an
                        // ArithmeticException - confirm that the batch size is validated elsewhere.
                        if ((commitCount % _txBatchSize) == 0)
                        {
                            // 2 is the value of javax.jms.Session.CLIENT_ACKNOWLEDGE.
                            if (_consAckMode == 2)
                            {
                                // log.debug("Doing client ack for consumer " + consumerNo + ".");
                                message.acknowledge();
                            }
                            else
                            {
                                // log.debug("Trying commit for consumer " + consumerNo + ".");
                                commitTx(_consumerSession[consumerNo]);
                                // log.info("Tx committed on consumer " + consumerNo);
                            }
                        }

                        // Forward the message and remaining count to any interested chained message listener.
                        // Note that the listener is called while the monitor of the latch is still held.
                        if (_chainedMessageListener != null)
                        {
                            _chainedMessageListener.onMessage(message, (int) remainingCount, pingTime);
                        }

                        // Check if this is the last message, in which case release any waiting producers. This is done
                        // after the transaction has been committed and any listeners notified.
                        // A count of one means only the extra count added by setupCorrelationID is left, so this
                        // second count down takes the latch to zero.
                        if (trueCount == 1)
                        {
                            trafficLight.countDown();
                        }
                    }
                }
                else
                {
                    log.warn(consumerNo + " Got unexpected message with correlationId: " + correlationID);
                    log.warn(consumerNo + " Map contains:" + perCorrelationIds.entrySet());
                }
            }
            else
            {
                log.warn("Got redelivered message, ignoring.");
            }
        }
        catch (JMSException e)
        {
            // The exception is only logged, so a failure here never propagates to the caller of this method.
            log.warn("There was a JMSException: " + e.getMessage(), e);
        }
        finally
        {
            // log.debug("public void onMessageWithConsumerNo(Message message, int consumerNo): ending");
            // NDC.clear();
        }
    }

    public void setupCorrelationID(String correlationId, int expectedCount)
    {
        PerCorrelationId perCorrelationId = new PerCorrelationId();

        // Create a count down latch to count the number of replies with. This is created before the messages are
        // sent so that the replies cannot be received before the count down is created.
        // One is added to this, so that the last reply becomes a special case. The special case is that the
        // chained message listener must be called before this sender can be unblocked, but that decrementing the
        // countdown needs to be done before the chained listener can be called.
        perCorrelationId.trafficLight = new CountDownLatch(expectedCount + 1);

        perCorrelationIds.put(correlationId, perCorrelationId);
    }


    /**
     * Sends the specified number of ping message and then waits for all correlating replies. If the wait times out
     * before a reply arrives, then a null reply is returned from this method. This method allows the caller to specify
     * the correlation id.
     *
     * Can be augmented through a pre-fill property (PingPongProducer.PREFILL_PROPNAME) that will populate the destination
     * with a set number of messages so the total pings sent and therefore expected will be PREFILL + numPings.
     *
     * If pre-fill is specified then the consumers will start paused to allow the prefilling to occur.
     *
     * @param message              The message to send. If this is null, one is generated.
     * @param numPings             The number of ping messages to send.
     * @param timeout              The timeout in milliseconds.
     * @param messageCorrelationId The message correlation id. If this is null, one is generated.
     *
     * @return The number of replies received. This may be less than the number sent if the timeout terminated the wait
     *         for all prematurely. If we are running in noConsumer=0 so send only mode then it will return the no msgs sent.
     *
     * @throws JMSException         All underlying JMSExceptions are allowed to fall through.
     * @throws InterruptedException When interrupted by a timeout
     */
    public int pingAndWaitForReply(Message message, int numPings, long timeout, String messageCorrelationId)
        throws JMSException, InterruptedException
    {
        return pingAndWaitForReply(message, numPings, 0, timeout, messageCorrelationId);
    }

    public int pingAndWaitForReply(Message message, int numPings, int preFill, long timeout, String messageCorrelationId)
        throws JMSException, InterruptedException
    {
        /*log.debug("public int pingAndWaitForReply(Message message, int numPings = " + numPings + ", long timeout = "
            + timeout + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/

        int totalPingsRequested = numPings + preFill;

        // Generate a unique correlation id to put on the messages before sending them, if one was not specified.
        if (messageCorrelationId == null)
        {
            messageCorrelationId = Long.toString(_correlationIdGenerator.incrementAndGet());

            setupCorrelationID(messageCorrelationId, getExpectedNumPings(totalPingsRequested));
        }

        try
        {
            // NDC.push("prod");

            PerCorrelationId perCorrelationId = perCorrelationIds.get(messageCorrelationId);

            // Set up the current time as the start time for pinging on the correlation id. This is used to determine
            // timeouts.
            perCorrelationId.timeOutStart = System.nanoTime();

            // Send the specifed number of messages for this test           
            pingNoWaitForReply(message, numPings, messageCorrelationId);

            boolean timedOut;
            boolean allMessagesReceived;
            int numReplies;

            // We don't have a consumer so don't try and wait for the messages.
            // this does mean that if the producerSession is !TXed then we may
            // get to exit before all msgs have been received.
            //
            // Return the number of requested messages, this will let the test
            // report a pass.
            if (_noOfConsumers == 0 || _sendOnly)
            {
                return getExpectedNumPings(totalPingsRequested);
            }

            do
            {
                // Block the current thread until replies to all the messages are received, or it times out.
                perCorrelationId.trafficLight.await(timeout, TimeUnit.MILLISECONDS);

                // Work out how many replies were receieved.
                numReplies = getExpectedNumPings(totalPingsRequested) - (int) perCorrelationId.trafficLight.getCount();

                allMessagesReceived = numReplies == getExpectedNumPings(totalPingsRequested);

                // log.debug("numReplies = " + numReplies);
                // log.debug("allMessagesReceived = " + allMessagesReceived);

                // Recheck the timeout condition.
                long now = System.nanoTime();
                long lastMessageReceievedAt = perCorrelationId.timeOutStart;
                timedOut = (now - lastMessageReceievedAt) > (timeout * 1000000);

                // log.debug("now = " + now);
                // log.debug("lastMessageReceievedAt = " + lastMessageReceievedAt);
            }
            while (!timedOut && !allMessagesReceived);

            if ((numReplies < getExpectedNumPings(totalPingsRequested)) && _verbose)
            {
                log.info("Timed out (" + timeout + " ms) before all replies received on id, " + messageCorrelationId);
            }
            else if (_verbose)
            {
                log.info("Got all replies on id, " + messageCorrelationId);
            }

            // commitTx(_consumerSession);

            // log.debug("public int pingAndWaitForReply(Message message, int numPings, long timeout): ending");

            return numReplies;
        }
        // Ensure that the message countdown latch is always removed from the reply map. The reply map is long lived,
        // so will be a memory leak if this is not done.
        finally
        {
            // NDC.pop();
            perCorrelationIds.remove(messageCorrelationId);
        }
    }

    /**
     * Sends the specified number of ping messages and does not wait for correlating replies.
     *
     * @param message              The message to send.
     * @param numPings             The number of pings to send.
     * @param messageCorrelationId A correlation id to place on all messages sent.
     *
     * @throws JMSException All underlying JMSExceptions are allowed to fall through.
     */
    public void pingNoWaitForReply(Message message, int numPings, String messageCorrelationId) throws JMSException
    {
        /*log.debug("public void pingNoWaitForReply(Message message, int numPings = " + numPings
            + ", String messageCorrelationId = " + messageCorrelationId + "): called");*/

        // If we are runnning a consumeOnly test then don't send any messages
        if (_consumeOnly)
        {
            return;
        }
       
        if (message == null)
        {
            message = getTestMessage(getReplyDestinations().get(0), _messageSize, _persistent);
        }

        message.setJMSCorrelationID(messageCorrelationId);

        // Set up a committed flag to detect uncommitted messages at the end of the send loop. This may occurr if the
        // transaction batch size is not a factor of the number of pings. In which case an extra commit at the end is
        // needed.
        boolean committed = false;

        // Send all of the ping messages.
        for (int i = 0; i < numPings; i++)
        {
            // Re-timestamp the message.
            // message.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());

            // Send the message, passing in the message count.
            committed = sendMessage(i, message);

            // Spew out per message timings on every message sonly in verbose mode.
            /*if (_verbose)
            {
                log.info(timestampFormatter.format(new Date()) + ": Pinged at with correlation id, " + messageCorrelationId);
            }*/
        }

        // Call commit if the send loop finished before reaching a batch size boundary so there may still be uncommitted messages.
        if (!committed)
        {
            commitTx(_producerSession);
        }
    }

    /**
     * Sends the sepcified message, applies rate limiting and possibly commits the current transaction. The count of
     * messages sent so far must be specified and is used to round robin the ping destinations (where there are more
     * than one), and to determine if the transaction batch size has been reached and the sent messages should be
     * committed.
     *
     * @param i       The count of messages sent so far in a loop of multiple calls to this send method.
     * @param message The message to send.
     *
     * @return <tt>true</tt> if the messages were committed, <tt>false</tt> otherwise.
     *
     * @throws JMSException All underlyiung JMSExceptions are allowed to fall through.
     */
    protected boolean sendMessage(int i, Message message) throws JMSException
    {
        try
        {
            NDC.push("id" + instanceId + "/prod");

            // log.debug("protected boolean sendMessage(int i = " + i + ", Message message): called");
            // log.debug("_txBatchSize = " + _txBatchSize);

            // Round robin the destinations as the messages are sent.
            Destination destination = _pingDestinations.get(i % _pingDestinations.size());

            // Prompt the user to kill the broker when doing failover testing.
            _failBeforeSend = waitForUserToPromptOnFailure(_failBeforeSend);

            // Get the test setup for the correlation id.
            String correlationID = message.getJMSCorrelationID();
            PerCorrelationId perCorrelationId = perCorrelationIds.get(correlationID);

            // If necessary, wait until the max pending message size comes within its limit.
            if (_maxPendingSize > 0)
            {
                synchronized (_sendPauseMonitor)
                {
                    // Used to keep track of the number of times that send has to wait.
                    int numWaits = 0;

                    // The maximum number of waits before the test gives up and fails. This has been chosen to correspond with
                    // the test timeout.
                    int waitLimit = (int) (TIMEOUT_DEFAULT / 10000);

                    while (true)
                    {
                        // Get the size estimate of sent but not yet received messages.
                        int unreceived = _unreceived.get();
                        int unreceivedSize =
                            (unreceived * ((_messageSize == 0) ? 1 : _messageSize))
                            / (_isPubSub ? getConsumersPerDestination() : 1);

                        // log.debug("unreceived = " + unreceived);
                        // log.debug("unreceivedSize = " + unreceivedSize);
                        // log.debug("_maxPendingSize = " + _maxPendingSize);

                        if (unreceivedSize > _maxPendingSize)
                        {
                            // log.debug("unreceived size estimate over limit = " + unreceivedSize);

                            // Fail the test if the send has had to wait more than the maximum allowed number of times.
                            if (numWaits > waitLimit)
                            {
                                String errorMessage =
                                    "Send has had to wait for the unreceivedSize (" + unreceivedSize
                                    + ") to come below the maxPendingSize (" + _maxPendingSize + ") more that " + waitLimit
                                    + " times.";
                                log.warn(errorMessage);
                                throw new RuntimeException(errorMessage);
                            }

                            // Wait on the send pause barrier for the limit to be re-established.
                            try
                            {
                                long start = System.nanoTime();
                                // _sendPauseMonitor.wait(10000);
                                _sendPauseMonitor.offer(new Object(), 10000, TimeUnit.MILLISECONDS);
                                long end = System.nanoTime();

                                // Count the wait only if it was for > 99% of the requested wait time.
                                if (((float) (end - start) / (float) (10000 * 1000000L)) > 0.99)
                                {
                                    numWaits++;
                                }
                            }
                            catch (InterruptedException e)
                            {
                                // Restore the interrupted status
                                Thread.currentThread().interrupt();
                                throw new RuntimeException(e);
                            }
                        }
                        else
                        {
                            break;
                        }
                    }
                }
            }

            // Send the message either to its round robin destination, or its default destination.
            // int num = numSent.incrementAndGet();
            // message.setIntProperty("MSG_NUM", num);
            setTimestamp(message);

            if (destination == null)
            {
                _producer.send(message);
            }
            else
            {
                _producer.send(destination, message);
            }

            // Increase the unreceived size, this may actually happen after the message is received.
            // The unreceived size is incremented by the number of consumers that will get a copy of the message,
            // in pub/sub mode.
            if (_maxPendingSize > 0)
            {
                int newUnreceivedCount = _unreceived.addAndGet(_isPubSub ? getConsumersPerDestination() : 1);
                // log.debug("newUnreceivedCount = " + newUnreceivedCount);
            }

            // Apply message rate throttling if a rate limit has been set up.
            if (_rateLimiter != null)
            {
                _rateLimiter.throttle();
            }

            // Call commit every time the commit batch size is reached.
            boolean committed = false;

            // Commit on every transaction batch size boundary. Here i + 1 is the count of actual messages sent.
            if (((i + 1) % _txBatchSize) == 0)
            {
                // log.debug("Trying commit on producer session.");
                committed = commitTx(_producerSession);
            }

            return committed;
        }
        finally
        {
            NDC.clear();
        }
    }

    /**
     * If the specified fail flag is set, this method waits for the user to cause a failure and then indicate to the
     * test that the failure has occurred, before the method returns.
     *
     * @param failFlag The fail flag to test.
     *
     * @return The new value for the fail flag. If the {@link #_failOnce} flag is set, then each fail flag is only
     *         used once, then reset.
     */
    private boolean waitForUserToPromptOnFailure(boolean failFlag)
    {
        if (failFlag)
        {
            if (_failOnce)
            {
                failFlag = false;
            }

            // log.debug("Failing Before Send");
            waitForUser(KILL_BROKER_PROMPT);
        }

        return failFlag;
    }

    /**
     * Implements a single iteration of the ping loop. This sends the number of pings specified by the transaction batch
     * size property, and waits for replies to all of them. Any errors cause the publish flag to be cleared, which will
     * terminate the pinger.
     */
    public void pingLoop()
    {
        try
        {
            // Generate a sample message and time stamp it.
            Message msg = getTestMessage(_replyDestination, _messageSize, _persistent);
            // setTimestamp(msg);

            // Send the message and wait for a reply.
            pingAndWaitForReply(msg, TX_BATCH_SIZE_DEFAULT, TIMEOUT_DEFAULT, null);
        }
        catch (JMSException e)
        {
            _publish = false;
            // log.debug("There was a JMSException: " + e.getMessage(), e);
        }
        catch (InterruptedException e)
        {
            _publish = false;
            // log.debug("There was an interruption: " + e.getMessage(), e);
        }
    }

    /**
     * Sets a chained message listener. The message listener on this pinger, chains all its messages to the one set
     * here.
     *
     * @param messageListener The chained message listener.
     */
    public void setChainedMessageListener(ChainedMessageListener messageListener)
    {
        _chainedMessageListener = messageListener;
    }

    /** Removes any chained message listeners from this pinger. */
    public void removeChainedMessageListener()
    {
        _chainedMessageListener = null;
    }

    /**
     * Generates a test message of the specified size, with the specified reply-to destination and persistence flag.
     *
     * @param replyQueue  The reply-to destination for the message.
     * @param messageSize The desired size of the message in bytes.
     * @param persistent  <tt>true</tt> if the message should use persistent delivery, <tt>false</tt> otherwise.
     *
     * @return A freshly generated test message.
     *
     * @throws javax.jms.JMSException All underlying JMSExceptions are allowed to fall through.
     */
    public Message getTestMessage(Destination replyQueue, int messageSize, boolean persistent) throws JMSException
    {
        // return TestMessageFactory.newObjectMessage(_producerSession, replyQueue, messageSize, persistent);
        return TestUtils.createTestMessageOfSize(_producerSession, messageSize);
    }

    /**
     * Sets the current time in nanoseconds as the timestamp on the message.
     *
     * @param msg The message to timestamp.
     *
     * @throws JMSException Any JMSExceptions are allowed to fall through.
     */
    protected void setTimestamp(Message msg) throws JMSException
    {
        /*if (((AMQSession)_producerSession).isStrictAMQP())
        {
            ((AMQMessage)msg).setTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME), System.nanoTime());
        }
        else
        {*/
        msg.setLongProperty(MESSAGE_TIMESTAMP_PROPNAME, System.nanoTime());
        // }
    }

    /**
     * Extracts the nanosecond timestamp from a message.
     *
     * @param msg The message to extract the time stamp from.
     *
     * @return The timestamp in nanos.
     *
     * @throws JMSException Any JMSExceptions are allowed to fall through.
     */
    protected long getTimestamp(Message msg) throws JMSException
    {
        /*if (((AMQSession)_producerSession).isStrictAMQP())
        {
            Long value = ((AMQMessage)msg).getTimestampProperty(new AMQShortString(MESSAGE_TIMESTAMP_PROPNAME));

            return (value == null) ? 0L : value;
        }
        else
        {*/
        return msg.getLongProperty(PingPongProducer.MESSAGE_TIMESTAMP_PROPNAME);
            // }
    }

    /**
     * Stops the ping loop by clearing the publish flag. The current loop will complete when it notices that this flag
     * has been cleared.
     */
    public void stop()
    {
        _publish = false;
    }

    /**
     * Starts the producer and consumer connections.
     *
     * @throws JMSException Any JMSExceptions are allowed to fall through.
     */
    public void start() throws JMSException
    {
        // log.debug("public void start(): called");

        _connection.start();
        // log.debug("Producer started.");

        for (int i = 0; i < _noOfConsumers; i++)
        {
            _consumerConnection[i].start();
            // log.debug("Consumer " + i + " started.");
        }
    }

    /** Implements a ping loop that repeatedly pings until the publish flag becomes false. */
    public void run()
    {
        // Keep running until the publish flag is cleared.
        while (_publish)
        {
            pingLoop();
        }
    }

    /**
     * Callback method, implementing ExceptionListener. This should be registered to listen for exceptions on the
     * connection, this clears the publish flag which in turn will halt the ping loop.
     *
     * @param e The exception that triggered this callback method.
     */
    public void onException(JMSException e)
    {
        // log.debug("public void onException(JMSException e = " + e + "): called", e);
        _publish = false;
    }

    /**
     * Gets a shutdown hook that will cleanly shut this down when it is running the ping loop. This can be registered
     * with the runtime system as a shutdown hook.
     *
     * @return A shutdown hook for the ping loop.
     */
    public Thread getShutdownHook()
    {
        return new Thread(new Runnable()
                {
                    public void run()
                    {
                        stop();
                    }
                });
    }

    /**
     * Closes all of the producer and consumer connections.
     *
     * @throws JMSException All JMSExceptions are allowed to fall through.
     */
    public void close() throws JMSException
    {
        // log.debug("public void close(): called");

        try
        {
            if (_connection != null)
            {
                // log.debug("Before close producer connection.");
                _connection.close();
                // log.debug("Closed producer connection.");
            }

            for (int i = 0; i < _noOfConsumers; i++)
            {
                if (_consumerConnection[i] != null)
                {
                    // log.debug("Before close consumer connection " + i + ".");
                    _consumerConnection[i].close();
                    // log.debug("Closed consumer connection " + i + ".");
                }
            }
        }
        finally
        {
            _connection = null;
            _producerSession = null;
            _consumerSession = null;
            _consumerConnection = null;
            _producer = null;
            _consumer = null;
            _pingDestinations = null;
            _replyDestination = null;
        }
    }

    /**
     * Convenience method to commit the transaction on the specified session. If the session to commit on is not a
     * transactional session, this method does nothing (unless the failover after send flag is set).
     *
     * <p/>If the {@link #_failAfterSend} flag is set, this will prompt the user to kill the broker before the commit is
     * applied. This flag applies whether the pinger is transactional or not.
     *
     * <p/>If the {@link #_failBeforeCommit} flag is set, this will prompt the user to kill the broker before the commit
     * is applied. If the {@link #_failAfterCommit} flag is set, this will prompt the user to kill the broker after the
     * commit is applied. These flags will only apply if using a transactional pinger.
     *
     * @param session The session to commit.
     *
     * @return <tt>true</tt> if the session was committed, <tt>false</tt> if it was not.
     *
     * @throws javax.jms.JMSException If the commit fails and then the rollback fails.
     *
     * @todo Consider moving the fail after send logic into the send method. It is confusing to have it in this commit
     * method, because commits only apply to transactional pingers, but fail after send applied to transactional and
     * non-transactional alike.
     */
    protected boolean commitTx(Session session) throws JMSException
    {
        // log.debug("protected void commitTx(Session session): called");

        boolean committed = false;

        _failAfterSend = waitForUserToPromptOnFailure(_failAfterSend);

        if (session.getTransacted())
        {
            // log.debug("Session is transacted.");

            try
            {
                _failBeforeCommit = waitForUserToPromptOnFailure(_failBeforeCommit);

                long start = System.nanoTime();
                session.commit();
                committed = true;
                // log.debug("Time taken to commit :" + ((System.nanoTime() - start) / 1000000f) + " ms");

                _failAfterCommit = waitForUserToPromptOnFailure(_failAfterCommit);

                // log.debug("Session Commited.");
            }
            catch (JMSException e)
            {
                // log.debug("JMSException on commit:" + e.getMessage(), e);

                try
                {
                    session.rollback();
                    // log.debug("Message rolled back.");
                }
                catch (JMSException jmse)
                {
                    // log.debug("JMSE on rollback:" + jmse.getMessage(), jmse);

                    // Both commit and rollback failed. Throw the rollback exception.
                    throw jmse;
                }
            }
        }

        return committed;
    }

    /**
     * Outputs a prompt to the console and waits for the user to press return.
     *
     * @param prompt The prompt to display on the console.
     */
    public void waitForUser(String prompt)
    {
        System.out.println(prompt);

        try
        {
            System.in.read();
        }
        catch (IOException e)
        {
            // Ignored.
        }

        System.out.println("Continuing.");
    }

    /**
     * Gets the number of consumers that are listening to each destination in the test.
     *
     * @return int The number of consumers subscribing to each topic.
     */
    public int getConsumersPerDestination()
    {
        return _noOfConsumers;
    }

    /**
     * Calculates how many pings are expected to be received for the given number sent.
     *
     * Note that if noConsumers has been set to 0 then this will also return 0
     * in the case of PubSub testing. This is correct, as without consumers there
     * will be no-one to receive the sent messages, so there can be no responses.
     *
     * @param numpings The number of pings that will be sent.
     *
     * @return The number that should be received, for the test to pass.
     */
    public int getExpectedNumPings(int numpings)
    {
        // Wow, I'm freaking sorry about this return here...
        return ((_failAfterSend || _failBeforeCommit) ? numpings - 1: numpings) *
                                    (_isPubSub ? getConsumersPerDestination() : 1);
    }

    /**
     * Defines a chained message listener interface that can be attached to this pinger. Whenever this pinger's {@link
     * PingPongProducer#onMessageWithConsumerNo} method is called, the chained listener set through the {@link
     * PingPongProducer#setChainedMessageListener} method is passed the message, and the remaining expected count of
     * messages with that correlation id.
     *
     * <p/>Provided only one pinger is producing messages with that correlation id, the chained listener will always be
     * given unique message counts. It will always be called while the producer waiting for all messages to arrive is
     * still blocked.
     */
    public static interface ChainedMessageListener
    {
        /**
         * Notifies interested listeners about message arrival and important test stats, the number of messages
         * remaining in the test, and the messages send timestamp.
         *
         * @param message        The newly arrived message.
         * @param remainingCount The number of messages left to complete the test.
         * @param latency        The nanosecond latency of the message.
         *
         * @throws JMSException Any JMS exceptions is allowed to fall through.
         */
        public void onMessage(Message message, int remainingCount, long latency) throws JMSException;
    }

    /**
     * Holds information on each correlation id. The countdown latch, the current timeout timer... More stuff to be
     * added to this: read/write lock to make onMessage more concurrent as described in class header comment.
     */
    protected static class PerCorrelationId
    {
        // Plain mutable holder: both fields are package-visible and are read and written directly.

        /** Holds a countdown on number of expected messages. */
        CountDownLatch trafficLight;

        /**
         * Holds the last timestamp that the timeout was reset to.
         *
         * NOTE(review): this is a boxed Long, so it can be null -- presumably until the timeout is first set;
         * confirm against the code that reads it.
         */
        Long timeOutStart;
    }
}
TOP

Related Classes of org.apache.qpid.requestreply.PingPongProducer$PerCorrelationId

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.
ga('send', 'pageview');