Package org.apache.hadoop.mapred

Source Code of org.apache.hadoop.mapred.TestRefreshOfQueues

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.mapred;

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.fail;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapred.CapacityTestUtils.ControlledInitializationPoller;
import org.apache.hadoop.mapred.CapacityTestUtils.FakeJobInProgress;
import org.apache.hadoop.mapred.CapacityTestUtils.FakeTaskTrackerManager;
import static org.apache.hadoop.mapred.CapacityTestUtils.*;
import org.junit.After;
import org.junit.Test;

/**
* Test the Queue-Refresh feature.
*/
public class TestRefreshOfQueues {

  private static final Log LOG =
      LogFactory.getLog(org.apache.hadoop.mapred.TestRefreshOfQueues.class);

  String queueConfigPath =
      System.getProperty("test.build.extraconf", "build/test/extraconf");
  File queueConfigFile =
      new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);

  private CapacityTaskScheduler scheduler;
  private FakeTaskTrackerManager taskTrackerManager;

  /**
   * Remove the queueConfigFile once the test is done.
   */
  @After
  public void tearDown() {
    if (queueConfigFile.exists()) {
      queueConfigFile.delete();
    }
  }

  /**
   * Sets up the scheduler, TaskTrackerManager, QueueManager, initializer and
   * starts the scheduler.
   *
   * @throws IOException
   */
  private void setupAndStartSchedulerFramework(int numTTs, int numMapsPerTT,
      int numReducesPerTT)
      throws IOException {
    scheduler = new CapacityTaskScheduler();
    taskTrackerManager =
        new FakeTaskTrackerManager(numTTs, numMapsPerTT, numReducesPerTT);
    taskTrackerManager.setQueueManager(new QueueManager());
    scheduler.setTaskTrackerManager(taskTrackerManager);
    scheduler.setConf(new Configuration());
    ControlledInitializationPoller controlledInitializationPoller =
        new ControlledInitializationPoller(scheduler.jobQueuesManager,
            taskTrackerManager);
    scheduler.setInitializationPoller(controlledInitializationPoller);
    taskTrackerManager.addJobInProgressListener(scheduler.jobQueuesManager);
    scheduler.start();
  }

  /**
   * Helper method that ensures TaskScheduler is locked before calling
   * {@link QueueManager#refreshQueues(Configuration,
   *    org.apache.hadoop.mapred.TaskScheduler.QueueRefresher)}.
   */
  private static void refreshQueues(QueueManager qm, Configuration conf,
      TaskScheduler ts) throws IOException {
    synchronized (ts) {
      qm.refreshQueues(conf, ts.getQueueRefresher());
    }
  }

  /**
   * @throws Throwable
   */
  @Test
  public void testRefreshOfQueuesSanity()
      throws Throwable {

    JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();

    Properties[] props = new Properties[3];
    for (int i = 0; i < props.length; i++) {
      props[i] = queues[i].getProperties();
      props[i].setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
          String.valueOf(i + 10));
      props[i].setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
          String.valueOf(i + 15));
      props[i].setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
          String.valueOf(false));
      props[i].setProperty(
          CapacitySchedulerConf.MAXIMUM_INITIALIZED_JOBS_PER_USER_PROPERTY,
          String.valueOf(i + 11));
      props[i].setProperty(
          CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
          String.valueOf(i + 16));
    }

    // write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });

    setupAndStartSchedulerFramework(0, 0, 0);

    Map<String, AbstractQueue> allQueues = getAllQueues(scheduler);

    // Verify the configuration.
    for (int i = 0; i < queues.length; i++) {
      String qName = queues[i].getQueueName();
      LOG.info("Queue name : " + qName);
      QueueSchedulingContext qsc =
          allQueues.get(qName).getQueueSchedulingContext();
      LOG.info("Context for queue " + qName + " is : " + qsc);
      assertEquals(i + 10, qsc.getCapacityPercent(), 0);
      assertEquals(i + 15, qsc.getMaxCapacityPercent(), 0);
      assertEquals(Boolean.valueOf(false),
          Boolean.valueOf(qsc.supportsPriorities()));
      assertEquals(i + 16, qsc.getUlMin());
    }

    // change configuration
    for (int i = 0; i < props.length; i++) {
      props[i] = queues[i].getProperties();
      props[i].setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
          String.valueOf(i + 20));
      props[i].setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
          String.valueOf(i + 25));
      props[i].setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
          String.valueOf(false));
      props[i].setProperty(
          CapacitySchedulerConf.MAXIMUM_INITIALIZED_JOBS_PER_USER_PROPERTY,
          String.valueOf(i + 5));
      props[i].setProperty(
          CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
          String.valueOf(i + 10));
    }

    // Re-write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });

    // Now do scheduler refresh.
    refreshQueues(taskTrackerManager.getQueueManager(), null, scheduler);

    allQueues = getAllQueues(scheduler);

    for (int i = 0; i < queues.length; i++) {
      String qName = queues[i].getQueueName();
      LOG.info("Queue name : " + qName);
      QueueSchedulingContext qsc =
        allQueues.get(qName).getQueueSchedulingContext();
      assertEquals(qName, qsc.getQueueName());
      LOG.info("Context for queue " + qName + " is : " + qsc);
      assertEquals(i + 20, qsc.getCapacityPercent(), 0);
      assertEquals(i + 25, qsc.getMaxCapacityPercent(), 0);
      assertEquals(Boolean.valueOf(false),
          Boolean.valueOf(qsc.supportsPriorities()));
    }
  }

  /**
   * @throws Throwable
   */
   @Test
  public void testSuccessfulCapacityRefresh()
      throws Throwable {

    JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();

    queues[0].getProperties().setProperty(
      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
    queues[1].getProperties().setProperty(
      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
    queues[2].getProperties().setProperty(
      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));

    // write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});

    setupAndStartSchedulerFramework(2, 2, 2);

    FakeJobInProgress job1 =
      taskTrackerManager.submitJobAndInit(
        JobStatus.PREP, 2, 2,
        queues[1].getQueueName(), "user");
    FakeJobInProgress job2 =
      taskTrackerManager.submitJobAndInit(
        JobStatus.PREP, 2, 2,
        queues[2].getQueueName(), "user");

    Map<String, String> expectedStrings = new HashMap<String, String>();
    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt1",
      expectedStrings);
//===========================================
    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
    expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt1",
      expectedStrings);
//============================================
    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0002_m_000002_0 on tt2");
    expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt2",
      expectedStrings);
//============================================
    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt2",
      expectedStrings);

    taskTrackerManager.killJob(job1.getJobID());
    taskTrackerManager.killJob(job2.getJobID());

    // change configuration
    queues[1].getProperties().setProperty(
      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(25));
    queues[2].getProperties().setProperty(
      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(75));

    // Re-write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});

    refreshQueues(taskTrackerManager.getQueueManager(), null, scheduler);

    job1 =
      taskTrackerManager.submitJobAndInit(
        JobStatus.PREP, 2, 2,
        queues[1].getQueueName(), "user");
    job2 =
      taskTrackerManager.submitJobAndInit(
        JobStatus.PREP, 4, 4,
        queues[2].getQueueName(), "user");

    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
    expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt1",
      expectedStrings);


    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
    expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt1",
      expectedStrings);


    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0004_m_000002_0 on tt2");
    expectedStrings.put(REDUCE, "attempt_test_0004_r_000002_0 on tt2");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt2",
      expectedStrings);

    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0004_m_000003_0 on tt2");
    expectedStrings.put(REDUCE, "attempt_test_0004_r_000003_0 on tt2");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt2",
      expectedStrings);

  }

  /**
   * Test to verify that the refresh of the scheduler fails when modified
   * configuration overflows 100%
   *
   * @throws Throwable
   */
   @Test
  public void testFailingCapacityRefresh()
      throws Throwable {

    JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();

    queues[0].getProperties().setProperty(
        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
    queues[1].getProperties().setProperty(
        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(70));
    queues[2].getProperties().setProperty(
        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));

    // write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });

    try {
      setupAndStartSchedulerFramework(2, 2, 2);
      fail("Scheduler should have failed to start!");
    } catch (IOException ioe) {
      assertTrue(ioe.getMessage().contains(
          String.format(QueueHierarchyBuilder.TOTAL_CAPACITY_OVERFLOWN_MSG,
              queues[1].getQueueName() + "," + queues[2].getQueueName(),
              Float.valueOf(120.0f))));
    }

    // Rectify the properties and start the scheduler
    queues[1].getProperties().setProperty(
        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));

    // write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });

    setupAndStartSchedulerFramework(2, 2, 2);

    // Now change configuration.
    queues[1].getProperties().setProperty(
        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(35));
    queues[2].getProperties().setProperty(
        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(95));

    // Re-write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });

    try {
      refreshQueues(taskTrackerManager.getQueueManager(), null, scheduler);
    } catch (IOException ioe) {
      assertTrue(ioe.getMessage().contains(
          String.format(QueueHierarchyBuilder.TOTAL_CAPACITY_OVERFLOWN_MSG,
              queues[1].getQueueName() + "," + queues[2].getQueueName(),
              Float.valueOf(130.0f))));
    }
  }

  /**
   * @throws Throwable
   */
   @Test
  public void testRefreshUserLimits()
      throws Throwable {

    JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();

    queues[0].getProperties().setProperty(
      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
    queues[1].getProperties().setProperty(
      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
    queues[2].getProperties().setProperty(
      CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));

    queues[2].getProperties().setProperty(
      CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
      String.valueOf(100));

    // write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});

    setupAndStartSchedulerFramework(1, 2, 2);

    FakeJobInProgress job1 =
      taskTrackerManager.submitJobAndInit(
        JobStatus.PREP, 2, 2,
        queues[2].getQueueName(), "user1");
    FakeJobInProgress job2 =
      taskTrackerManager.submitJobAndInit(
        JobStatus.PREP, 2, 2,
        queues[2].getQueueName(), "user2");

    Map<String, String> expectedStrings = new HashMap<String, String>();
    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt1",
      expectedStrings);
   
    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt1");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt1",
      expectedStrings);

    assertNull(scheduler.assignTasks(taskTrackerManager.getTaskTracker("tt1")));
    taskTrackerManager.killJob(job1.getJobID());
    taskTrackerManager.killJob(job2.getJobID());

    // change configuration
    queues[2].getProperties().setProperty(
      CapacitySchedulerConf.MINIMUM_USER_LIMIT_PERCENT_PROPERTY,
      String.valueOf(50));

    // Re-write the configuration file
    QueueManagerTestUtils.writeQueueConfigurationFile(
      queueConfigFile.getAbsolutePath(), new JobQueueInfo[]{queues[0]});

    refreshQueues(taskTrackerManager.getQueueManager(), null, scheduler);

    job1 =
      taskTrackerManager.submitJobAndInit(
        JobStatus.PREP, 2, 2,
        queues[1].getQueueName(), "user1");
    job2 =
      taskTrackerManager.submitJobAndInit(
        JobStatus.PREP, 2, 2,
        queues[2].getQueueName(), "user2");

    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0003_m_000001_0 on tt1");
    expectedStrings.put(REDUCE, "attempt_test_0003_r_000001_0 on tt1");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt1",
      expectedStrings);

    expectedStrings.clear();
    expectedStrings.put(MAP, "attempt_test_0004_m_000001_0 on tt1");
    expectedStrings.put(REDUCE, "attempt_test_0004_r_000001_0 on tt1");
    checkMultipleTaskAssignment(
      taskTrackerManager, scheduler, "tt1",
      expectedStrings);
  }

  /**
   * Get a map of all {@link AbstractQueue}s.
   *
   * @param sched
   * @return
   */
  private static Map<String, AbstractQueue> getAllQueues(
      CapacityTaskScheduler sched) {
    AbstractQueue rootQueue = sched.getRoot();
    HashMap<String, AbstractQueue> allQueues =
        new HashMap<String, AbstractQueue>();
    List<AbstractQueue> allQueuesList = new ArrayList<AbstractQueue>();
    allQueuesList.addAll(rootQueue.getDescendentJobQueues());
    allQueuesList.addAll(rootQueue.getDescendantContainerQueues());
    for (AbstractQueue q : allQueuesList) {
      LOG.info("Putting in allQueues list " + q.getName());
      allQueues.put(q.getName(), q);
    }
    return allQueues;
  }
}
TOP

Related Classes of org.apache.hadoop.mapred.TestRefreshOfQueues

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.