Package com.hazelcast.partition.impl

Source Code of com.hazelcast.partition.impl.PartitionStateGeneratorTest$GroupPartitionState

/*
* Copyright (c) 2008-2013, Hazelcast, Inc. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.hazelcast.partition.impl;

import com.hazelcast.config.ClasspathXmlConfig;
import com.hazelcast.config.Config;
import com.hazelcast.config.MemberGroupConfig;
import com.hazelcast.config.PartitionGroupConfig;
import com.hazelcast.core.Hazelcast;
import com.hazelcast.core.Member;
import com.hazelcast.instance.MemberImpl;
import com.hazelcast.nio.Address;
import com.hazelcast.partition.InternalPartition;
import com.hazelcast.partition.membergroup.ConfigMemberGroupFactory;
import com.hazelcast.partition.membergroup.DefaultMemberGroup;
import com.hazelcast.partition.membergroup.HostAwareMemberGroupFactory;
import com.hazelcast.partition.membergroup.MemberGroup;
import com.hazelcast.partition.membergroup.MemberGroupFactory;
import com.hazelcast.partition.membergroup.SingleMemberGroupFactory;
import com.hazelcast.test.HazelcastSerialClassRunner;
import com.hazelcast.test.annotation.QuickTest;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

@RunWith(HazelcastSerialClassRunner.class)
@Category(QuickTest.class)
public class PartitionStateGeneratorTest {

    private static final boolean printState = false;

    @BeforeClass
    @AfterClass
    public static void init() throws IOException {
        Hazelcast.shutdownAll();
    }

    @Test
    public void testRandomPartitionGenerator() throws Exception {
        final MemberGroupFactory memberGroupFactory = new SingleMemberGroupFactory();
        test(memberGroupFactory);
    }

    // NOTE: random host groups may cause a non-uniform distribution of partitions when the node count goes down significantly!
    @Test
    public void testHostAwarePartitionStateGenerator() throws Exception {
        final HostAwareMemberGroupFactory memberGroupFactory = new HostAwareMemberGroupFactory();
        test(memberGroupFactory);
    }

    @Test
    public void testCustomPartitionStateGenerator() throws Exception {
        final MemberGroupFactory memberGroupFactory = new MemberGroupFactory() {
            public Collection<MemberGroup> createMemberGroups(Collection<? extends Member> members) {
                MemberGroup[] g = new MemberGroup[4];
                for (int i = 0; i < g.length; i++) {
                    g[i] = new DefaultMemberGroup();
                }
                for (Member member : members) {
                    Address address = ((MemberImpl) member).getAddress();
                    if (even(address.getHost().hashCode()) && even(address.getPort())) {
                        g[0].addMember(member);
                    } else if (even(address.getHost().hashCode()) && !even(address.getPort())) {
                        g[1].addMember(member);
                    } else if (!even(address.getHost().hashCode()) && even(address.getPort())) {
                        g[2].addMember(member);
                    } else if (!even(address.getHost().hashCode()) && !even(address.getPort())) {
                        g[3].addMember(member);
                    }
                }
                List<MemberGroup> list = new LinkedList<MemberGroup>();
                for (MemberGroup memberGroup : g) {
                    if (memberGroup.size() > 0) {
                        list.add(memberGroup);
                    }
                }
                return list;
            }

            boolean even(int k) {
                return k % 2 == 0;
            }
        };
        test(memberGroupFactory);
    }

    @Test
    public void testConfigCustomPartitionStateGenerator() throws Exception {
        PartitionGroupConfig config = new PartitionGroupConfig();
        config.setEnabled(true);
        config.setGroupType(PartitionGroupConfig.MemberGroupType.CUSTOM);
        MemberGroupConfig mgCfg0 = new MemberGroupConfig();
        MemberGroupConfig mgCfg1 = new MemberGroupConfig();
        MemberGroupConfig mgCfg2 = new MemberGroupConfig();
        MemberGroupConfig mgCfg3 = new MemberGroupConfig();

        config.addMemberGroupConfig(mgCfg0);
        config.addMemberGroupConfig(mgCfg1);
        config.addMemberGroupConfig(mgCfg2);
        config.addMemberGroupConfig(mgCfg3);

        for (int k = 0; k < 3; k++) {
            for (int i = 0; i < 255; i++) {
                MemberGroupConfig mg;
                switch (i % 4) {
                    case 0:
                        mg = mgCfg0;
                        break;
                    case 1:
                        mg = mgCfg1;
                        break;
                    case 2:
                        mg = mgCfg2;
                        break;
                    case 3:
                        mg = mgCfg3;
                        break;
                    default:
                        throw new IllegalArgumentException();
                }
                mg.addInterface("10.10." + k + "." + i);
            }
        }

        test(new ConfigMemberGroupFactory(config.getMemberGroupConfigs()));
    }

    @Test
    public void testXmlPartitionGroupConfig() {
        Config config = new ClasspathXmlConfig("hazelcast-fullconfig.xml");
        PartitionGroupConfig partitionGroupConfig = config.getPartitionGroupConfig();
        assertFalse(partitionGroupConfig.isEnabled());
        assertEquals(PartitionGroupConfig.MemberGroupType.CUSTOM, partitionGroupConfig.getGroupType());
        assertEquals(2, partitionGroupConfig.getMemberGroupConfigs().size());
    }

    private void test(MemberGroupFactory memberGroupFactory) throws Exception {
        PartitionStateGenerator generator = new PartitionStateGeneratorImpl();
        int maxSameHostCount = 3;
        int[] partitionCounts = new int[]{271, 787, 1549, 3217};
        int[] members = new int[]{3, 6, 9, 10, 11, 17, 57, 100, 130, 77, 179, 93, 37, 26, 15, 5};
        for (int partitionCount : partitionCounts) {
            int memberCount = members[0];
            List<Member> memberList = createMembers(memberCount, maxSameHostCount);
            Collection<MemberGroup> groups = memberGroupFactory.createMemberGroups(memberList);
            println("PARTITION-COUNT= " + partitionCount + ", MEMBER-COUNT= "
                    + members[0] + ", GROUP-COUNT= " + groups.size());
            println();
            Address[][] state = generator.initialize(memberGroupFactory.createMemberGroups(memberList), partitionCount);
            checkTestResult(state, groups, partitionCount);
            int previousMemberCount = memberCount;
            for (int j = 1; j < members.length; j++) {
                memberCount = members[j];
                if (partitionCount / memberCount < 10) {
                    break;
                }
                if ((float) partitionCount / memberCount > 2) {
                    if (previousMemberCount == 0) {
                        memberList = createMembers(memberCount, maxSameHostCount);
                    } else if (memberCount > previousMemberCount) {
                        MemberImpl last = (MemberImpl) memberList.get(previousMemberCount - 1);
                        List<Member> extra = createMembers(last, (memberCount - previousMemberCount), maxSameHostCount);
                        memberList.addAll(extra);
                    } else {
                        memberList = memberList.subList(0, memberCount);
                        shift(state, memberList);
                    }
                    groups = memberGroupFactory.createMemberGroups(memberList);
                    println("PARTITION-COUNT= " + partitionCount + ", MEMBER-COUNT= " + memberCount + ", GROUP-COUNT= " + groups.size());
                    //todo
                    state = generator.reArrange(memberGroupFactory.createMemberGroups(memberList), toPartitionView(state));
                    checkTestResult(state, groups, partitionCount);
                    previousMemberCount = memberCount;
                }
            }
        }
    }

    private DummyInternalPartition[] toPartitionView(Address[][] state) {
        DummyInternalPartition[] result = new DummyInternalPartition[state.length];
        for (int partitionId = 0; partitionId < state.length; partitionId++) {
            DummyInternalPartition partitionView = new DummyInternalPartition(state[partitionId]);
            result[partitionId] = partitionView;
        }
        return result;
    }

    private class DummyInternalPartition implements InternalPartition {
        private Address[] replicas;

        private DummyInternalPartition(Address[] replicas) {
            this.replicas = replicas;
        }

        @Override
        public int getPartitionId() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Address getOwnerOrNull() {
            return replicas[0];
        }

        @Override
        public boolean isMigrating() {
            throw new UnsupportedOperationException();
        }

        @Override
        public Address getReplicaAddress(int replicaIndex) {
            return replicas[replicaIndex];
        }

        @Override
        public boolean isOwnerOrBackup(Address address) {
            throw new UnsupportedOperationException();
        }
    }

    private static void shift(Address[][] state, List<Member> members) {
        Set<Address> addresses = new HashSet<Address>();
        for (Member member : members) {
            addresses.add(((MemberImpl) member).getAddress());
        }
        for (int partitionId = 0; partitionId < state.length; partitionId++) {
            Address[] replicas = state[partitionId];
            for (int i = 0; i < replicas.length; i++) {
                if (replicas[i] != null && !addresses.contains(replicas[i])) {
                    Address[] validAddresses = new Address[InternalPartition.MAX_REPLICA_COUNT - i];
                    int k = 0;
                    for (int a = i + 1; a < InternalPartition.MAX_REPLICA_COUNT; a++) {
                        Address address = replicas[a];
                        if (address != null && addresses.contains(address)) {
                            validAddresses[k++] = address;
                        }
                    }
                    for (int a = 0; a < k; a++) {
                        replicas[i + a] = validAddresses[a];
                    }
                    for (int a = i + k; a < InternalPartition.MAX_REPLICA_COUNT; a++) {
                        replicas[a] = null;
                    }
                    break;
                }
            }
        }
    }

    private static List<Member> createMembers(int memberCount, int maxSameHostCount) throws Exception {
        return createMembers(null, memberCount, maxSameHostCount);
    }

    private static List<Member> createMembers(MemberImpl startAfter, int memberCount, int maxSameHostCount) throws Exception {
        Random rand = new Random();
        final byte[] ip = new byte[]{10, 10, 0, 0};
        if (startAfter != null) {
            Address address = startAfter.getAddress();
            byte[] startIp = address.getInetAddress().getAddress();
            if (startIp[3] < 255) {
                ip[2] = startIp[2];
                ip[3] = (byte) (startIp[3] + 1);
            } else {
                ip[2] = (byte) (startIp[2] + 1);
                ip[3] = 0;
            }
        }
        int count = 0;
        int port = 5700;
        List<Member> members = new ArrayList<Member>();
        int sameHostCount = rand.nextInt(maxSameHostCount) + 1;
        for (int i = 0; i < memberCount; i++) {
            if (count == sameHostCount) {
                ip[3] = ++ip[3];
                count = 0;
                port = 5700;
                sameHostCount = rand.nextInt(maxSameHostCount) + 1;
            }
            count++;
            port++;
            MemberImpl m = new MemberImpl(new Address(InetAddress.getByAddress(new byte[]{ip[0], ip[1], ip[2], ip[3]})
                    , port), false);
            members.add(m);
            if (ip[3] == 255) {
                ip[2] = ++ip[2];
            }
        }
        return members;
    }

    private void checkTestResult(final Address[][] state, final Collection<MemberGroup> groups, final int partitionCount) {
        Iterator<MemberGroup> iter = groups.iterator();
        while (iter.hasNext()) {
            if (iter.next().size() == 0) {
                iter.remove();
            }
        }
        final int replicaCount = Math.min(groups.size(), InternalPartition.MAX_REPLICA_COUNT);
        final Map<MemberGroup, GroupPartitionState> groupPartitionStates = new HashMap<MemberGroup, GroupPartitionState>();
        final Set<Address> set = new HashSet<Address>();
        final int avgPartitionPerGroup = partitionCount / groups.size();
        for (int partitionId = 0; partitionId < partitionCount; partitionId++) {
            Address[] replicas = state[partitionId];
            for (int i = 0; i < replicaCount; i++) {
                Address owner = replicas[i];
                Assert.assertNotNull(owner);
                assertFalse("Duplicate owner of partition: " + partitionId,
                        set.contains(owner));
                set.add(owner);
                MemberGroup group = null;
                for (MemberGroup g : groups) {
                    if (g.hasMember(new MemberImpl(owner, true))) {
                        group = g;
                        break;
                    }
                }
                Assert.assertNotNull(group);
                GroupPartitionState groupState = groupPartitionStates.get(group);
                if (groupState == null) {
                    groupState = new GroupPartitionState();
                    groupState.group = group;
                    groupPartitionStates.put(group, groupState);
                }
                groupState.groupPartitions[i].add(partitionId);
                groupState.getNodePartitions(owner)[i].add(partitionId);
            }
            set.clear();
        }
        print("Owner");
        for (int i = 0; i < replicaCount; i++) {
            if (i == 0) {
                print("\t\t");
            }
            print("\tRep-" + i);
        }
        print("\tTotal");
        println();
        println("_______________________________________________________________________________________");
        int k = 1;
        for (GroupPartitionState groupState : groupPartitionStates.values()) {
            for (Map.Entry<Address, Set<Integer>[]> entry : groupState.nodePartitionsMap.entrySet()) {
                int total = 0;
                print(entry.getKey().getHost() + ":" + entry.getKey().getPort());
                Collection<Integer>[] partitions = entry.getValue();
                for (int i = 0; i < replicaCount; i++) {
                    final int avgPartitionPerNode = groupState.groupPartitions[i].size() / groupState.nodePartitionsMap.size();
                    if (i == 0) {
                        print("\t");
                    }
                    print('\t');
                    int count = partitions[i].size();
                    print(count);
                    total += partitions[i].size();
                    isInAllowedRange(count, avgPartitionPerNode, i, entry.getKey(), groups, partitionCount);
                }
                print('\t');
                print(total);
                println();
            }
            println("----------------------------------------------------------------------------------------");
            int total = 0;
            print("Group" + (k++) + "[" + groupState.group.size() + "]");
            Collection<Integer>[] partitions = groupState.groupPartitions;
            for (int i = 0; i < replicaCount; i++) {
                if (i == 0) {
                    print("\t");
                }
                print('\t');
                int count = partitions[i].size();
                print(count);
                total += partitions[i].size();
                isInAllowedRange(count, avgPartitionPerGroup, i, groupState.group, groups, partitionCount);
            }
            print('\t');
            print(total);
            println();
            println();
        }
        println();
        println();
    }

    private static void isInAllowedRange(int count, int average, int replica,
                                         Object owner, final Collection<MemberGroup> groups, final int partitionCount) {
        if (average <= 10) {
            return;
        }
        final float r = 2f;
        assertTrue("Too low partition count! \nOwned: " + count + ", Avg: " + average
                + ", \nPartitionCount: " + partitionCount + ", Replica: " + replica +
                ", \nOwner: " + owner, count >= (float) (average) / r);

        assertTrue("Too high partition count! \nOwned: " + count + ", Avg: " + average
                + ", \nPartitionCount: " + partitionCount + ", Replica: " + replica +
                ", \nOwner: " + owner, count <= (float) (average) * r);
    }

    private static void println(Object str) {
        print(str);
        println();
    }

    private static void println() {
        print('\n');
    }

    private static void print(Object str) {
        if (!printState) {
            return;
        }
        System.out.print(str);
    }

    private static class GroupPartitionState {
        MemberGroup group;
        Set<Integer>[] groupPartitions = new Set[InternalPartition.MAX_REPLICA_COUNT];
        Map<Address, Set<Integer>[]> nodePartitionsMap = new HashMap<Address, Set<Integer>[]>();

        {
            for (int i = 0; i < InternalPartition.MAX_REPLICA_COUNT; i++) {
                groupPartitions[i] = new HashSet<Integer>();
            }
        }

        Set<Integer>[] getNodePartitions(Address node) {
            Set<Integer>[] nodePartitions = nodePartitionsMap.get(node);
            if (nodePartitions == null) {
                nodePartitions = new Set[InternalPartition.MAX_REPLICA_COUNT];
                for (int i = 0; i < InternalPartition.MAX_REPLICA_COUNT; i++) {
                    nodePartitions[i] = new HashSet<Integer>();
                }
                nodePartitionsMap.put(node, nodePartitions);
            }
            return nodePartitions;
        }
    }
}
TOP

Related Classes of com.hazelcast.partition.impl.PartitionStateGeneratorTest$GroupPartitionState

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.