Package org.apache.hadoop.mapred

Source Code of org.apache.hadoop.mapred.TestParallelInitialization

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import junit.framework.TestCase;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapred.FakeObjectUtilities.FakeJobHistory;
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapreduce.Cluster.JobTrackerStatus;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;

public class TestParallelInitialization extends TestCase {
 
  private static int jobCounter;
  private static final int NUM_JOBS = 3;
  IntWritable numJobsCompleted = new IntWritable();
 
  static void resetCounters() {
    jobCounter = 0;
  }
 
  class FakeJobInProgress extends JobInProgress {
  
    public FakeJobInProgress(JobConf jobConf,
        FakeTaskTrackerManager taskTrackerManager,
        JobTracker jt) throws IOException {
      super(new JobID("test", ++jobCounter), jobConf, jt);
      this.startTime = System.currentTimeMillis();
      this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP,
          jobConf.getUser(),
          jobConf.getJobName(), "", "");
      this.status.setJobPriority(JobPriority.NORMAL);
      this.status.setStartTime(startTime);
      this.jobHistory = new FakeJobHistory();
    }

    @Override
    public synchronized void initTasks() throws IOException {
      try {
        int jobNumber = this.getJobID().getId();
        synchronized (numJobsCompleted) {
          while (numJobsCompleted.get() != (NUM_JOBS - jobNumber)) {
            numJobsCompleted.wait();
          }
          numJobsCompleted.set(numJobsCompleted.get() + 1);
          numJobsCompleted.notifyAll();
          LOG.info("JobNumber " + jobNumber + " succeeded");
        }
      } catch (InterruptedException ie) {};
      this.status.setRunState(JobStatus.SUCCEEDED);
    }

    @Override
    synchronized void fail() {
      this.status.setRunState(JobStatus.FAILED);
    }
  }
 
  static class FakeTaskTrackerManager implements TaskTrackerManager {
   
    int maps = 0;
    int reduces = 0;
    int maxMapTasksPerTracker = 2;
    int maxReduceTasksPerTracker = 2;
    List<JobInProgressListener> listeners =
      new ArrayList<JobInProgressListener>();
    QueueManager queueManager;
   
    private Map<String, TaskTrackerStatus> trackers =
      new HashMap<String, TaskTrackerStatus>();

    public FakeTaskTrackerManager() {
      JobConf conf = new JobConf();
      queueManager = new QueueManager(conf);
      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
                   new ArrayList<TaskStatus>(), 0,
                   maxMapTasksPerTracker, maxReduceTasksPerTracker));
    }
   
    public ClusterStatus getClusterStatus() {
      int numTrackers = trackers.size();
      return new ClusterStatus(numTrackers, 0,
                               10 * 60 * 1000,
                               maps, reduces,
                               numTrackers * maxMapTasksPerTracker,
                               numTrackers * maxReduceTasksPerTracker,
                               JobTrackerStatus.RUNNING);
    }
   
    public int getNumberOfUniqueHosts() {
      return 0;
    }

    public Collection<TaskTrackerStatus> taskTrackers() {
      return trackers.values();
    }

    public void addJobInProgressListener(JobInProgressListener listener) {
      listeners.add(listener);
    }

    public void removeJobInProgressListener(JobInProgressListener listener) {
      listeners.remove(listener);
    }
   
   
    public QueueManager getQueueManager() {
      return queueManager;
    }
   
    public int getNextHeartbeatInterval() {
      return JTConfig.JT_HEARTBEAT_INTERVAL_MIN_DEFAULT;
    }

    public void killJob(JobID jobid) {
      return;
    }

    public JobInProgress getJob(JobID jobid) {
      return null;
    }

    public boolean killTask(TaskAttemptID attemptId, boolean shouldFail) {
      return true;
    }

    public void initJob(JobInProgress job) {
      try {
        JobStatus prevStatus = (JobStatus)job.getStatus().clone();
        job.initTasks();
        if (job.isJobEmpty()) {
          completeEmptyJob(job);
        } else if (!job.isSetupCleanupRequired()) {
          job.completeSetup();
        }
        JobStatus newStatus = (JobStatus)job.getStatus().clone();
        if (prevStatus.getRunState() != newStatus.getRunState()) {
          JobStatusChangeEvent event =
            new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
                newStatus);
          for (JobInProgressListener listener : listeners) {
            listener.jobUpdated(event);
          }
        }
      } catch (KillInterruptedException kie) {
        killJob(job.getJobID());
      } catch (IOException ioe) {
        failJob(job);
      }
    }

    private synchronized void completeEmptyJob(JobInProgress job) {
      job.completeEmptyJob();
    }

    public synchronized void failJob(JobInProgress job) {
      JobStatus prevStatus = (JobStatus)job.getStatus().clone();
      job.fail();
      JobStatus newStatus = (JobStatus)job.getStatus().clone();
      if (prevStatus.getRunState() != newStatus.getRunState()) {
        JobStatusChangeEvent event =
          new JobStatusChangeEvent(job, EventType.RUN_STATE_CHANGED, prevStatus,
              newStatus);
        for (JobInProgressListener listener : listeners) {
          listener.jobUpdated(event);
        }
      }
    }

    // Test methods
   
    public void submitJob(JobInProgress job) throws IOException {
      for (JobInProgressListener listener : listeners) {
        listener.jobAdded(job);
      }
    }
  }
 
  protected JobConf jobConf;
  protected TaskScheduler scheduler;
  private FakeTaskTrackerManager taskTrackerManager;

  @Override
  protected void setUp() throws Exception {
    resetCounters();
    jobConf = new JobConf();
    taskTrackerManager = new FakeTaskTrackerManager();
    scheduler = createTaskScheduler();
    scheduler.setConf(jobConf);
    scheduler.setTaskTrackerManager(taskTrackerManager);
    scheduler.start();
  }
 
  @Override
  protected void tearDown() throws Exception {
    if (scheduler != null) {
      scheduler.terminate();
    }
  }
 
  protected TaskScheduler createTaskScheduler() {
    return new JobQueueTaskScheduler();
  }
 
  public void testParallelInitJobs() throws IOException {
    FakeJobInProgress[] jobs = new FakeJobInProgress[NUM_JOBS];
   
    // Submit NUM_JOBS jobs in order. The init code will ensure
    // that the jobs get inited in descending order of Job ids
    // i.e. highest job id first and the smallest last.
    // If we were not doing parallel init, the first submitted job
    // will be inited first and that will hang
   
    for (int i = 0; i < NUM_JOBS; i++) {
      jobs[i] = new FakeJobInProgress(jobConf, taskTrackerManager,
      UtilsForTests.getJobTracker());
      jobs[i].getStatus().setRunState(JobStatus.PREP);
      taskTrackerManager.submitJob(jobs[i]);
    }
   
    try {
      Thread.sleep(1000);
    } catch (InterruptedException ie) {}
   
    for (int i = 0; i < NUM_JOBS; i++) {
      assertTrue(jobs[i].getStatus().getRunState() == JobStatus.SUCCEEDED);
    }
  } 
}
TOP

Related Classes of org.apache.hadoop.mapred.TestParallelInitialization

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.