Package org.apache.helix.task

Source Code of org.apache.helix.task.TaskUtil

package org.apache.helix.task;

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/

import java.io.IOException;
import java.text.ParseException;
import java.util.Collections;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.helix.AccessOption;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixProperty;
import org.apache.helix.PropertyKey;
import org.apache.helix.ZNRecord;
import org.apache.helix.api.State;
import org.apache.helix.api.id.PartitionId;
import org.apache.helix.model.CurrentState;
import org.apache.helix.model.HelixConfigScope;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.ResourceAssignment;
import org.apache.helix.model.builder.HelixConfigScopeBuilder;
import org.apache.log4j.Logger;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
* Static utility methods.
*/
public class TaskUtil {
  private static final Logger LOG = Logger.getLogger(TaskUtil.class);
  public static final String CONTEXT_NODE = "Context";
  public static final String PREV_RA_NODE = "PreviousResourceAssignment";

  /**
   * Parses job resource configurations in Helix into a {@link JobConfig} object.
   * @param manager HelixManager object used to connect to Helix.
   * @param jobResource The name of the job resource.
   * @return A {@link JobConfig} object if Helix contains valid configurations for the job, null
   *         otherwise.
   */
  public static JobConfig getJobCfg(HelixManager manager, String jobResource) {
    HelixProperty jobResourceConfig = getResourceConfig(manager, jobResource);
    return getJobCfg(jobResourceConfig);
  }

  /**
   * Parses job resource configurations directly from a property into a {@link JobConfig}.
   * @param jobResourceConfig the property containing the configuration
   * @return A {@link JobConfig} object if the property valid configurations for the job, null
   *         otherwise.
   */
  public static JobConfig getJobCfg(HelixProperty jobResourceConfig) {
    if (jobResourceConfig == null) {
      return null;
    }
    JobConfig.Builder b =
        JobConfig.Builder.fromMap(jobResourceConfig.getRecord().getSimpleFields());
    Map<String, Map<String, String>> rawTaskConfigMap =
        jobResourceConfig.getRecord().getMapFields();
    Map<String, TaskConfig> taskConfigMap = Maps.newHashMap();
    for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
      TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
      taskConfigMap.put(taskConfig.getId(), taskConfig);
    }
    b.addTaskConfigMap(taskConfigMap);
    return b.build();
  }

  /**
   * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
   * @param manager Helix manager object used to connect to Helix.
   * @param workflowResource The name of the workflow resource.
   * @return A {@link WorkflowConfig} object if Helix contains valid configurations for the
   *         workflow, null otherwise.
   */
  public static WorkflowConfig getWorkflowCfg(HelixManager manager, String workflowResource) {
    Map<String, String> workflowCfg = getResourceConfigMap(manager, workflowResource);
    return getWorkflowCfg(workflowCfg);
  }

  /**
   * Parses workflow resource configurations in Helix into a {@link WorkflowConfig} object.
   * @param workflowResourceConfig the proeprty containing the configurations
   * @return A {@link WorkflowConfig} object if the property contains valid configurations for the
   *         workflow, null otherwise.
   */
  public static WorkflowConfig getWorkflowCfg(HelixProperty workflowResourceConfig) {
    if (workflowResourceConfig == null) {
      return null;
    }
    return getWorkflowCfg(workflowResourceConfig.getRecord().getSimpleFields());
  }

  /**
   * Parses a key-value map into a {@link WorkflowConfig} object.
   * @param workflowCfg the map of configurations
   * @return A {@link WorkflowConfig} object if the map contains valid configurations for the
   *         workflow, null otherwise.
   */
  private static WorkflowConfig getWorkflowCfg(Map<String, String> workflowCfg) {
    if (workflowCfg == null) {
      return null;
    }
    WorkflowConfig.Builder b = WorkflowConfig.Builder.fromMap(workflowCfg);

    return b.build();
  }

  /**
   * Request a state change for a specific task.
   * @param accessor connected Helix data accessor
   * @param instance the instance serving the task
   * @param sessionId the current session of the instance
   * @param resource the job name
   * @param partition the task partition name
   * @param state the requested state
   * @return true if the request was persisted, false otherwise
   */
  public static boolean setRequestedState(HelixDataAccessor accessor, String instance,
      String sessionId, String resource, String partition, TaskPartitionState state) {
    LOG.debug(String.format("Requesting a state transition to %s for partition %s.", state,
        partition));
    try {
      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
      PropertyKey key = keyBuilder.currentState(instance, sessionId, resource);
      CurrentState currStateDelta = new CurrentState(resource);
      currStateDelta.setRequestedState(PartitionId.from(partition), State.from(state.name()));

      return accessor.updateProperty(key, currStateDelta);
    } catch (Exception e) {
      LOG.error(String.format("Error when requesting a state transition to %s for partition %s.",
          state, partition), e);
      return false;
    }
  }

  /**
   * Get a Helix configuration scope at a resource (i.e. job and workflow) level
   * @param clusterName the cluster containing the resource
   * @param resource the resource name
   * @return instantiated {@link HelixConfigScope}
   */
  public static HelixConfigScope getResourceConfigScope(String clusterName, String resource) {
    return new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.RESOURCE)
        .forCluster(clusterName).forResource(resource).build();
  }

  /**
   * Get the last task assignment for a given job
   * @param manager a connection to Helix
   * @param resourceName the name of the job
   * @return {@link ResourceAssignment} instance, or null if no assignment is available
   */
  public static ResourceAssignment getPrevResourceAssignment(HelixManager manager,
      String resourceName) {
    ZNRecord r =
        manager.getHelixPropertyStore().get(
            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
            null, AccessOption.PERSISTENT);
    return r != null ? new ResourceAssignment(r) : null;
  }

  /**
   * Set the last task assignment for a given job
   * @param manager a connection to Helix
   * @param resourceName the name of the job
   * @param ra {@link ResourceAssignment} containing the task assignment
   */
  public static void setPrevResourceAssignment(HelixManager manager, String resourceName,
      ResourceAssignment ra) {
    manager.getHelixPropertyStore().set(
        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, PREV_RA_NODE),
        ra.getRecord(), AccessOption.PERSISTENT);
  }

  /**
   * Get the runtime context of a single job
   * @param manager a connection to Helix
   * @param jobResource the name of the job
   * @return the {@link JobContext}, or null if none is available
   */
  public static JobContext getJobContext(HelixManager manager, String jobResource) {
    ZNRecord r =
        manager.getHelixPropertyStore().get(
            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
            null, AccessOption.PERSISTENT);
    return r != null ? new JobContext(r) : null;
  }

  /**
   * Set the runtime context of a single job
   * @param manager a connection to Helix
   * @param jobResource the name of the job
   * @param ctx the up-to-date {@link JobContext} for the job
   */
  public static void setJobContext(HelixManager manager, String jobResource, JobContext ctx) {
    manager.getHelixPropertyStore().set(
        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, jobResource, CONTEXT_NODE),
        ctx.getRecord(), AccessOption.PERSISTENT);
  }

  /**
   * Get the rumtime context of a single workflow
   * @param manager a connection to Helix
   * @param workflowResource the name of the workflow
   * @return the {@link WorkflowContext}, or null if none is available
   */
  public static WorkflowContext getWorkflowContext(HelixManager manager, String workflowResource) {
    ZNRecord r =
        manager.getHelixPropertyStore().get(
            Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource,
                CONTEXT_NODE), null, AccessOption.PERSISTENT);
    return r != null ? new WorkflowContext(r) : null;
  }

  /**
   * Set the rumtime context of a single workflow
   * @param manager a connection to Helix
   * @param workflowResource the name of the workflow
   * @param ctx the up-to-date {@link WorkflowContext} for the workflow
   */
  public static void setWorkflowContext(HelixManager manager, String workflowResource,
      WorkflowContext ctx) {
    manager.getHelixPropertyStore().set(
        Joiner.on("/").join(TaskConstants.REBALANCER_CONTEXT_ROOT, workflowResource, CONTEXT_NODE),
        ctx.getRecord(), AccessOption.PERSISTENT);
  }

  /**
   * Get a workflow-qualified job name for a single-job workflow
   * @param singleJobWorkflow the name of the single-job workflow
   * @return The namespaced job name, which is just singleJobWorkflow_singleJobWorkflow
   */
  public static String getNamespacedJobName(String singleJobWorkflow) {
    return getNamespacedJobName(singleJobWorkflow, singleJobWorkflow);
  }

  /**
   * Get a workflow-qualified job name for a job in that workflow
   * @param workflowResource the name of the workflow
   * @param jobName the un-namespaced name of the job
   * @return The namespaced job name, which is just workflowResource_jobName
   */
  public static String getNamespacedJobName(String workflowResource, String jobName) {
    return workflowResource + "_" + jobName;
  }

  /**
   * Remove the workflow namespace from the job name
   * @param workflowResource the name of the workflow that owns the job
   * @param jobName the namespaced job name
   * @return the denamespaced job name, or the same job name if it is already denamespaced
   */
  public static String getDenamespacedJobName(String workflowResource, String jobName) {
    if (jobName.contains(workflowResource)) {
      // skip the entire length of the work plus the underscore
      return jobName.substring(jobName.indexOf(workflowResource) + workflowResource.length() + 1);
    } else {
      return jobName;
    }
  }

  /**
   * Serialize a map of job-level configurations as a single string
   * @param commandConfig map of job config key to config value
   * @return serialized string
   */
  public static String serializeJobCommandConfigMap(Map<String, String> commandConfig) {
    ObjectMapper mapper = new ObjectMapper();
    try {
      String serializedMap = mapper.writeValueAsString(commandConfig);
      return serializedMap;
    } catch (IOException e) {
      LOG.error("Error serializing " + commandConfig, e);
    }
    return null;
  }

  /**
   * Deserialize a single string into a map of job-level configurations
   * @param commandConfig the serialized job config map
   * @return a map of job config key to config value
   */
  public static Map<String, String> deserializeJobCommandConfigMap(String commandConfig) {
    ObjectMapper mapper = new ObjectMapper();
    try {
      Map<String, String> commandConfigMap =
          mapper.readValue(commandConfig, new TypeReference<HashMap<String, String>>() {
          });
      return commandConfigMap;
    } catch (IOException e) {
      LOG.error("Error deserializing " + commandConfig, e);
    }
    return Collections.emptyMap();
  }

  /**
   * Trigger a controller pipeline execution for a given resource.
   * @param manager Helix connection
   * @param resource the name of the resource changed to triggering the execution
   */
  public static void invokeRebalance(HelixManager manager, String resource) {
    // The pipeline is idempotent, so touching an ideal state is enough to trigger a pipeline run
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    accessor.updateProperty(accessor.keyBuilder().idealStates(resource), new IdealState(resource));
  }

  /**
   * Get a ScheduleConfig from a workflow config string map
   * @param cfg the string map
   * @return a ScheduleConfig if one exists, otherwise null
   */
  public static ScheduleConfig parseScheduleFromConfigMap(Map<String, String> cfg) {
    // Parse schedule-specific configs, if they exist
    Date startTime = null;
    if (cfg.containsKey(WorkflowConfig.START_TIME)) {
      try {
        startTime = WorkflowConfig.DEFAULT_DATE_FORMAT.parse(cfg.get(WorkflowConfig.START_TIME));
      } catch (ParseException e) {
        LOG.error("Unparseable date " + cfg.get(WorkflowConfig.START_TIME), e);
        return null;
      }
    }
    if (cfg.containsKey(WorkflowConfig.RECURRENCE_UNIT)
        && cfg.containsKey(WorkflowConfig.RECURRENCE_INTERVAL)) {
      return ScheduleConfig.recurringFromDate(startTime,
          TimeUnit.valueOf(cfg.get(WorkflowConfig.RECURRENCE_UNIT)),
          Long.parseLong(cfg.get(WorkflowConfig.RECURRENCE_INTERVAL)));
    } else if (startTime != null) {
      return ScheduleConfig.oneTimeDelayedStart(startTime);
    }
    return null;
  }

  /**
   * Create a new workflow based on an existing one
   * @param manager connection to Helix
   * @param origWorkflowName the name of the existing workflow
   * @param newWorkflowName the name of the new workflow
   * @param newStartTime a provided start time that deviates from the desired start time
   * @return the cloned workflow, or null if there was a problem cloning the existing one
   */
  public static Workflow cloneWorkflow(HelixManager manager, String origWorkflowName,
      String newWorkflowName, Date newStartTime) {
    // Read all resources, including the workflow and jobs of interest
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
    Map<String, HelixProperty> resourceConfigMap =
        accessor.getChildValuesMap(keyBuilder.resourceConfigs());
    if (!resourceConfigMap.containsKey(origWorkflowName)) {
      LOG.error("No such workflow named " + origWorkflowName);
      return null;
    }
    if (resourceConfigMap.containsKey(newWorkflowName)) {
      LOG.error("Workflow with name " + newWorkflowName + " already exists!");
      return null;
    }

    // Create a new workflow with a new name
    HelixProperty workflowConfig = resourceConfigMap.get(origWorkflowName);
    Map<String, String> wfSimpleFields = workflowConfig.getRecord().getSimpleFields();
    JobDag jobDag = JobDag.fromJson(wfSimpleFields.get(WorkflowConfig.DAG));
    Map<String, Set<String>> parentsToChildren = jobDag.getParentsToChildren();
    Workflow.Builder builder = new Workflow.Builder(newWorkflowName);

    // Set the workflow expiry
    builder.setExpiry(Long.parseLong(wfSimpleFields.get(WorkflowConfig.EXPIRY)));

    // Set the schedule, if applicable
    ScheduleConfig scheduleConfig;
    if (newStartTime != null) {
      scheduleConfig = ScheduleConfig.oneTimeDelayedStart(newStartTime);
    } else {
      scheduleConfig = parseScheduleFromConfigMap(wfSimpleFields);
    }
    if (scheduleConfig != null) {
      builder.setScheduleConfig(scheduleConfig);
    }

    // Add each job back as long as the original exists
    Set<String> namespacedJobs = jobDag.getAllNodes();
    for (String namespacedJob : namespacedJobs) {
      if (resourceConfigMap.containsKey(namespacedJob)) {
        // Copy over job-level and task-level configs
        String job = getDenamespacedJobName(origWorkflowName, namespacedJob);
        HelixProperty jobConfig = resourceConfigMap.get(namespacedJob);
        Map<String, String> jobSimpleFields = jobConfig.getRecord().getSimpleFields();
        jobSimpleFields.put(JobConfig.WORKFLOW_ID, newWorkflowName); // overwrite workflow name
        for (Map.Entry<String, String> e : jobSimpleFields.entrySet()) {
          builder.addConfig(job, e.getKey(), e.getValue());
        }
        Map<String, Map<String, String>> rawTaskConfigMap = jobConfig.getRecord().getMapFields();
        List<TaskConfig> taskConfigs = Lists.newLinkedList();
        for (Map<String, String> rawTaskConfig : rawTaskConfigMap.values()) {
          TaskConfig taskConfig = TaskConfig.from(rawTaskConfig);
          taskConfigs.add(taskConfig);
        }
        builder.addTaskConfigs(job, taskConfigs);

        // Add dag dependencies
        Set<String> children = parentsToChildren.get(namespacedJob);
        if (children != null) {
          for (String namespacedChild : children) {
            String child = getDenamespacedJobName(origWorkflowName, namespacedChild);
            builder.addParentChildDependency(job, child);
          }
        }
      }
    }
    return builder.build();
  }

  private static Map<String, String> getResourceConfigMap(HelixManager manager, String resource) {
    HelixConfigScope scope = getResourceConfigScope(manager.getClusterName(), resource);
    ConfigAccessor configAccessor = manager.getConfigAccessor();

    Map<String, String> taskCfg = new HashMap<String, String>();
    List<String> cfgKeys = configAccessor.getKeys(scope);
    if (cfgKeys == null || cfgKeys.isEmpty()) {
      return null;
    }

    for (String cfgKey : cfgKeys) {
      taskCfg.put(cfgKey, configAccessor.get(scope, cfgKey));
    }

    return getResourceConfig(manager, resource).getRecord().getSimpleFields();
  }

  private static HelixProperty getResourceConfig(HelixManager manager, String resource) {
    HelixDataAccessor accessor = manager.getHelixDataAccessor();
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
    return accessor.getProperty(keyBuilder.resourceConfig(resource));
  }
}
TOP

Related Classes of org.apache.helix.task.TaskUtil

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.