// Package org.apache.hama.bsp
//
// Source Code of org.apache.hama.bsp.JobInProgress$KillInterruptedException

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;

import java.io.DataInputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hama.Constants;
import org.apache.hama.bsp.ft.AsyncRcvdMsgCheckpointImpl;
import org.apache.hama.bsp.ft.BSPFaultTolerantService;
import org.apache.hama.bsp.ft.FaultTolerantMasterService;
import org.apache.hama.bsp.sync.MasterSyncClient;
import org.apache.hama.bsp.taskallocation.BSPResource;
import org.apache.hama.bsp.taskallocation.BestEffortDataLocalTaskAllocator;
import org.apache.hama.bsp.taskallocation.TaskAllocationStrategy;
import org.apache.hama.util.ReflectionUtils;

/**
* JobInProgress maintains all the info for keeping a Job on the straight and
* narrow. It keeps its JobProfile and its latest JobStatus, plus a set of
* tables for doing bookkeeping of its Tasks.
*/
public class JobInProgress {

  /**
   * Used when the a kill is issued to a job which is initializing.
   */
  static class KillInterruptedException extends InterruptedException {
    private static final long serialVersionUID = 1L;

    public KillInterruptedException(String msg) {
      super(msg);
    }
  }

  public static enum JobCounter {
    LAUNCHED_TASKS, SUPERSTEPS
  }

  // Logger shared by all instances of this class.
  static final Log LOG = LogFactory.getLog(JobInProgress.class);
  // Set to true by initTasks() once the task table has been built.
  boolean tasksInited = false;

  // Configuration handed to the constructor.
  Configuration conf;
  // User, job id, job file and job name of the submitted job.
  JobProfile profile;
  // Latest job status; replaced with a fresh JobStatus when the job starts
  // running, succeeds or is killed.
  JobStatus status;
  // Job file path exactly as passed to the constructor.
  Path jobFile = null;
  // Local copies of the job XML and job jar. garbageCollect() deletes them
  // and resets the references to null.
  Path localJobFile = null;
  Path localJarFile = null;

  // Local file system used by garbageCollect() to delete the local copies.
  private LocalFileSystem localFs;
  // Indicates how many times the job got restarted
  // NOTE(review): within this class it is only ever set to 0.
  private int restartCount;

  // System.currentTimeMillis() captured when this object was constructed.
  long startTime;
  // NOTE(review): never assigned inside this class; presumably set by other
  // code in the package - confirm.
  long launchTime;
  // System.currentTimeMillis() captured when the job succeeded or was killed.
  long finishTime;

  // Read from Constants.MAX_TASK_ATTEMPTS in the job configuration.
  int maxTaskAttempts;

  private String jobName;

  private BSPJobID jobId;
  // Owning master; null when the unit-test constructor is used.
  final BSPMaster master;
  // One entry per BSP task; empty until initTasks() runs.
  TaskInProgress tasks[] = new TaskInProgress[0];
  // Highest superstep count reported so far by any task status update.
  private long superstepCounter;

  private final Counters counters = new Counters();

  // Task count from the job configuration; initTasks() overwrites it with
  // the number of splits when a split file is configured.
  int numBSPTasks = 0;
  // Number of groom statuses seen by the most recent obtainNewTask() call.
  int clusterSize;
  // Value of "bsp.job.split.file" from the job configuration; may be null.
  String jobSplit;

  // Groom each constructed task was assigned to; created in initTasks().
  Map<Task, GroomServerStatus> taskToGroomMap;

  // Used only for scheduling!
  // Number of tasks of this job assigned to each groom.
  Map<GroomServerStatus, Integer> taskCountInGroomMap;

  // If the task does not exist as key, it implies that the task did not fail
  // before.
  // Value in the map implies the attempt ID for which the key(task) was
  // re-attempted before.
  // NOTE(review): never allocated or accessed inside this class.
  Map<Task, Integer> taskReattemptMap;

  // Tasks queued for recovery by handleFailure(); drained by
  // fetchAndClearTasksToRecover(). Created in initTasks().
  Set<TaskInProgress> recoveryTasks;

  // This set keeps track of the tasks that have failed.
  Set<Task> failedTasksTillNow;

  // Id handed to the next TaskCompletionEvent appended by updateTaskStatus().
  private int taskCompletionEventTracker = 0;

  // Instantiated reflectively in initTasks() from
  // Constants.TASK_ALLOCATOR_CLASS.
  private TaskAllocationStrategy taskAllocationStrategy;

  // Non-null only when Constants.FAULT_TOLERANCE_FLAG is enabled.
  private FaultTolerantMasterService faultToleranceService;
 
  /**
   * Used only for unit tests.
   * @param jobId
   * @param conf
   */
  public JobInProgress(BSPJobID jobId, Configuration conf){
    this.conf = conf;
    this.jobId = jobId;
    master = null;
  }

  public JobInProgress(BSPJobID jobId, Path jobFile, BSPMaster master,
      Configuration conf) throws IOException {
    this.conf = conf;
    this.jobId = jobId;
    this.localFs = FileSystem.getLocal(conf);
    this.jobFile = jobFile;
    this.master = master;

    this.status = new JobStatus(jobId, null, 0L, 0L,
        JobStatus.State.PREP.value(), counters);
    this.startTime = System.currentTimeMillis();
    this.superstepCounter = 0;
    this.restartCount = 0;

    this.localJobFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
        + ".xml");
    this.localJarFile = master.getLocalPath(BSPMaster.SUBDIR + "/" + jobId
        + ".jar");

    Path jobDir = master.getSystemDirectoryForJob(jobId);
    FileSystem fs = jobDir.getFileSystem(conf);
    fs.copyToLocalFile(jobFile, localJobFile);
    BSPJob job = new BSPJob(jobId, localJobFile.toString());
    this.jobSplit = job.getConfiguration().get("bsp.job.split.file");

    this.numBSPTasks = job.getNumBspTask();
    this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
        numBSPTasks + 10);

    this.maxTaskAttempts = job.getConfiguration().getInt(Constants.MAX_TASK_ATTEMPTS,
        Constants.DEFAULT_MAX_TASK_ATTEMPTS);

    this.profile = new JobProfile(job.getUser(), jobId, jobFile.toString(),
        job.getJobName());

    this.setJobName(job.getJobName());

    status.setUsername(job.getUser());
    status.setStartTime(startTime);

    String jarFile = job.getJar();
    if (jarFile != null) {
      fs.copyToLocalFile(new Path(jarFile), localJarFile);
    }

    failedTasksTillNow = new HashSet<Task>(2 * tasks.length);

  }

  public JobProfile getProfile() {
    return profile;
  }

  public JobStatus getStatus() {
    return status;
  }

  public synchronized long getLaunchTime() {
    return launchTime;
  }

  public long getStartTime() {
    return startTime;
  }

  public long getFinishTime() {
    return finishTime;
  }

  public int getNumOfTasks() {
    return tasks.length;
  }

  /**
   * @return the number of desired tasks.
   */
  public int desiredBSPTasks() {
    return numBSPTasks;
  }

  /**
   * @return The JobID of this JobInProgress.
   */
  public BSPJobID getJobID() {
    return jobId;
  }

  public synchronized TaskInProgress findTaskInProgress(TaskID id) {
    if (areTasksInited()) {
      for (TaskInProgress tip : tasks) {
        if (tip.getTaskId().equals(id)) {
          return tip;
        }
      }
    }
    return null;
  }

  public synchronized boolean areTasksInited() {
    return this.tasksInited;
  }

  @Override
  public String toString() {
    return "jobName:" + profile.getJobName() + "\n" + "submit user:"
        + profile.getUser() + "\n" + "JobId:" + jobId + "\n" + "JobFile:"
        + jobFile + "\n";
  }

  // ///////////////////////////////////////////////////
  // Create/manage tasks
  // ///////////////////////////////////////////////////

  /**
   * Builds the task table and moves the job into the RUNNING state. Calling
   * it again after a successful run is a no-op.
   * <p>
   * When a split file is configured, one task is created per split and
   * {@code numBSPTasks} is overwritten with the split count; otherwise
   * {@code numBSPTasks} split-less tasks are created. The method then
   * allocates the scheduling/recovery bookkeeping maps, registers the job
   * with the master's sync client, instantiates the task allocation strategy
   * and, if fault tolerance is enabled, the fault tolerance service.
   *
   * @throws IOException if the split file cannot be read, or if creating the
   *           fault tolerance service fails (the cause is wrapped)
   */
  public synchronized void initTasks() throws IOException {
    if (tasksInited) {
      return;
    }

    // The split file is opened on the file system of the master's system dir.
    Path sysDir = new Path(this.master.getSystemDir());
    FileSystem fs = sysDir.getFileSystem(conf);
    if (jobSplit != null) {
      DataInputStream splitFile = fs.open(new Path(this.jobSplit));

      BSPJobClient.RawSplit[] splits;
      try {
        splits = BSPJobClient.readSplitFile(splitFile);
      } finally {
        // Always release the stream, even if parsing the splits fails.
        splitFile.close();
      }
      numBSPTasks = splits.length;
      LOG.info("num BSPTasks: " + numBSPTasks);

      // adjust number of BSP tasks to actual number of splits
      this.tasks = new TaskInProgress[numBSPTasks];
      for (int i = 0; i < numBSPTasks; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            splits[i], this.conf, this, i);
      }
    } else {
      // No split file: create the configured number of tasks without splits.
      this.tasks = new TaskInProgress[numBSPTasks];
      for (int i = 0; i < numBSPTasks; i++) {
        tasks[i] = new TaskInProgress(getJobID(), this.jobFile.toString(),
            null, this.conf, this, i);
      }
    }
    this.taskToGroomMap = new HashMap<Task, GroomServerStatus>(2 * tasks.length);

    this.taskCountInGroomMap = new HashMap<GroomServerStatus, Integer>();

    this.recoveryTasks = new HashSet<TaskInProgress>(2 * tasks.length);

    // Update job status
    this.status = new JobStatus(this.status.getJobID(), this.profile.getUser(),
        0L, 0L, JobStatus.RUNNING, counters);

    // delete all nodes belonging to that job before start
    // NOTE(review): the clean-up described above is presumably performed
    // inside registerJob - confirm against MasterSyncClient.
    MasterSyncClient syncClient = master.getSyncClient();
    syncClient.registerJob(this.getJobID().toString());

    // NOTE(review): the flag is set before the allocator and the fault
    // tolerance service are created, so a failure below leaves the job marked
    // as initialized and a retry of initTasks() returns immediately.
    tasksInited = true;

    // The allocator implementation is pluggable; best-effort data-local
    // allocation is the default.
    Class<?> taskAllocatorClass = conf.getClass(Constants.TASK_ALLOCATOR_CLASS,
        BestEffortDataLocalTaskAllocator.class, TaskAllocationStrategy.class);
    this.taskAllocationStrategy = (TaskAllocationStrategy) ReflectionUtils
        .newInstance(taskAllocatorClass, new Object[0]);

    // Fault tolerance is opt-in; without it faultToleranceService stays null.
    if (conf.getBoolean(Constants.FAULT_TOLERANCE_FLAG, false)) {

      Class<?> ftClass = conf.getClass(Constants.FAULT_TOLERANCE_CLASS,
          AsyncRcvdMsgCheckpointImpl.class ,
          BSPFaultTolerantService.class);
      if (ftClass != null) {
        try {
          faultToleranceService = ((BSPFaultTolerantService<?>) ReflectionUtils
              .newInstance(ftClass, new Object[0]))
              .constructMasterFaultTolerance(jobId, maxTaskAttempts, tasks,
                  conf, master.getSyncClient(), taskAllocationStrategy);
          LOG.info("Initialized fault tolerance service with "
              + ftClass.getCanonicalName());
        } catch (Exception e) {
          // Surface any construction failure as an IOException, keeping the
          // original exception as the cause.
          throw new IOException(e);
        }
      }
    }

    LOG.info("Job is initialized.");
  }

  public Iterator<GroomServerStatus> getGroomsForTask() {
    return null;
  }

  public GroomServerStatus getGroomStatusForTask(Task t) {
    return this.taskToGroomMap.get(t);
  }

  public synchronized Task obtainNewTask(
      Map<String, GroomServerStatus> groomStatuses) {
    this.clusterSize = groomStatuses.size();

    if (this.status.getRunState() != JobStatus.RUNNING) {
      LOG.info("Cannot create task split for " + profile.getJobID());
      return null;
    }

    Task result = null;
    BSPResource[] resources = new BSPResource[0];

    for (int i = 0; i < tasks.length; i++) {
      if (!tasks[i].isRunning() && !tasks[i].isComplete()) {

        String[] selectedGrooms = taskAllocationStrategy.selectGrooms(
            groomStatuses, taskCountInGroomMap, resources, tasks[i]);
        GroomServerStatus groomStatus = taskAllocationStrategy
            .getGroomToAllocate(groomStatuses, selectedGrooms,
                taskCountInGroomMap, resources, tasks[i]);
        if (groomStatus != null){
          result = tasks[i].constructTask(groomStatus);
        }
        else if (LOG.isDebugEnabled()){
          LOG.debug("Could not find a groom to schedule task");
        }
        if (result != null) {
          updateGroomTaskDetails(tasks[i].getGroomServerStatus(), result);
        }
        break;
      }
    }

    counters.incrCounter(JobCounter.LAUNCHED_TASKS, 1L);
    return result;
  }

  public void recoverTasks(Map<String, GroomServerStatus> groomStatuses,
      Map<GroomServerStatus, List<GroomServerAction>> actionMap)
      throws IOException {

    if (this.faultToleranceService == null)
      return;

    try {
      this.faultToleranceService.recoverTasks(this, groomStatuses,
          fetchAndClearTasksToRecover(), tasks, taskCountInGroomMap, actionMap);
    } catch (IOException e) {
      throw e;
    }
  }

  private void updateGroomTaskDetails(GroomServerStatus groomStatus, Task task) {
    taskToGroomMap.put(task, groomStatus);
    int tasksInGroom = 0;

    if (taskCountInGroomMap.containsKey(groomStatus)) {
      tasksInGroom = taskCountInGroomMap.get(groomStatus);
    }
    taskCountInGroomMap.put(groomStatus, tasksInGroom + 1);
  }

  /**
   * Hosts that tasks run on.
   *
   * @return groom host name that tasks of a job run on.
   */
  public synchronized String[] tasksOnGroomServers() {
    final String[] list = new String[tasks.length];
    for (int i = 0; i < tasks.length; i++) {
      list[i] = tasks[i].getGroomServerStatus().getGroomHostName();
    }
    return list;
  }

  /**
   * Mark the completed task status. If all the tasks are completed the status
   * of the job is updated to notify the client on the completion of the whole
   * job.
   *
   * @param tip <code>TaskInProgress</code> object representing task.
   * @param status The completed task status
   */
  public synchronized void completedTask(TaskInProgress tip, TaskStatus status) {
    TaskAttemptID taskid = status.getTaskId();
    updateTaskStatus(tip, status);
    LOG.debug("Taskid '" + taskid + "' has finished successfully.");
    tip.completed(taskid);

    //
    // If all tasks are complete, then the job is done!
    //

    boolean allDone = true;
    for (TaskInProgress taskInProgress : tasks) {
      if (!taskInProgress.isComplete()) {
        allDone = false;
        break;
      }
    }

    if (allDone) {
      this.status = new JobStatus(this.status.getJobID(),
          this.profile.getUser(), superstepCounter, superstepCounter,
          superstepCounter, JobStatus.SUCCEEDED, superstepCounter, counters);
      this.finishTime = System.currentTimeMillis();
      this.status.setFinishTime(this.finishTime);

      LOG.info("Job successfully done.");

      // delete job root
      master.getSyncClient().deregisterJob(this.getJobID().toString());

      garbageCollect();
    }
  }

  /**
   * Mark failure of a task.
   *
   * @param tip <code>TaskInProgress</code> object representing task.
   * @param status The failed task status
   */
  public void failedTask(TaskInProgress tip, TaskStatus status) {
    TaskAttemptID taskid = status.getTaskId();
    updateTaskStatus(tip, status);
    LOG.info("Taskid '" + taskid + "' has failed.");
    tip.terminated(taskid);
    tip.kill();

    boolean allDone = true;
    for (TaskInProgress taskInProgress : tasks) {
      if (taskInProgress.isFailed()) {
        allDone = false;
        break;
      }
    }

    if (!allDone) {
      // Kill job
      this.kill();
      // Send KillTaskAction to GroomServer
      this.status = new JobStatus(this.status.getJobID(),
          this.profile.getUser(), 0L, 0L, 0L, JobStatus.KILLED,
          superstepCounter, counters);
      this.finishTime = System.currentTimeMillis();
      this.status.setFinishTime(this.finishTime);

      LOG.info("Job failed.");

      garbageCollect();
    }
  }

  /**
   * Updates the task status of the task.
   *
   * @param tip <code>TaskInProgress</code> representing task
   * @param taskStatus The status of the task.
   */
  public synchronized void updateTaskStatus(TaskInProgress tip,
      TaskStatus taskStatus) {
    TaskAttemptID taskid = taskStatus.getTaskId();
    boolean change = tip.updateStatus(taskStatus); // update tip

    if (change) {
      TaskStatus.State state = taskStatus.getRunState();
      TaskCompletionEvent taskEvent = null;
      String httpTaskLogLocation = "http://"
          + tip.getGroomServerStatus().getGroomHostName()
          + ":"
          + conf.getInt("bsp.http.groomserver.port",
              Constants.DEFAULT_GROOM_INFO_SERVER);

      if (state == TaskStatus.State.FAILED || state == TaskStatus.State.KILLED) {
        int eventNumber;
        if ((eventNumber = tip.getSuccessEventNumber()) != -1) {
          TaskCompletionEvent t = this.taskCompletionEvents.get(eventNumber);
          if (t.getTaskAttemptId().equals(taskid))
            t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
        }

        // Did the task failure lead to tip failure?
        TaskCompletionEvent.Status taskCompletionStatus = (state == TaskStatus.State.FAILED) ? TaskCompletionEvent.Status.FAILED
            : TaskCompletionEvent.Status.KILLED;
        if (tip.isFailed()) {
          taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
        }
        taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, taskid,
            tip.idWithinJob(), taskCompletionStatus, httpTaskLogLocation);

        if (taskEvent != null) {
          this.taskCompletionEvents.add(taskEvent);
          taskCompletionEventTracker++;
        }
      }
    }

    if (superstepCounter < taskStatus.getSuperstepCount()) {
      superstepCounter = taskStatus.getSuperstepCount();
    }
  }

  /**
   * Kill the job.
   */
  public synchronized void kill() {
    if (status.getRunState() != JobStatus.KILLED) {
      this.status = new JobStatus(status.getJobID(), this.profile.getUser(),
          0L, 0L, 0L, JobStatus.KILLED, counters);
      this.finishTime = System.currentTimeMillis();
      this.status.setFinishTime(this.finishTime);
      //
      // kill all TIPs.
      //
      for (int i = 0; i < tasks.length; i++) {
        tasks[i].kill();
      }

      garbageCollect();
    }

  }

  /**
   * The job is dead. We're now GC'ing it, getting rid of the job from all
   * tables. Be sure to remove all of this job's tasks from the various tables.
   */
  synchronized void garbageCollect() {
    try {
     
      if(LOG.isDebugEnabled()){
        LOG.debug("Removing " + localJobFile + " and " + localJarFile
            + " getJobFile = " + profile.getJobFile());
      }
     
      // Definitely remove the local-disk copy of the job file
      if (localJobFile != null) {
        localFs.delete(localJobFile, true);
        localJobFile = null;
      }
      if (localJarFile != null) {
        localFs.delete(localJarFile, true);
        localJarFile = null;
      }

      // JobClient always creates a new directory with job files
      // so we remove that directory to cleanup
      FileSystem fs = FileSystem.get(conf);
      fs.delete(new Path(profile.getJobFile()).getParent(), true);

    } catch (IOException e) {
      LOG.info("Error cleaning up " + profile.getJobID() + ": " + e);
    }
  }

  /**
   * Get the number of times the job has restarted
   */
  int getNumRestarts() {
    return restartCount;
  }

  /**
   * @param jobName the jobName to set
   */
  public void setJobName(String jobName) {
    this.jobName = jobName;
  }

  /**
   * @return the jobName
   */
  public String getJobName() {
    return jobName;
  }

  public Counters getCounters() {
    return counters;
  }

  // Completion events in the order they were recorded; appended to by
  // updateTaskStatus(). Allocated only by the master-side constructor.
  List<TaskCompletionEvent> taskCompletionEvents;

  synchronized int getNumTaskCompletionEvents() {
    return taskCompletionEvents.size();
  }

  public TaskCompletionEvent[] getTaskCompletionEvents(int fromEventId,
      int maxEvents) {
    TaskCompletionEvent[] events = TaskCompletionEvent.EMPTY_ARRAY;
    if (taskCompletionEvents.size() > fromEventId) {
      int actualMax = Math.min(maxEvents,
          (taskCompletionEvents.size() - fromEventId));
      events = taskCompletionEvents.subList(fromEventId,
          actualMax + fromEventId).toArray(events);
    }
    return events;
  }

  /**
   * Returns the configured maximum number of times the task could be
   * re-attempted.
   */
  int getMaximumReAttempts() {
    return maxTaskAttempts;
  }

  /**
   * Returns true if the task should be restarted on failure. It also causes
   * JobInProgress object to maintain state of the restart request.
   */
  synchronized boolean handleFailure(TaskInProgress tip) {
    if (this.faultToleranceService == null
        || (!faultToleranceService.isRecoveryPossible(tip)))
      return false;

    if (!faultToleranceService.isAlreadyRecovered(tip)) {
      if(LOG.isDebugEnabled()){
        LOG.debug("Adding recovery task " + tip.getCurrentTaskAttemptId());
      }
      recoveryTasks.add(tip);
      status.setRunState(JobStatus.RECOVERING);
      return true;
    }
    else if(LOG.isDebugEnabled()){
      LOG.debug("Avoiding recovery task " + tip.getCurrentTaskAttemptId());
    }
    return false;
   
  }
 
 
  /**
   *
   * @return Returns the list of tasks in progress that has to be recovered.
   */
  synchronized TaskInProgress[] fetchAndClearTasksToRecover() {
    TaskInProgress[] failedTasksInProgress = new TaskInProgress[recoveryTasks
        .size()];
    recoveryTasks.toArray(failedTasksInProgress);

    recoveryTasks.clear();
    return failedTasksInProgress;
  }

  public boolean isRecoveryPending() {
    return recoveryTasks.size() != 0;
  }

  public Set<Task> getTaskSet() {
    return taskToGroomMap.keySet();
  }

  public FaultTolerantMasterService getFaultToleranceService() {
    return this.faultToleranceService;
  }

}
// TOP
//
// Related Classes of org.apache.hama.bsp.JobInProgress$KillInterruptedException
//
// TOP
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.