Package org.activeio.net.benchmark

Source Code of org.activeio.net.benchmark.ClientLoadSimulator$Client

/**
*
* Copyright 2004 Hiram Chirino
*
*  Licensed under the Apache License, Version 2.0 (the "License");
*  you may not use this file except in compliance with the License.
*  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License.
*/
package org.activeio.net.benchmark;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;

import org.activeio.ChannelFactory;
import org.activeio.Packet;
import org.activeio.SynchChannel;
import org.activeio.packet.ByteArrayPacket;
import org.activeio.packet.EOSPacket;
import org.activeio.stats.CountStatisticImpl;
import org.activeio.stats.TimeStatisticImpl;
import org.apache.commons.beanutils.BeanUtils;

import EDU.oswego.cs.dl.util.concurrent.Latch;
import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;

/**
* Simulates multiple a simple tcp echo clients for use in benchmarking activeio
* channel implementations.
*
* @version $Revision$
*/
public class ClientLoadSimulator implements Runnable {

    private URI url;

    // Afects how clients are created
    private int concurrentClients = 10;
    private long rampUpTime = 1000 * concurrentClients;

    // Afects how clients behave
    private long requestDelay = 500;
    private int requestIterations = Integer.MAX_VALUE;
    private int requestSize = 1024;
   
    PooledExecutor threadPool = new PooledExecutor();

    // The packet the clients send to the server.
    private Packet requestPacket;
    private long sampleInterval = 1000;
    private ChannelFactory factory = new ChannelFactory();
    private Latch shutdownLatch;

    private final CountStatisticImpl activeConnectionsCounter = new CountStatisticImpl("activeConnectionsCounter",
            "The number of active connection attached to the server.");
    private final CountStatisticImpl echoedBytesCounter = new CountStatisticImpl("echoedBytesCounter",
            "The number of bytes that have been echoed by the server.");
    private final TimeStatisticImpl requestLatency = new TimeStatisticImpl("requestLatency",
            "The amount of time that is spent waiting for a request to be serviced");

    public static void main(String[] args) throws URISyntaxException, IllegalAccessException, InvocationTargetException {

        ClientLoadSimulator client = new ClientLoadSimulator();

        HashMap options = new HashMap();
        for (int i = 0; i < args.length; i++) {

            String option = args[i];
            if (!option.startsWith("-") || option.length() < 2 || i + 1 >= args.length) {
                System.out.println("Invalid usage");
                return;
            }

            option = option.substring(1);
            options.put(option, args[++i]);
        }

        BeanUtils.populate(client, options);
       
        System.out.println();
        System.out.println("Server starting with the following options: ");
        System.out.println(" url="+client.getUrl());
        System.out.println(" sampleInterval="+client.getSampleInterval());
        System.out.println(" concurrentClients="+client.getConcurrentClients());
        System.out.println(" rampUpTime="+client.getRampUpTime());
        System.out.println(" requestIterations="+client.getRequestIterations());
        System.out.println(" requestSize="+client.getRequestSize());
        System.out.println(" requestDelay="+client.getRequestDelay());
        System.out.println();
        client.run();

    }
    private void printSampleData() {
        long now = System.currentTimeMillis();
        float runDuration = (now - activeConnectionsCounter.getStartTime()) / 1000f;
        System.out.println("Active connections: " + activeConnectionsCounter.getCount());
        System.out.println("Echoed bytes: " + (echoedBytesCounter.getCount()/1024f) + " kb"
                + ", Request latency: " + requestLatency.getAverageTime()+" ms");
        echoedBytesCounter.reset();
        requestLatency.reset();
    }

    public void run() {
        ArrayList clients = new ArrayList();
        try {

            shutdownLatch = new Latch();
            activeConnectionsCounter.reset();
            echoedBytesCounter.reset();

            new Thread("Sampler") {
                public void run() {
                    System.out.println("Sampler started.");
                    try {
                        while (!shutdownLatch.attempt(sampleInterval)) {
                            printSampleData();
                        }
                    } catch (InterruptedException e) {
                    }
                    System.out.println("Sampler stopped.");
                }
            }.start();

            byte data[] = new byte[requestSize];
            for (int i = 0; i < data.length; i++) {
                data[i] = (byte) i;
            }
            requestPacket = new ByteArrayPacket(data);

            // Loop to ramp up the clients.

            long clientActivationDelay = rampUpTime / concurrentClients;
            for (int i = 0; i < concurrentClients && !shutdownLatch.attempt(clientActivationDelay); i++) {
                System.out.println("Adding Client: " + i);
                Client client = new Client();
                clients.add(client);
                new Thread(client, "Client: " + i).start();
            }

            shutdownLatch.acquire();

        } catch (InterruptedException e) {
        } finally {
            System.out.println("Shutting down clients.");
            for (Iterator iter = clients.iterator(); iter.hasNext();) {
                Client client = (Client) iter.next();
                client.dispose();
            }
        }
    }

    public String getUrl() {
        return url.toString();
    }

    public void setUrl(String url) throws URISyntaxException {
        this.url = new URI(url);
    }

    class Client implements Runnable {

        private Latch shutdownLatch = new Latch();

        Packet packet = requestPacket.duplicate();

        private SynchChannel synchChannel;

        public void run() {
            try {
                System.out.println("Client started.");

                activeConnectionsCounter.increment();
                synchChannel = factory.openSynchChannel(url);
                for (int i = 0; i < requestIterations && !shutdownLatch.attempt(1) ; i++) {

                    long start = System.currentTimeMillis();
                    sendRequest();
                    long end = System.currentTimeMillis();

                    requestLatency.addTime(end - start);
                    echoedBytesCounter.add(packet.remaining());

                    if( requestDelay > 0 ) {
                        Thread.sleep(requestDelay);
                    }
                }

            } catch (IOException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                System.out.println("Client stopped.");

                activeConnectionsCounter.decrement();
                if( synchChannel!=null ) {
                    synchChannel.dispose();
                    synchChannel = null;
                }
            }
        }

        private void sendRequest() throws IOException, InterruptedException {
           
            final Latch done = new Latch();
           
            // Read the data async to avoid dead locks due buffers being to small for
            // data being sent.
            threadPool.execute(new Runnable() {
                public void run() {
                    try {
                        int c = 0;
                        while (c < packet.remaining()) {
                            Packet p = synchChannel.read(1000*5);
                            if( p==null ) {
                                continue;
                            }
                            if( p == EOSPacket.EOS_PACKET ) {
                                System.out.println("Peer disconnected.");
                                dispose();
                            }
                            c += p.remaining();
                        }
                        done.release();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
            });

            synchChannel.write(packet.duplicate());
            synchChannel.flush();
            done.acquire();
           
        }

        public void dispose() {
            shutdownLatch.release();
        }
    }

    /**
     * @return Returns the concurrentClients.
     */
    public int getConcurrentClients() {
        return concurrentClients;
    }

    /**
     * @param concurrentClients
     *            The concurrentClients to set.
     */
    public void setConcurrentClients(int concurrentClients) {
        this.concurrentClients = concurrentClients;
    }

    /**
     * @return Returns the rampUpTime.
     */
    public long getRampUpTime() {
        return rampUpTime;
    }

    /**
     * @param rampUpTime
     *            The rampUpTime to set.
     */
    public void setRampUpTime(long rampUpTime) {
        this.rampUpTime = rampUpTime;
    }

    /**
     * @return Returns the requestDelay.
     */
    public long getRequestDelay() {
        return requestDelay;
    }

    /**
     * @param requestDelay
     *            The requestDelay to set.
     */
    public void setRequestDelay(long requestDelay) {
        this.requestDelay = requestDelay;
    }

    /**
     * @return Returns the requestIterations.
     */
    public int getRequestIterations() {
        return requestIterations;
    }

    /**
     * @param requestIterations
     *            The requestIterations to set.
     */
    public void setRequestIterations(int requestIterations) {
        this.requestIterations = requestIterations;
    }

    /**
     * @return Returns the requestSize.
     */
    public int getRequestSize() {
        return requestSize;
    }

    /**
     * @param requestSize
     *            The requestSize to set.
     */
    public void setRequestSize(int requestSize) {
        this.requestSize = requestSize;
    }   

    /**
     * @return Returns the sampleInterval.
     */
    public long getSampleInterval() {
        return sampleInterval;
    }
    /**
     * @param sampleInterval The sampleInterval to set.
     */
    public void setSampleInterval(long sampleInterval) {
        this.sampleInterval = sampleInterval;
    }
}
TOP

Related Classes of org.activeio.net.benchmark.ClientLoadSimulator$Client

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.