Package org.apache.tez.dag.app.rm.node

Source Code of org.apache.tez.dag.app.rm.node.TestAMNodeMap

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.tez.dag.app.rm.node;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.Event;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.app.AppContext;
import org.apache.tez.dag.app.rm.AMSchedulerEventNodeBlacklistUpdate;
import org.apache.tez.dag.app.rm.AMSchedulerEventType;
import org.apache.tez.dag.app.rm.TaskSchedulerEventHandler;
import org.apache.tez.dag.app.rm.container.AMContainerEventNodeFailed;
import org.apache.tez.dag.app.rm.container.AMContainerEventType;
import org.apache.tez.dag.app.rm.container.AMContainerMap;
import org.apache.tez.dag.records.TezTaskAttemptID;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import com.google.common.collect.Lists;

@SuppressWarnings({ "resource", "rawtypes" })
public class TestAMNodeMap {

  private static final Log LOG = LogFactory.getLog(TestAMNodeMap.class);

  DrainDispatcher dispatcher;
  EventHandler eventHandler;
 
  @Before
  public void setup() {
    dispatcher = new DrainDispatcher();
    dispatcher.init(new Configuration());
    dispatcher.start();
    eventHandler = dispatcher.getEventHandler();
  }
 
  class TestEventHandler implements EventHandler{
    List<Event> events = Lists.newLinkedList();
    @SuppressWarnings("unchecked")
    @Override
    public void handle(Event event) {
      events.add(event);
      eventHandler.handle(event);
    }
  }
 
  @After
  public void teardown() {
    dispatcher.stop();
  }
 
  @Test(timeout=5000)
  public void testHealthUpdateKnownNode() {
    AppContext appContext = mock(AppContext.class);

    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
    amNodeMap.init(new Configuration(false));
    amNodeMap.start();

    NodeId nodeId = NodeId.newInstance("host1", 2342);
    amNodeMap.nodeSeen(nodeId);

    NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
    amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
    dispatcher.await();
    assertEquals(AMNodeState.UNHEALTHY, amNodeMap.get(nodeId).getState());
    amNodeMap.stop();
  }

  @Test(timeout=5000)
  public void testHealthUpdateUnknownNode() {
    AppContext appContext = mock(AppContext.class);

    AMNodeMap amNodeMap = new AMNodeMap(eventHandler, appContext);
    amNodeMap.init(new Configuration(false));
    amNodeMap.start();

    NodeId nodeId = NodeId.newInstance("unknownhost", 2342);

    NodeReport nodeReport = generateNodeReport(nodeId, NodeState.UNHEALTHY);
    amNodeMap.handle(new AMNodeEventStateChanged(nodeReport));
    dispatcher.await();

    amNodeMap.stop();
    // No exceptions - the status update was ignored. Not bothering to capture
    // the log message for verification.
  }
 
  @Test(timeout=10000)
  public void testNodeSelfBlacklist() throws InterruptedException {
    AppContext appContext = mock(AppContext.class);
    Configuration conf = new Configuration(false);
    conf.setInt(TezConfiguration.TEZ_AM_MAX_TASK_FAILURES_PER_NODE, 2);
    TestEventHandler handler = new TestEventHandler();
    AMNodeMap amNodeMap = new AMNodeMap(handler, appContext);
    AMContainerMap amContainerMap = mock(AMContainerMap.class);
    TaskSchedulerEventHandler taskSchedulerEventHandler =
        mock(TaskSchedulerEventHandler.class);
    dispatcher.register(AMNodeEventType.class, amNodeMap);
    dispatcher.register(AMContainerEventType.class, amContainerMap);
    dispatcher.register(AMSchedulerEventType.class, taskSchedulerEventHandler);
    amNodeMap.init(conf);
    amNodeMap.start();

    amNodeMap.handle(new AMNodeEventNodeCountUpdated(4));
    NodeId nodeId = NodeId.newInstance("host1", 1234);
    NodeId nodeId2 = NodeId.newInstance("host2", 1234);
    NodeId nodeId3 = NodeId.newInstance("host3", 1234);
    NodeId nodeId4 = NodeId.newInstance("host4", 1234);
    amNodeMap.nodeSeen(nodeId);
    amNodeMap.nodeSeen(nodeId2);
    amNodeMap.nodeSeen(nodeId3);
    amNodeMap.nodeSeen(nodeId4);
    AMNodeImpl node = (AMNodeImpl) amNodeMap.get(nodeId);
   
    ContainerId cId1 = mock(ContainerId.class);
    ContainerId cId2 = mock(ContainerId.class);
    ContainerId cId3 = mock(ContainerId.class);
   
    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId1));
    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId2));
    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId, cId3));
    assertEquals(3, node.containers.size());
   
    TezTaskAttemptID ta1 = mock(TezTaskAttemptID.class);
    TezTaskAttemptID ta2 = mock(TezTaskAttemptID.class);
    TezTaskAttemptID ta3 = mock(TezTaskAttemptID.class);
   
    amNodeMap.handle(new AMNodeEventTaskAttemptSucceeded(nodeId, cId1, ta1));
    assertEquals(1, node.numSuccessfulTAs);
   
    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
    assertEquals(1, node.numSuccessfulTAs);
    assertEquals(1, node.numFailedTAs);
    assertEquals(AMNodeState.ACTIVE, node.getState());
    // duplicate should not affect anything
    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId2, ta2, true));
    assertEquals(1, node.numSuccessfulTAs);
    assertEquals(1, node.numFailedTAs);
    assertEquals(AMNodeState.ACTIVE, node.getState());
   
    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId, cId3, ta3, true));
    dispatcher.await();
    assertEquals(1, node.numSuccessfulTAs);
    assertEquals(2, node.numFailedTAs);
    assertEquals(AMNodeState.BLACKLISTED, node.getState());
   
    assertEquals(5, handler.events.size());
    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
    assertEquals(cId1, ((AMContainerEventNodeFailed)handler.events.get(0)).getContainerId());
    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
    assertEquals(cId2, ((AMContainerEventNodeFailed)handler.events.get(1)).getContainerId());
    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(2).getType());
    assertEquals(cId3, ((AMContainerEventNodeFailed)handler.events.get(2)).getContainerId());
    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(3).getType());
    assertEquals(node.getNodeId(), ((AMSchedulerEventNodeBlacklistUpdate)handler.events.get(3)).getNodeId());
    assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(4).getType());
    assertEquals(node.getNodeId(), ((AMNodeEvent)handler.events.get(4)).getNodeId());
   
    ContainerId cId4 = mock(ContainerId.class);
    ContainerId cId5 = mock(ContainerId.class);
    TezTaskAttemptID ta4 = mock(TezTaskAttemptID.class);
    TezTaskAttemptID ta5 = mock(TezTaskAttemptID.class);
    AMNodeImpl node2 = (AMNodeImpl) amNodeMap.get(nodeId2);
    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId2, cId4));
    amNodeMap.handle(new AMNodeEventContainerAllocated(nodeId2, cId5));
   
    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId4, ta4, true));
    assertEquals(1, node2.numFailedTAs);
    assertEquals(AMNodeState.ACTIVE, node2.getState());
   
    handler.events.clear();
    amNodeMap.handle(new AMNodeEventTaskAttemptEnded(nodeId2, cId5, ta5, true));
    dispatcher.await();
    assertEquals(2, node2.numFailedTAs);
    assertEquals(AMNodeState.FORCED_ACTIVE, node2.getState());
    AMNodeImpl node3 = (AMNodeImpl)amNodeMap.get(nodeId3);
    assertEquals(AMNodeState.FORCED_ACTIVE, node3.getState());
    assertEquals(10, handler.events.size());

    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(0).getType());
    assertEquals(AMContainerEventType.C_NODE_FAILED, handler.events.get(1).getType());
    assertEquals(AMSchedulerEventType.S_NODE_BLACKLISTED, handler.events.get(2).getType());
    assertEquals(AMNodeEventType.N_NODE_WAS_BLACKLISTED, handler.events.get(3).getType());
    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(4).getType());
    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(5).getType());
    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(6).getType());
    assertEquals(AMNodeEventType.N_IGNORE_BLACKLISTING_ENABLED, handler.events.get(7).getType());
    assertEquals(AMSchedulerEventType.S_NODE_UNBLACKLISTED, handler.events.get(8).getType());
    assertEquals(AMSchedulerEventType.S_NODE_UNBLACKLISTED, handler.events.get(9).getType());
    // drain all previous events
    Thread.sleep(500l);
    dispatcher.await();

    handler.events.clear();
    amNodeMap.handle(new AMNodeEventNodeCountUpdated(8));
    dispatcher.await();
    Thread.sleep(1000l);
    dispatcher.await();
    LOG.info(("Completed waiting for dispatcher to process all pending events"));
    assertEquals(AMNodeState.BLACKLISTED, node.getState());
    assertEquals(AMNodeState.BLACKLISTED, node2.getState());
    assertEquals(AMNodeState.ACTIVE, node3.getState());
    assertEquals(8, handler.events.size());

    int index = 0;
    int numBlacklistingDisabledEvents = 0;
    int numNodeBlacklistedEvents = 0;
    int numNodeWasBlacklistedEvents = 0;
    for (Event event : handler.events) {
      LOG.info("Logging event: index:" + index++
          + " type: " + event.getType());
      if (event.getType() == AMNodeEventType.N_IGNORE_BLACKLISTING_DISABLED) {
        numBlacklistingDisabledEvents++;
      } else if (event.getType() == AMSchedulerEventType.S_NODE_BLACKLISTED) {
        numNodeBlacklistedEvents++;
      } else if (event.getType() == AMNodeEventType.N_NODE_WAS_BLACKLISTED) {
        numNodeWasBlacklistedEvents++;
      } else {
        Assert.assertTrue("Unexpected event: " + event.getType(), false);       
      }
    }
    assertEquals(4, numBlacklistingDisabledEvents);
    assertEquals(2, numNodeBlacklistedEvents);
    assertEquals(2, numNodeWasBlacklistedEvents);
   
    amNodeMap.stop();
  }

  private static NodeReport generateNodeReport(NodeId nodeId, NodeState nodeState) {
    NodeReport nodeReport = NodeReport.newInstance(nodeId, nodeState, nodeId.getHost() + ":3433",
        "/default-rack", Resource.newInstance(0, 0), Resource.newInstance(10240, 12), 10,
        nodeState.toString(), System.currentTimeMillis());
    return nodeReport;
  }
}
TOP

Related Classes of org.apache.tez.dag.app.rm.node.TestAMNodeMap

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.