Package org.apache.qpid.server.store.berkeleydb

Source Code of org.apache.qpid.server.store.berkeleydb.HAClusterManagementTest

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.qpid.server.store.berkeleydb;

import static com.sleepycat.je.rep.ReplicatedEnvironment.State.DETACHED;
import static com.sleepycat.je.rep.ReplicatedEnvironment.State.MASTER;
import static com.sleepycat.je.rep.ReplicatedEnvironment.State.REPLICA;
import static com.sleepycat.je.rep.ReplicatedEnvironment.State.UNKNOWN;

import java.util.Arrays;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import javax.jms.Connection;
import javax.management.ObjectName;
import javax.management.openmbean.CompositeData;
import javax.management.openmbean.TabularData;

import org.apache.log4j.Logger;
import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.management.common.mbeans.ManagedBroker;
import org.apache.qpid.server.store.berkeleydb.jmx.ManagedBDBHAMessageStore;
import org.apache.qpid.test.utils.JMXTestUtils;
import org.apache.qpid.test.utils.QpidBrokerTestCase;

import com.sleepycat.je.EnvironmentFailureException;

/**
* System test verifying the ability to control a cluster via the Management API.
*
* @see HAClusterBlackboxTest
*/
public class HAClusterManagementTest extends QpidBrokerTestCase
{
    protected static final Logger LOGGER = Logger.getLogger(HAClusterManagementTest.class);

    private static final Set<String> NON_MASTER_STATES = new HashSet<String>(Arrays.asList(REPLICA.toString(), DETACHED.toString(), UNKNOWN.toString()));;
    private static final String VIRTUAL_HOST = "test";

    private static final String MANAGED_OBJECT_QUERY = "org.apache.qpid:type=BDBHAMessageStore,name=" + ObjectName.quote(VIRTUAL_HOST);
    private static final int NUMBER_OF_NODES = 4;

    private final HATestClusterCreator _clusterCreator = new HATestClusterCreator(this, VIRTUAL_HOST, NUMBER_OF_NODES);
    private final JMXTestUtils _jmxUtils = new JMXTestUtils(this);

    private ConnectionURL _brokerFailoverUrl;

    @Override
    protected void setUp() throws Exception
    {
        _brokerType = BrokerType.SPAWNED;

        _clusterCreator.configureClusterNodes();
        _brokerFailoverUrl = _clusterCreator.getConnectionUrlForAllClusterNodes();
        _clusterCreator.startCluster();

        super.setUp();
    }

    @Override
    protected void tearDown() throws Exception
    {
        try
        {
            _jmxUtils.close();
        }
        finally
        {
            super.tearDown();
        }
    }

    @Override
    public void startBroker() throws Exception
    {
        // Don't start default broker provided by QBTC.
    }

    public void testReadonlyMBeanAttributes() throws Exception
    {
        final int brokerPortNumber = getBrokerPortNumbers().iterator().next();
        final int bdbPortNumber = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumber);

        ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
        assertEquals("Unexpected store group name", _clusterCreator.getGroupName(), storeBean.getGroupName());
        assertEquals("Unexpected store node name", _clusterCreator.getNodeNameForNodeAt(bdbPortNumber), storeBean.getNodeName());
        assertEquals("Unexpected store node host port",_clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber), storeBean.getNodeHostPort());
        assertEquals("Unexpected store helper host port", _clusterCreator.getHelperHostPort(), storeBean.getHelperHostPort());
        // As we have chosen an arbitrary broker from the cluster, we cannot predict its state
        assertNotNull("Store state must not be null", storeBean.getNodeState());
    }

    public void testStateOfActiveBrokerIsMaster() throws Exception
    {
        final Connection activeConnection = getConnection(_brokerFailoverUrl);
        final int activeBrokerPortNumber = _clusterCreator.getBrokerPortNumberFromConnection(activeConnection);

        ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(activeBrokerPortNumber);
        assertEquals("Unexpected store state", MASTER.toString(), storeBean.getNodeState());
    }

    public void testStateOfNonActiveBrokerIsNotMaster() throws Exception
    {
        final Connection activeConnection = getConnection(_brokerFailoverUrl);
        final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);
        ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);
        final String nodeState = storeBean.getNodeState();
        assertTrue("Unexpected store state : " + nodeState, NON_MASTER_STATES.contains(nodeState));
    }

    public void testGroupMembers() throws Exception
    {
        final int brokerPortNumber = getBrokerPortNumbers().iterator().next();

        ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumber);
        awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);

        final TabularData groupMembers = storeBean.getAllNodesInGroup();
        assertNotNull(groupMembers);

        for(int bdbPortNumber : _clusterCreator.getBdbPortNumbers())
        {
            final String nodeName = _clusterCreator.getNodeNameForNodeAt(bdbPortNumber);
            final String nodeHostPort = _clusterCreator.getNodeHostPortForNodeAt(bdbPortNumber);

            CompositeData row = groupMembers.get(new Object[] {nodeName});
            assertNotNull("Table does not contain row for node name " + nodeName, row);
            assertEquals(nodeHostPort, row.get(BDBHAMessageStore.GRP_MEM_COL_NODE_HOST_PORT));
        }
    }

    public void testRemoveNodeFromGroup() throws Exception
    {
        final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
        final int brokerPortNumberToMakeObservation = brokerPortNumberIterator.next();
        final int brokerPortNumberToBeRemoved = brokerPortNumberIterator.next();
        final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToMakeObservation);
        awaitAllNodesJoiningGroup(storeBean, NUMBER_OF_NODES);

        final String removedNodeName = _clusterCreator.getNodeNameForNodeAt(_clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeRemoved));
        _clusterCreator.stopNode(brokerPortNumberToBeRemoved);
        storeBean.removeNodeFromGroup(removedNodeName);

        final int numberOfDataRowsAfterRemoval = storeBean.getAllNodesInGroup().size();
        assertEquals("Unexpected number of data rows before test", NUMBER_OF_NODES - 1,numberOfDataRowsAfterRemoval);
    }

    /**
     * Updates the address of a node.
     *
     * If the broker (node) can subsequently start without error then the update was a success, hence no need for an explicit
     * assert.
     *
     * @see #testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() for converse case
     */
    public void testUpdateAddress() throws Exception
    {
        final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
        final int brokerPortNumberToPerformUpdate = brokerPortNumberIterator.next();
        final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();
        final ManagedBDBHAMessageStore storeBean = getStoreBeanForNodeAtBrokerPort(brokerPortNumberToPerformUpdate);

        _clusterCreator.stopNode(brokerPortNumberToBeMoved);

        final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
        final int newBdbPort = getNextAvailable(oldBdbPort + 1);

        storeBean.updateAddress(_clusterCreator.getNodeNameForNodeAt(oldBdbPort), _clusterCreator.getIpAddressOfBrokerHost(), newBdbPort);

        _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);

        _clusterCreator.startNode(brokerPortNumberToBeMoved);
    }

    /**
     * @see #testUpdateAddress()
     */
    public void testRestartNodeWithNewPortNumberWithoutFirstCallingUpdateAddressThrowsAnException() throws Exception
    {
        final Iterator<Integer> brokerPortNumberIterator = getBrokerPortNumbers().iterator();
        final int brokerPortNumberToBeMoved = brokerPortNumberIterator.next();

        _clusterCreator.stopNode(brokerPortNumberToBeMoved);

        final int oldBdbPort = _clusterCreator.getBdbPortForBrokerPort(brokerPortNumberToBeMoved);
        final int newBdbPort = getNextAvailable(oldBdbPort + 1);

        // now deliberately don't call updateAddress

        _clusterCreator.modifyClusterNodeBdbAddress(brokerPortNumberToBeMoved, newBdbPort);

        try
        {
            _clusterCreator.startNode(brokerPortNumberToBeMoved);
            fail("Exception not thrown");
        }
        catch(RuntimeException rte)
        {
            //check cause was BDBs EnvironmentFailureException
            assertTrue(rte.getMessage().contains(EnvironmentFailureException.class.getName()));
            // PASS
        }
    }

    public void testVirtualHostOperationsDeniedForNonMasterNode() throws Exception
    {
        final Connection activeConnection = getConnection(_brokerFailoverUrl);
        final int inactiveBrokerPortNumber = _clusterCreator.getPortNumberOfAnInactiveBroker(activeConnection);

        ManagedBroker inactiveBroker = getManagedBrokerBeanForNodeAtBrokerPort(inactiveBrokerPortNumber);

        try
        {
            inactiveBroker.createNewQueue(getTestQueueName(), null, true);
            fail("Exception not thrown");
        }
        catch  (Exception e)
        {
            String message = e.getMessage();
            assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message);
        }

        try
        {
            inactiveBroker.createNewExchange(getName(), "direct", true);
            fail("Exception not thrown");
        }
        catch  (Exception e)
        {
            String message = e.getMessage();
            assertEquals("The virtual hosts state of PASSIVE does not permit this operation.", message);
        }
    }

    private ManagedBDBHAMessageStore getStoreBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
    {
        _jmxUtils.open(brokerPortNumber);

        return _jmxUtils.getManagedObject(ManagedBDBHAMessageStore.class, MANAGED_OBJECT_QUERY);
    }

    private ManagedBroker getManagedBrokerBeanForNodeAtBrokerPort(final int brokerPortNumber) throws Exception
    {
        _jmxUtils.open(brokerPortNumber);

        return _jmxUtils.getManagedBroker(VIRTUAL_HOST);
    }

    private void awaitAllNodesJoiningGroup(ManagedBDBHAMessageStore storeBean, int expectedNumberOfNodes) throws Exception
    {
        long totalTimeWaited = 0l;
        long waitInterval = 100l;
        long maxWaitTime = 10000;

        int currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
        while (expectedNumberOfNodes > currentNumberOfNodes || totalTimeWaited > maxWaitTime)
        {
            LOGGER.debug("Still awaiting nodes to join group; expecting "
                + expectedNumberOfNodes + " node(s) but only have " + currentNumberOfNodes
                + " after " + totalTimeWaited + " ms.");

            totalTimeWaited += waitInterval;
            Thread.sleep(waitInterval);

            currentNumberOfNodes = storeBean.getAllNodesInGroup().size();
        }

        assertEquals("Unexpected number of nodes in group after " + totalTimeWaited + " ms",
                expectedNumberOfNodes ,currentNumberOfNodes);
    }
}
TOP

Related Classes of org.apache.qpid.server.store.berkeleydb.HAClusterManagementTest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.