Package org.jgroups.tests

Source Code of org.jgroups.tests.StreamingStateTransferTest$StreamingStateTransferApplication

package org.jgroups.tests;

import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;

import org.jgroups.*;
import org.jgroups.util.Util;

/**
* Tests streaming state transfer.
*
*
* @author Vladimir Blagojevic
* @version $Id$
*
*/
public class StreamingStateTransferTest extends ChannelTestBase {

    public void setUp() throws Exception {
        super.setUp();
        CHANNEL_CONFIG = System.getProperty("channel.conf.flush", "flush-udp.xml");
    }

    public boolean useBlocking() {
        return true;
    }

    public void testTransfer() {
        String[] channelNames = new String[] { "A", "B", "C", "D" };
        transferHelper(channelNames, false);
    }

    public void testRpcChannelTransfer() {
        // do this test for regular channels only
        String channelNames[] = new String[] { "A", "B", "C", "D" };
        transferHelper(channelNames, true);
    }



    public void transferHelper(String channelNames[], boolean useDispatcher) {
        transferHelper(channelNames, false, false, useDispatcher);
    }

    public void transferHelper(String channelNames[],
                               boolean crash,
                               boolean largeTransfer,
                               boolean useDispatcher) {
        int channelCount = channelNames.length;
        ArrayList<StreamingStateTransferApplication> channels=new ArrayList<StreamingStateTransferApplication>(channelCount);

        // Create a semaphore and take all its tickets
        Semaphore semaphore = new Semaphore(channelCount);

        try{

            semaphore.acquire(channelCount);
            boolean crashed = false;
            // Create activation threads that will block on the semaphore
            for(int i = 0;i < channelCount;i++){
                StreamingStateTransferApplication channel = null;
               
                channel = new StreamingStateTransferApplication(channelNames[i],                                                                   
                                                                semaphore,
                                                                useDispatcher,
                                                                largeTransfer);               

                // Start threads and let them join the channel
                channels.add(channel);
                semaphore.release(1);
                channel.start();
                Util.sleep(2000);

                if(crash && !crashed && i > 2){
                    StreamingStateTransferApplication coord = channels.remove(0);
                    coord.cleanup();
                    crashed = true;
                }
            }

            blockUntilViewsReceived(channels, 60000);

            // Reacquire the semaphore tickets; when we have them all
            // we know the threads are done
            boolean acquired = semaphore.tryAcquire(channelCount, 60, TimeUnit.SECONDS);
            if(!acquired){
                log.warn("Most likely a bug, analyse the stack below:");
                log.warn(Util.dumpThreads());
            }

            int getStateInvokedCount = 0;
            int setStateInvokedCount = 0;
            int partialGetStateInvokedCount = 0;
            int partialSetStateInvokedCount = 0;

            Util.sleep(3000);
            for(int i = 0;i < channels.size();i++){
                StreamingStateTransferApplication current =channels.get(i);
                if(current.getStateInvoked){
                    getStateInvokedCount++;
                }
                if(current.setStateInvoked){
                    setStateInvokedCount++;
                }
                if(current.partialGetStateInvoked){
                    partialGetStateInvokedCount++;
                }
                if(current.partialSetStateInvoked){
                    partialSetStateInvokedCount++;
                }
                Map map = current.getMap();
                for(int j = 0;j < channels.size();j++){
                    StreamingStateTransferApplication app =channels.get(j);
                    List l = (List) map.get(app.getLocalAddress());
                    int size = l != null ? l.size() : 0;
                    assertEquals("Correct element count in map ",
                                 StreamingStateTransferApplication.COUNT,
                                 size);
                }
            }

            assertEquals("Correct invocation count of getState ", 1, getStateInvokedCount);
            assertEquals("Correct invocation count of setState ",
                         channelCount - 1,
                         setStateInvokedCount);
            assertEquals("Correct invocation count of partial getState ",
                         1,
                         partialGetStateInvokedCount);
            assertEquals("Correct invocation count of partial setState ",
                         channelCount - 1,
                         partialSetStateInvokedCount);

        }catch(Exception ex){
            log.warn(ex);
        }finally{
            for(int i = 0;i < channels.size();i++){
                StreamingStateTransferApplication app =channels.get(i);
                Util.sleep(500);
                app.cleanup();
            }
        }
    }

    protected class StreamingStateTransferApplication extends PushChannelApplicationWithSemaphore {
        private final Map stateMap = new HashMap();

        public static final int COUNT = 25;

        boolean partialSetStateInvoked = false;

        boolean partialGetStateInvoked = false;

        boolean setStateInvoked = false;

        boolean getStateInvoked = false;

        boolean largeTransfer = false;

        public StreamingStateTransferApplication(String name,
                                                 Semaphore s,
                                                 boolean useDispatcher,
                                                 boolean largeTransfer) throws Exception{
            super(name, s, useDispatcher);
            this.largeTransfer = largeTransfer;
            channel.connect("test");
        }     

        public void receive(Message msg) {
            Address sender = msg.getSrc();
            synchronized(stateMap){
                List list = (List) stateMap.get(sender);
                if(list == null){
                    list = new ArrayList();
                    stateMap.put(sender, list);
                }
                list.add(msg.getObject());
            }
        }

        public Map getMap() {
            return stateMap;
        }

        public void useChannel() throws Exception {
            for(int i = 0;i < COUNT;i++){
                channel.send(null, null, new Integer(i));
            }
            channel.getState(null, 25000);
            channel.getState(null, name, 25000);
        }

        public void getState(OutputStream ostream) {
            if(largeTransfer)
                Util.sleep(4000);

            super.getState(ostream);
            ObjectOutputStream oos = null;
            try{
                oos = new ObjectOutputStream(ostream);
                HashMap copy = null;
                synchronized(stateMap){
                    copy = new HashMap(stateMap);
                }
                oos.writeObject(copy);
                oos.flush();
            }catch(IOException e){
                e.printStackTrace();
            }finally{
                getStateInvoked = true;
                Util.close(oos);
            }
        }

        public byte[] getState() {
            if(largeTransfer)
                Util.sleep(4000);

            byte[] result = null;
            try{
                synchronized(stateMap){
                    result = Util.objectToByteBuffer(stateMap);
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                getStateInvoked = true;
            }
            return result;
        }

        public void setState(byte[] state) {
            if(largeTransfer)
                Util.sleep(4000);

            Map result = null;
            try{
                result = (Map) Util.objectFromByteBuffer(state);
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                setStateInvoked = true;
            }
            synchronized(stateMap){
                stateMap.clear();
                stateMap.putAll(result);
            }
        }

        public void setState(InputStream istream) {
            if(largeTransfer)
                Util.sleep(4000);

            super.setState(istream);
            ObjectInputStream ois = null;
            try{
                ois = new ObjectInputStream(istream);
                Map map = (Map) ois.readObject();
                synchronized(stateMap){
                    stateMap.clear();
                    stateMap.putAll(map);
                }
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                setStateInvoked = true;
                Util.close(ois);
            }
        }

        public void setState(String stateId, byte[] state) {
            if(largeTransfer)
                Util.sleep(4000);

            Object nameTransfer = null;
            try{
                nameTransfer = Util.objectFromByteBuffer(state);
                TestCase.assertEquals("Got partial state requested ", nameTransfer, name);
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                partialSetStateInvoked = true;
            }
        }

        public byte[] getState(String stateId) {
            if(largeTransfer)
                Util.sleep(4000);

            byte[] result = null;
            try{
                result = Util.objectToByteBuffer(stateId);
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                partialGetStateInvoked = true;
            }
            return result;
        }

        public void setState(String state_id, InputStream istream) {
            if(largeTransfer)
                Util.sleep(4000);

            super.setState(state_id, istream);
            ObjectInputStream ois = null;
            try{
                ois = new ObjectInputStream(istream);
                TestCase.assertEquals("Got partial state requested ", ois.readObject(), name);
            }catch(Exception e){
                e.printStackTrace();
            }finally{
                partialSetStateInvoked = true;
                Util.close(ois);
            }
        }

        public void getState(String state_id, OutputStream ostream) {
            if(largeTransfer)
                Util.sleep(4000);

            super.getState(state_id, ostream);
            ObjectOutputStream oos = null;
            try{
                oos = new ObjectOutputStream(ostream);
                oos.writeObject(state_id);
                oos.flush();
            }catch(IOException e){
                e.printStackTrace();
            }finally{
                partialGetStateInvoked = true;
                Util.close(oos);
            }
        }
    }

    public static Test suite() {
        return new TestSuite(StreamingStateTransferTest.class);
    }

    public static void main(String[] args) {
        String[] testCaseName = { StreamingStateTransferTest.class.getName() };
        junit.textui.TestRunner.main(testCaseName);
    }
}
TOP

Related Classes of org.jgroups.tests.StreamingStateTransferTest$StreamingStateTransferApplication

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.