Package org.apache.hadoop.mapred

Source Code of org.apache.hadoop.mapred.TestSetupTaskScheduling$FakeJobWithSetupTask

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobInProgress;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeTaskInProgress;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobTracker;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.split.JobSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.TaskType;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import junit.framework.TestCase;

public class TestSetupTaskScheduling extends TestCase {

  public static final Log LOG =
    LogFactory.getLog(TestSetupTaskScheduling.class);

  static String[] trackers = new String[] { "tracker_tracker1:1000",
    "tracker_tracker2:1000", "tracker_tracker3:1000" };
  private static FakeJobTracker jobTracker;

  /**
   * Fake JobInProgress that can return a hardcoded setup or
   * cleanup task depending on the slot type passed in.
   */
  static class FakeJobWithSetupTask
    extends FakeObjectUtilities.FakeJobInProgress {
   
    FakeJobWithSetupTask(JobConf jobConf,
                          JobTracker tracker) throws IOException {
      super(jobConf, tracker);
    }

    /**
     * Initialize tasks, including setup.
     */
    @Override
    public synchronized void initTasks() throws IOException {
      super.initTasks();
      JobSplit.TaskSplitMetaInfo emptySplit = new JobSplit.TaskSplitMetaInfo();
      setup = new TaskInProgress[2];
      setup[0] = new TaskInProgress(getJobID(), "test",  emptySplit,
          jobtracker, getJobConf(), this, numMapTasks + 1, 1);
      setup[1] = new TaskInProgress(getJobID(), "test", numMapTasks,
          numReduceTasks + 1, jobtracker, getJobConf(), this, 1);
    }
   
    /**
     * Obtain a setup task on a map slot or reduce slot
     * depending on what is free.
     *
     * Every call to this will return either a map or reduce
     * setup task. No check is done to see if the task is already
     * returned
     */
    @Override
    public Task obtainJobSetupTask(TaskTrackerStatus tts,
                                             int clusterSize,
                                             int numUniqueHosts,
                                             boolean isMapSlot)
                                  throws IOException{
      TaskInProgress tip = null;
      if (isMapSlot) {
        tip = setup[0];
      } else {
        tip = setup[1];
      }
      Task t = tip.getTaskToRun(tts.getHost());
      t.setJobSetupTask();
      return t;
    }
  }

  static class FakeJobWithTaskCleanupTask
  extends FakeObjectUtilities.FakeJobInProgress {

    FakeJobWithTaskCleanupTask(JobConf jobConf,
        JobTracker tracker) throws IOException {
      super(jobConf, tracker);
    }

    /**
     * Initialize tasks(1 map and 1 reduce task each needs 2 slots, similar to
     * tasks of a high RAM job).
     */
    @Override
    public synchronized void initTasks() throws IOException {
      super.initTasks();

      final int numSlotsPerTask = 2;
      maps = new TaskInProgress[1];
      reduces = new TaskInProgress[1];
     
      maps[0] = new FakeTaskInProgress(getJobID(), "test"
          JobSplit.EMPTY_TASK_SPLIT,
          jobtracker, getJobConf(), this, 0, numSlotsPerTask);
      TaskAttemptID attemptId = new TaskAttemptID(maps[0].getTIPId(), 0);
     
      // make this task a taskCleanup task of a map task
      mapCleanupTasks.add(attemptId);
      TaskStatus stat = new MapTaskStatus(attemptId, 0.01f, 2,
          TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0],
          TaskStatus.Phase.MAP, new Counters());
      maps[0].updateStatus(stat);
     
      //similarly for reduce task's taskCleanup task
      reduces[0] = new FakeTaskInProgress(getJobID(), "test", 1,
          0, jobtracker, getJobConf(), this, numSlotsPerTask);
      attemptId = new TaskAttemptID(reduces[0].getTIPId(), 0);
     
      // make this task a taskCleanup task of a reduce task
      reduceCleanupTasks.add(attemptId);
      stat = new ReduceTaskStatus(attemptId, 0.01f, 2,
          TaskStatus.State.FAILED_UNCLEAN, "", "", trackers[0],
          TaskStatus.Phase.REDUCE, new Counters());
      reduces[0].updateStatus(stat);
    }
  }

  public void setUp() throws Exception {
    JobConf conf = new JobConf();
    conf.set(JTConfig.JT_IPC_ADDRESS, "localhost:0");
    conf.set(JTConfig.JT_HTTP_ADDRESS, "0.0.0.0:0");
    jobTracker = new FakeJobTracker(conf, new Clock(), trackers);
    for (String tracker : trackers) {
      FakeObjectUtilities.establishFirstContact(jobTracker, tracker);
    }
  }

  // create a job for testing setup tasks and reservations
  FakeJobInProgress createJob(TaskType taskType) throws IOException {
    JobConf conf = new JobConf();
    conf.setSpeculativeExecution(false);
    conf.setNumMapTasks(2);
    conf.setNumReduceTasks(2);
    conf.set(JobContext.REDUCE_FAILURES_MAXPERCENT, ".70");
    conf.set(JobContext.MAP_FAILURES_MAX_PERCENT, ".70");
    FakeJobInProgress job = null;
    if (taskType == null) {
      conf.setBoolean(JobContext.SETUP_CLEANUP_NEEDED, false);
      job = new FakeJobInProgress(conf, jobTracker);
    } else if (taskType == TaskType.JOB_SETUP) {
      job = new FakeJobWithSetupTask(conf, jobTracker);
    } else if (taskType == TaskType.TASK_CLEANUP) {
      job = new FakeJobWithTaskCleanupTask(conf, jobTracker);
    }
    job.setClusterSize(trackers.length);
    job.initTasks();
    return job;
  }
 
  // create a new TaskStatus and add to a list of status objects.
  // useMapSlot param is needed only when taskType is TASK_CLEANUP.
  void addNewTaskStatus(FakeJobInProgress job, TaskType taskType,
        boolean useMapSlot, String tracker, List<TaskStatus> reports)
        throws IOException {
    TaskAttemptID task = null;
    TaskStatus status = null;
    if (taskType == TaskType.MAP) {
      task = job.findMapTask(tracker);
      status = new MapTaskStatus(task, 0.01f, 2,
            TaskStatus.State.RUNNING, "", "", tracker,
            TaskStatus.Phase.MAP, new Counters());
    } else if (taskType == TaskType.TASK_CLEANUP) {
      if (useMapSlot) {
        status = job.maps[0].taskStatuses.get(
          new TaskAttemptID(job.maps[0].getTIPId(), 0));
      } else {
        status = job.reduces[0].taskStatuses.get(
              new TaskAttemptID(job.reduces[0].getTIPId(), 0));
      }
    } else {
      task = job.findReduceTask(tracker);
      status = new ReduceTaskStatus(task, 0.01f, 2,
            TaskStatus.State.RUNNING, "", "", tracker,
            TaskStatus.Phase.REDUCE, new Counters());
    }
    reports.add(status);
  }
 
  // create a TaskTrackerStatus
  TaskTrackerStatus createTaskTrackerStatus(String tracker,
      List<TaskStatus> reports) {
    TaskTrackerStatus ttStatus =
      new TaskTrackerStatus(tracker,
          JobInProgress.convertTrackerNameToHostName(tracker),
          0, reports, 0, 2, 2);
    return ttStatus;
  }

  /**
   * Test that a setup task can be run against a map slot
   * if it is free.
   * @throws IOException
   */
  public void testSetupTaskReturnedForFreeMapSlots() throws IOException {
    // create a job with a setup task.
    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
    jobTracker.jobs.put(job.getJobID(), job);
   
    // create a status simulating a free tasktracker
    List<TaskStatus> reports = new ArrayList<TaskStatus>();
    TaskTrackerStatus ttStatus
      = createTaskTrackerStatus(trackers[2], reports);
   
    // verify that a setup task can be assigned to a map slot.
    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
    assertEquals(1, tasks.size());
    assertTrue(tasks.get(0).isJobSetupTask());
    assertTrue(tasks.get(0).isMapTask());
    jobTracker.jobs.clear();
  }

  /**
   * Test to check that map slots are counted when returning
   * a setup task.
   * @throws IOException
   */
  public void testMapSlotsCountedForSetup() throws IOException {
    // create a job with a setup task.
    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
    jobTracker.jobs.put(job.getJobID(), job);
   
    // create another job for reservation
    FakeJobInProgress job1 = createJob(null);
    jobTracker.jobs.put(job1.getJobID(), job1);
  
    // create TT status for testing getSetupAndCleanupTasks
    List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
    addNewTaskStatus(job, TaskType.MAP, true, trackers[0], taskStatuses);
    TaskTrackerStatus ttStatus
      = createTaskTrackerStatus(trackers[0], taskStatuses);
   
    // test that there should be no map setup task returned.
    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
    assertEquals(1, tasks.size());
    assertTrue(tasks.get(0).isJobSetupTask());
    assertFalse(tasks.get(0).isMapTask());
    jobTracker.jobs.clear();
  }

  /**
   * Test to check that reduce slots are also counted when returning
   * a setup task.
   * @throws IOException
   */
  public void testReduceSlotsCountedForSetup() throws IOException {
    // create a job with a setup task.
    FakeJobInProgress job = createJob(TaskType.JOB_SETUP);
    jobTracker.jobs.put(job.getJobID(), job);
   
    // create another job for reservation
    FakeJobInProgress job1 = createJob(null);
    jobTracker.jobs.put(job1.getJobID(), job1);
   
    // create TT status for testing getSetupAndCleanupTasks
    List<TaskStatus> reports = new ArrayList<TaskStatus>();
    // because free map slots are checked first in code,
    // we fill up map slots also.
    addNewTaskStatus(job1, TaskType.MAP, true, trackers[1], reports);
    addNewTaskStatus(job1, TaskType.REDUCE, false,trackers[1], reports);
    TaskTrackerStatus ttStatus
      = createTaskTrackerStatus(trackers[1], reports);

    // test that there should be no setup task returned,
    // as both map and reduce slots are occupied.
    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);
    assertNull(tasks);
    jobTracker.jobs.clear();
  }

  void validateNumSlotsUsedForTaskCleanup(TaskTrackerStatus ttStatus)
       throws IOException {
    List<Task> tasks = jobTracker.getSetupAndCleanupTasks(ttStatus);

    assertEquals("Actual number of taskCleanup tasks is not same as expected", 1, tasks.size());
    LOG.info("taskCleanup task is " + tasks.get(0));
    assertTrue(tasks.get(0).isTaskCleanupTask());

    // slots needed for taskCleanup task should be 1(even for high RAM jobs)
    assertEquals("TaskCleanup task should not need more than 1 slot.",
                 1, tasks.get(0).getNumSlotsRequired());
  }
 
  /**
   * Test to check that map slots are counted when returning
   * a taskCleanup task.
   * @throws IOException
   */
  public void testNumSlotsUsedForTaskCleanup() throws IOException {
    // Create a high RAM job with a map task's cleanup task and a reduce task's
    // cleanup task. Make this Fake job a high RAM job by setting the slots
    // required for map/reduce task to 2.
    FakeJobInProgress job = createJob(TaskType.TASK_CLEANUP);
    jobTracker.jobs.put(job.getJobID(), job);
  
    // create TT status for testing getSetupAndCleanupTasks
    List<TaskStatus> taskStatuses = new ArrayList<TaskStatus>();
    TaskTrackerStatus ttStatus =
      createTaskTrackerStatus(trackers[0], taskStatuses);//create dummy status
   
    // validate mapTaskCleanup task
    validateNumSlotsUsedForTaskCleanup(ttStatus);
   
    // validate reduceTaskCleanup task
    validateNumSlotsUsedForTaskCleanup(ttStatus);
   
    jobTracker.jobs.clear();
  }
}
TOP

Related Classes of org.apache.hadoop.mapred.TestSetupTaskScheduling$FakeJobWithSetupTask

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.