/**
* Copyright (C) 2012 LinkedIn Inc <opensource@linkedin.com>
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.linkedin.helix.integration;
import java.util.Date;
import java.util.List;
import org.apache.log4j.Logger;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import com.linkedin.helix.HelixAdmin;
import com.linkedin.helix.HelixManager;
import com.linkedin.helix.HelixManagerFactory;
import com.linkedin.helix.InstanceType;
import com.linkedin.helix.ZNRecord;
import com.linkedin.helix.controller.GenericHelixController;
import com.linkedin.helix.manager.file.FileHelixAdmin;
import com.linkedin.helix.mock.storage.DummyProcess;
import com.linkedin.helix.model.IdealState;
import com.linkedin.helix.model.InstanceConfig;
import com.linkedin.helix.model.InstanceConfig.InstanceConfigProperty;
import com.linkedin.helix.model.StateModelDefinition;
import com.linkedin.helix.store.PropertyJsonComparator;
import com.linkedin.helix.store.PropertyJsonSerializer;
import com.linkedin.helix.store.file.FilePropertyStore;
import com.linkedin.helix.tools.ClusterStateVerifier;
import com.linkedin.helix.tools.IdealStateCalculatorForStorageNode;
import com.linkedin.helix.tools.StateModelConfigGenerator;
/**
* Test base for dynamic file-based cluster manager
*
* @author zzhang
*
*/
public class FileCMTestBase
{
private static Logger logger =
Logger.getLogger(FileCMTestBase.class);
protected final String CLUSTER_NAME = "CLUSTER_"
+ getShortClassName();
private static final String TEST_DB = "TestDB";
protected static final String STATE_MODEL = "MasterSlave";
protected static final int NODE_NR = 5;
protected static final int START_PORT = 12918;
final String ROOT_PATH = "/tmp/"
+ getShortClassName();
protected final FilePropertyStore<ZNRecord> _fileStore =
new FilePropertyStore<ZNRecord>(new PropertyJsonSerializer<ZNRecord>(ZNRecord.class),
ROOT_PATH,
new PropertyJsonComparator<ZNRecord>(ZNRecord.class));
protected HelixManager _manager;
protected HelixAdmin _mgmtTool;
@BeforeClass()
public void beforeClass() throws Exception
{
System.out.println("START BEFORECLASS FileCMTestBase at "
+ new Date(System.currentTimeMillis()));
// setup test cluster
_mgmtTool = new FileHelixAdmin(_fileStore);
_mgmtTool.addCluster(CLUSTER_NAME, true);
StateModelConfigGenerator generator = new StateModelConfigGenerator();
_mgmtTool.addStateModelDef(CLUSTER_NAME, "LeaderStandby",
new StateModelDefinition(generator.generateConfigForLeaderStandby()));
_mgmtTool.addStateModelDef(CLUSTER_NAME,
"OnlineOffline",
new StateModelDefinition(generator.generateConfigForOnlineOffline()));
_mgmtTool.addResource(CLUSTER_NAME, TEST_DB, 10, STATE_MODEL);
for (int i = 0; i < NODE_NR; i++)
{
addNodeToCluster(CLUSTER_NAME, "localhost", START_PORT + i);
}
rebalanceStorageCluster(CLUSTER_NAME, TEST_DB, 3);
// start dummy storage nodes
for (int i = 0; i < NODE_NR; i++)
{
DummyProcess process =
new DummyProcess(null,
CLUSTER_NAME,
"localhost_" + (START_PORT + i),
"dynamic-file",
null,
0,
_fileStore);
try
{
process.start();
}
catch (Exception e)
{
logger.error("fail to start dummy participant using dynmaic file-based cluster-manager",
e);
}
_manager =
HelixManagerFactory.getDynamicFileHelixManager(CLUSTER_NAME,
"controller_0",
InstanceType.CONTROLLER,
_fileStore);
}
// start cluster manager controller
GenericHelixController controller = new GenericHelixController();
try
{
// manager.addConfigChangeListener(controller);
_manager.addLiveInstanceChangeListener(controller);
_manager.addIdealStateChangeListener(controller);
// manager.addExternalViewChangeListener(controller);
_manager.connect();
}
catch (Exception e)
{
logger.error("fail to start controller using dynamic file-based cluster-manager ",
e);
}
boolean result =
ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewFileVerifier(ROOT_PATH,
CLUSTER_NAME));
Assert.assertTrue(result);
System.out.println("END BEFORECLASS FileCMTestBase at "
+ new Date(System.currentTimeMillis()));
}
@AfterClass()
public void afterClass() throws Exception
{
logger.info("START afterClass FileCMTestBase shutting down file-based cluster managers at "
+ new Date(System.currentTimeMillis()));
// Thread.sleep(3000);
// _store.stop();
_manager.disconnect();
_manager.disconnect(); // test if disconnect() can be called twice
logger.info("END afterClass FileCMTestBase at "
+ new Date(System.currentTimeMillis()));
}
private String getShortClassName()
{
String className = this.getClass().getName();
return className.substring(className.lastIndexOf('.') + 1);
}
private void addNodeToCluster(String clusterName, String host, int port)
{
// TODO use ClusterSetup
String nodeId = host + "_" + port;
ZNRecord nodeConfig = new ZNRecord(nodeId);
nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_HOST.toString(), host);
nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_PORT.toString(),
Integer.toString(port));
nodeConfig.setSimpleField(InstanceConfigProperty.HELIX_ENABLED.toString(),
Boolean.toString(true));
_mgmtTool.addInstance(CLUSTER_NAME, new InstanceConfig(nodeConfig));
}
protected void rebalanceStorageCluster(String clusterName,
String resourceName,
int replica)
{
List<String> nodeNames = _mgmtTool.getInstancesInCluster(clusterName);
IdealState idealState = _mgmtTool.getResourceIdealState(clusterName, resourceName);
idealState.setReplicas(Integer.toString(replica));
int partitions = idealState.getNumPartitions();
ZNRecord newIdealState =
IdealStateCalculatorForStorageNode.calculateIdealState(nodeNames,
partitions,
replica - 1,
resourceName,
"MASTER",
"SLAVE");
newIdealState.merge(idealState.getRecord());
_mgmtTool.setResourceIdealState(clusterName,
resourceName,
new IdealState(newIdealState));
}
}