Package org.jgroups.tests

Source Code of org.jgroups.tests.StateTransferTest

package org.jgroups.tests;

import junit.framework.Test;
import junit.framework.TestSuite;
import org.jgroups.Channel;
import org.jgroups.Message;
import org.jgroups.util.Util;

import java.io.*;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
* Tests correct state transfer while other members continue sending messages to
* the group
*
* @author Bela Ban
* @version $Id: StateTransferTest.java,v 1.18.2.6 2008/10/30 15:03:49 belaban Exp $
*/
public class StateTransferTest extends ChannelTestBase {
    private static final int MSG_SEND_COUNT = 10000;

    private static final int APP_COUNT = 2;

    public StateTransferTest(String name){
        super(name);
    }

    public void setUp() throws Exception {
        super.setUp();
        CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml");
    }

    public boolean useBlocking() {
        return true;
    }
   
    public void testStateTransferFromSelfWithRegularChannel() throws Exception {
        Channel ch = createChannel();
        ch.connect("test");
        try {
            boolean rc=ch.getState(null, 2000);
            assertFalse("getState() on singleton should return false", rc);
        }
        finally {
            ch.close();
        }
    }

    public void testStateTransferWhileSending() throws Exception {
        StateTransferApplication[] apps = new StateTransferApplication[APP_COUNT];

        // Create a semaphore and take all its permits
        Semaphore semaphore = new Semaphore(APP_COUNT);
        semaphore.acquire(APP_COUNT);

        int from = 0, to = MSG_SEND_COUNT;
        String[] names = createApplicationNames(APP_COUNT);

        for(int i = 0;i < apps.length;i++){
            apps[i] = new StateTransferApplication(semaphore, names[i], from, to);
            from += MSG_SEND_COUNT;
            to += MSG_SEND_COUNT;
        }

        for(int i = 0;i < apps.length;i++){
            StateTransferApplication app = apps[i];
            app.start();
            semaphore.release();
            Util.sleep(4000);
        }
       
        // Make sure everyone is in sync

        blockUntilViewsReceived(apps, 60000);

        Util.sleep(1000);

        // Reacquire the semaphore tickets; when we have them all
        // we know the threads are done
        boolean acquired = semaphore.tryAcquire( apps.length, 30, TimeUnit.SECONDS);
        if(!acquired){
            log.warn("Most likely a bug, analyse the stack below:");
            log.warn(Util.dumpThreads());
        }
                     
        // have we received all and the correct messages?
        for(int i = 0;i < apps.length;i++){
            StateTransferApplication w = apps[i];
            Map m = w.getMap();
            log.info("map has " + m.size() + " elements");
            assertEquals(MSG_SEND_COUNT * APP_COUNT, m.size());
        }

        Set keys = apps[0].getMap().keySet();
        for(int i = 0;i < apps.length;i++){
            StateTransferApplication app = apps[i];
            Map m = app.getMap();
            Set s = m.keySet();
            assertEquals(keys, s);
        }

        for(StateTransferApplication app:apps){
            app.cleanup();
        }
    }



    protected class StateTransferApplication extends PushChannelApplicationWithSemaphore {
        private final ReentrantLock mapLock = new ReentrantLock();

        private Map map = new HashMap(MSG_SEND_COUNT * APP_COUNT);

        private int from, to;

        public StateTransferApplication(Semaphore semaphore,String name,int from,int to) throws Exception{
            super(name, semaphore);
            this.from = from;
            this.to = to;
        }

        public Map getMap() {
            Map result = null;
            mapLock.lock();
            result = Collections.unmodifiableMap(map);
            mapLock.unlock();
            return result;
        }

        @Override
        public void receive(Message msg) {
            Object[] data = (Object[]) msg.getObject();
            mapLock.lock();
            map.put(data[0], data[1]);
            int num_received = map.size();
            mapLock.unlock();

            if(num_received % 1000 == 0)
                log.info("received " + num_received);

            // are we done?
            if(num_received >= MSG_SEND_COUNT * APP_COUNT)
                semaphore.release();
        }

        @Override
        public byte[] getState() {
            byte[] result = null;
            mapLock.lock();
            try{
                result = Util.objectToByteBuffer(map);
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                mapLock.unlock();
            }
            return result;
        }

        @Override
        public void setState(byte[] state) {
            mapLock.lock();
            try{
                map = (Map) Util.objectFromByteBuffer(state);
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                mapLock.unlock();
            }
            log.info("received state, map has " + map.size() + " elements");

        }

        @Override
        public void getState(OutputStream ostream) {
            ObjectOutputStream out;
            mapLock.lock();
            try{
                out = new ObjectOutputStream(ostream);
                out.writeObject(map);
                out.close();
            }catch(IOException e){
                e.printStackTrace();
            }finally{
                mapLock.unlock();
            }

        }

        @Override
        public void setState(InputStream istream) {
            ObjectInputStream in;
            mapLock.lock();
            try{
                in = new ObjectInputStream(istream);
                map = (Map) in.readObject();
                log.info("received state, map has " + map.size() + " elements");
                in.close();
            }catch(IOException e){
                e.printStackTrace();
            }catch(ClassNotFoundException e){
                e.printStackTrace();
            }finally{
                mapLock.unlock();
            }
        }
       
        public void run() {
            boolean acquired = false;
            try{
                acquired = semaphore.tryAcquire(60000L, TimeUnit.MILLISECONDS);
                if(!acquired){
                    throw new Exception(name + " cannot acquire semaphore");
                }
                useChannel();
            }catch(Exception e){
                log.error(name + ": " + e.getLocalizedMessage(), e);
                // Save it for the test to check
                exception = e;
            }
        }

        @Override
        protected void useChannel() throws Exception {
            channel.connect("test",null,null,10000);           
            Object[] data = new Object[2];
            for(int i = from;i < to;i++){
                data[0] = new Integer(i);
                data[1] = "Value #" + i;
                try{
                    channel.send(null, null, data);
                    if(i % 100 == 0)
                      Util.sleep(50);
                   
                    if(i % 1000 == 0)
                        log.info("sent " + i);
                }catch(Exception e){
                    e.printStackTrace();
                    break;
                }
            }
        }       
    }

    public static Test suite() {
        return new TestSuite(StateTransferTest.class);
    }

    public static void main(String[] args) {
        junit.textui.TestRunner.run(suite());
    }
}
TOP

Related Classes of org.jgroups.tests.StateTransferTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.