Package org.apache.helix.testutil

Source Code of org.apache.helix.testutil.HelixTestUtil

package org.apache.helix.testutil;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
import org.apache.helix.api.id.MessageId;
import org.apache.helix.api.id.StateModelDefId;
import org.apache.helix.controller.pipeline.Pipeline;
import org.apache.helix.controller.pipeline.Stage;
import org.apache.helix.controller.pipeline.StageContext;
import org.apache.helix.controller.stages.ClusterEvent;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.IdealState.RebalanceMode;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
import org.apache.helix.model.Message;
import org.apache.helix.model.Message.Attributes;
import org.apache.helix.model.Message.MessageType;
import org.apache.helix.model.StateModelDefinition;
import org.apache.helix.tools.ClusterStateVerifier.ZkVerifier;
import org.apache.helix.tools.StateModelConfigGenerator;
import org.apache.log4j.Logger;

public class HelixTestUtil {
  private static Logger LOG = Logger.getLogger(HelixTestUtil.class);

  /**
   * Ensures that external view and current state are empty
   */
  public static class EmptyZkVerifier extends ZkVerifier {
    private final String _resourceName;

    /**
     * Instantiate the verifier
     * @param clusterName the cluster to verify
     * @param resourceName the resource to verify
     */
    public EmptyZkVerifier(String clusterName, String resourceName, ZkClient zkclient) {
      super(clusterName, zkclient);
      _resourceName = resourceName;
    }

    @Override
    public boolean verify() {
      BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(getZkClient());
      HelixDataAccessor accessor = new ZKHelixDataAccessor(getClusterName(), baseAccessor);
      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
      ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName));

      // verify external view empty
      if (externalView != null) {
        for (String partition : externalView.getPartitionSet()) {
          Map<String, String> stateMap = externalView.getStateMap(partition);
          if (stateMap != null && !stateMap.isEmpty()) {
            LOG.error("External view not empty for " + partition);
            return false;
          }
        }
      }

      // verify current state empty
      List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances());
      for (String participant : liveParticipants) {
        List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant));
        for (String sessionId : sessionIds) {
          CurrentState currentState =
              accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName));
          Map<String, String> partitionStateMap = currentState.getPartitionStateMap();
          if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
            LOG.error("Current state not empty for " + participant);
            return false;
          }
        }
      }
      return true;
    }
  }

  /**
   * Poll for the existence (or lack thereof) of a specific Helix property
   * @param clazz the HelixProeprty subclass
   * @param accessor connected HelixDataAccessor
   * @param key the property key to look up
   * @param shouldExist true if the property should exist, false otherwise
   * @return the property if found, or null if it does not exist
   */
  public static <T extends HelixProperty> T pollForProperty(Class<T> clazz,
      HelixDataAccessor accessor, PropertyKey key, boolean shouldExist) throws InterruptedException {
    final int POLL_TIMEOUT = 5000;
    final int POLL_INTERVAL = 50;
    T property = accessor.getProperty(key);
    int timeWaited = 0;
    while (((shouldExist && property == null) || (!shouldExist && property != null))
        && timeWaited < POLL_TIMEOUT) {
      Thread.sleep(POLL_INTERVAL);
      timeWaited += POLL_INTERVAL;
      property = accessor.getProperty(key);
    }
    return property;
  }

  public static void setupStateModel(BaseDataAccessor<ZNRecord> baseAccessor, String clusterName) {
    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();

    StateModelDefinition masterSlave =
        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);

    StateModelDefinition leaderStandby =
        new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby());
    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);

    StateModelDefinition onlineOffline =
        new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
  }

  public static List<IdealState> setupIdealState(BaseDataAccessor<ZNRecord> baseAccessor,
      String clusterName, int[] nodes, String[] resources, int partitions, int replicas) {
    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();

    List<IdealState> idealStates = new ArrayList<IdealState>();
    List<String> instances = new ArrayList<String>();
    for (int i : nodes) {
      instances.add("localhost_" + i);
    }

    for (String resourceName : resources) {
      IdealState idealState = new IdealState(resourceName);
      for (int p = 0; p < partitions; p++) {
        List<String> value = new ArrayList<String>();
        for (int r = 0; r < replicas; r++) {
          int n = nodes[(p + r) % nodes.length];
          value.add("localhost_" + n);
        }
        idealState.getRecord().setListField(resourceName + "_" + p, value);
      }

      idealState.setReplicas(Integer.toString(replicas));
      idealState.setStateModelDefId(StateModelDefId.from("MasterSlave"));
      idealState.setRebalanceMode(RebalanceMode.SEMI_AUTO);
      idealState.setNumPartitions(partitions);
      idealStates.add(idealState);

      // System.out.println(idealState);
      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
    }
    return idealStates;
  }

  public static void setupLiveInstances(BaseDataAccessor<ZNRecord> baseAccessor,
      String clusterName, int[] liveInstances) {
    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();

    for (int i = 0; i < liveInstances.length; i++) {
      String instance = "localhost_" + liveInstances[i];
      LiveInstance liveInstance = new LiveInstance(instance);
      liveInstance.setSessionId("session_" + liveInstances[i]);
      liveInstance.setHelixVersion("0.4.0");
      accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
    }
  }

  public static void setupInstances(BaseDataAccessor<ZNRecord> baseAccessor, String clusterName,
      int[] instances) {
    ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(clusterName, baseAccessor);
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
    for (int i = 0; i < instances.length; i++) {
      String instance = "localhost_" + instances[i];
      InstanceConfig instanceConfig = new InstanceConfig(instance);
      instanceConfig.setHostName("localhost");
      instanceConfig.setPort("" + instances[i]);
      instanceConfig.setInstanceEnabled(true);
      accessor.setProperty(keyBuilder.instanceConfig(instance), instanceConfig);
    }
  }

  public static void runPipeline(ClusterEvent event, Pipeline pipeline) {
    try {
      pipeline.handle(event);
      pipeline.finish();
    } catch (Exception e) {
      LOG.error("Exception while executing pipeline:" + pipeline
          + ". Will not continue to next pipeline", e);
    }
  }


  public static void runStage(ClusterEvent event, Stage stage) throws Exception {
    StageContext context = new StageContext();
    stage.init(context);
    stage.preProcess();
    stage.process(event);
    stage.postProcess();
  }

  public static Message newMessage(MessageType type, MessageId msgId, String fromState,
      String toState, String resourceName, String tgtName) {
    Message msg = new Message(type.toString(), msgId);
    msg.setFromState(State.from(fromState));
    msg.setToState(State.from(toState));
    msg.getRecord().setSimpleField(Attributes.RESOURCE_NAME.toString(), resourceName);
    msg.setTgtName(tgtName);
    return msg;
  }
}
TOP

Related Classes of org.apache.helix.testutil.HelixTestUtil

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.