Package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer

Source Code of org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.TestLocalizedResource$ContainerEventMatcher

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer;

import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.LocalResourceType;
import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerEventType;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.LocalizerResourceRequestEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceLocalizedEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceReleaseEvent;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.event.ResourceRequestEvent;
import org.junit.Test;
import org.mockito.ArgumentMatcher;

public class TestLocalizedResource {

  static ContainerId getMockContainer(int id) {
    ApplicationId appId = mock(ApplicationId.class);
    when(appId.getClusterTimestamp()).thenReturn(314159265L);
    when(appId.getId()).thenReturn(3);
    ApplicationAttemptId appAttemptId = mock(ApplicationAttemptId.class);
    when(appAttemptId.getApplicationId()).thenReturn(appId);
    when(appAttemptId.getAttemptId()).thenReturn(0);
    ContainerId container = mock(ContainerId.class);
    when(container.getId()).thenReturn(id);
    when(container.getApplicationAttemptId()).thenReturn(appAttemptId);
    return container;
  }

  /**
   * Drives one {@link LocalizedResource} through a sequence of request,
   * release and localized events and checks, after each step, which events
   * reached the mocked localizer and container handlers and which
   * {@link ResourceState} the resource reports.
   *
   * <p>Sequence exercised: C0 and C1 request the resource (state
   * DOWNLOADING), both release it (state returns to INIT, no container
   * events), C2 and C3 request it, the resource is localized (C2 and C3 are
   * notified, state LOCALIZED), and finally C4 requests the already localized
   * resource and is notified as well.
   */
  @Test
  @SuppressWarnings("unchecked") // mocked generic
  public void testNotification() throws Exception {
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(new Configuration());
    try {
      dispatcher.start();
      // Mocked handlers stand in for the container and localizer event
      // consumers; every verification below is made against these two mocks.
      EventHandler<ContainerEvent> containerBus = mock(EventHandler.class);
      EventHandler<LocalizerEvent> localizerBus = mock(EventHandler.class);
      dispatcher.register(ContainerEventType.class, containerBus);
      dispatcher.register(LocalizerEventType.class, localizerBus);

      // mock resource
      LocalResource apiRsrc = createMockResource();

      final ContainerId container0 = getMockContainer(0);
      final Credentials creds0 = new Credentials();
      final LocalResourceVisibility vis0 = LocalResourceVisibility.PRIVATE;
      final LocalizerContext ctxt0 =
        new LocalizerContext("yak", container0, creds0);
      LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
      LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
      local.handle(new ResourceRequestEvent(rsrcA, vis0, ctxt0));
      // NOTE(review): await() presumably blocks until the dispatcher has
      // delivered all queued events to the handlers -- confirm against
      // DrainDispatcher.
      dispatcher.await();

      // Register C0, verify request event
      LocalizerEventMatcher matchesL0Req =
        new LocalizerEventMatcher(container0, creds0, vis0,
            LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
      verify(localizerBus).handle(argThat(matchesL0Req));
      assertEquals(ResourceState.DOWNLOADING, local.getState());

      // Register C1, verify request event
      final Credentials creds1 = new Credentials();
      final ContainerId container1 = getMockContainer(1);
      final LocalizerContext ctxt1 =
        new LocalizerContext("yak", container1, creds1);
      final LocalResourceVisibility vis1 = LocalResourceVisibility.PUBLIC;
      local.handle(new ResourceRequestEvent(rsrcA, vis1, ctxt1));
      dispatcher.await();
      LocalizerEventMatcher matchesL1Req =
        new LocalizerEventMatcher(container1, creds1, vis1,
            LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
      verify(localizerBus).handle(argThat(matchesL1Req));

      // Release C0 container localization, verify no notification
      local.handle(new ResourceReleaseEvent(rsrcA, container0));
      dispatcher.await();
      // No ContainerEvent of any kind may have reached containerBus so far.
      verify(containerBus, never()).handle(isA(ContainerEvent.class));
      // C1 still holds a request, so the resource is expected to stay in
      // DOWNLOADING.
      assertEquals(ResourceState.DOWNLOADING, local.getState());

      // Release C1 container localization, verify no notification
      local.handle(new ResourceReleaseEvent(rsrcA, container1));
      dispatcher.await();
      verify(containerBus, never()).handle(isA(ContainerEvent.class));
      // Both requesters have released; the resource is expected back in INIT.
      assertEquals(ResourceState.INIT, local.getState());

      // Register C2, C3
      final ContainerId container2 = getMockContainer(2);
      final LocalResourceVisibility vis2 = LocalResourceVisibility.PRIVATE;
      final Credentials creds2 = new Credentials();
      final LocalizerContext ctxt2 =
        new LocalizerContext("yak", container2, creds2);

      final ContainerId container3 = getMockContainer(3);
      final LocalResourceVisibility vis3 = LocalResourceVisibility.PRIVATE;
      final Credentials creds3 = new Credentials();
      final LocalizerContext ctxt3 =
        new LocalizerContext("yak", container3, creds3);

      // Both requests are handed to the resource before a single await().
      local.handle(new ResourceRequestEvent(rsrcA, vis2, ctxt2));
      local.handle(new ResourceRequestEvent(rsrcA, vis3, ctxt3));
      dispatcher.await();
      LocalizerEventMatcher matchesL2Req =
        new LocalizerEventMatcher(container2, creds2, vis2,
            LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
      verify(localizerBus).handle(argThat(matchesL2Req));
      LocalizerEventMatcher matchesL3Req =
        new LocalizerEventMatcher(container3, creds3, vis3,
            LocalizerEventType.REQUEST_RESOURCE_LOCALIZATION);
      verify(localizerBus).handle(argThat(matchesL3Req));

      // Successful localization. verify notification C2, C3
      Path locA = new Path("file:///cache/rsrcA");
      // NOTE(review): the third argument (10) is presumably the localized
      // size -- confirm against ResourceLocalizedEvent.
      local.handle(new ResourceLocalizedEvent(rsrcA, locA, 10));
      dispatcher.await();
      ContainerEventMatcher matchesC2Localized =
        new ContainerEventMatcher(container2,
            ContainerEventType.RESOURCE_LOCALIZED);
      ContainerEventMatcher matchesC3Localized =
        new ContainerEventMatcher(container3,
            ContainerEventType.RESOURCE_LOCALIZED);
      // verify() without an explicit mode is Mockito's times(1): each
      // container must have been notified exactly once.
      verify(containerBus).handle(argThat(matchesC2Localized));
      verify(containerBus).handle(argThat(matchesC3Localized));
      assertEquals(ResourceState.LOCALIZED, local.getState());

      // Register C4, verify notification
      final ContainerId container4 = getMockContainer(4);
      final Credentials creds4 = new Credentials();
      final LocalizerContext ctxt4 =
        new LocalizerContext("yak", container4, creds4);
      final LocalResourceVisibility vis4 = LocalResourceVisibility.PRIVATE;
      local.handle(new ResourceRequestEvent(rsrcA, vis4, ctxt4));
      dispatcher.await();
      // A request against the already localized resource is expected to be
      // answered with a RESOURCE_LOCALIZED event for C4; the state stays
      // LOCALIZED.
      ContainerEventMatcher matchesC4Localized =
        new ContainerEventMatcher(container4,
            ContainerEventType.RESOURCE_LOCALIZED);
      verify(containerBus).handle(argThat(matchesC4Localized));
      assertEquals(ResourceState.LOCALIZED, local.getState());
    } finally {
      // Always stop the dispatcher, even when an assertion above fails.
      dispatcher.stop();
    }
  }

  @Test
  public void testDirectLocalization() throws Exception {
    DrainDispatcher dispatcher = new DrainDispatcher();
    dispatcher.init(new Configuration());
    try {
      dispatcher.start();
      LocalResource apiRsrc = createMockResource();
      LocalResourceRequest rsrcA = new LocalResourceRequest(apiRsrc);
      LocalizedResource local = new LocalizedResource(rsrcA, dispatcher);
      Path p = new Path("file:///cache/rsrcA");
      local.handle(new ResourceLocalizedEvent(rsrcA, p, 10));
      dispatcher.await();
      assertEquals(ResourceState.LOCALIZED, local.getState());
    } finally {
      dispatcher.stop();
    }
  }

  static LocalResource createMockResource() {
    // mock rsrc location
    org.apache.hadoop.yarn.api.records.URL uriA =
      mock(org.apache.hadoop.yarn.api.records.URL.class);
    when(uriA.getScheme()).thenReturn("file");
    when(uriA.getHost()).thenReturn(null);
    when(uriA.getFile()).thenReturn("/localA/rsrc");

    LocalResource apiRsrc = mock(LocalResource.class);
    when(apiRsrc.getResource()).thenReturn(uriA);
    when(apiRsrc.getTimestamp()).thenReturn(4344L);
    when(apiRsrc.getType()).thenReturn(LocalResourceType.FILE);
    return apiRsrc;
  }


  static class LocalizerEventMatcher extends ArgumentMatcher<LocalizerEvent> {
    Credentials creds;
    LocalResourceVisibility vis;
    private final ContainerId idRef;
    private final LocalizerEventType type;

    public LocalizerEventMatcher(ContainerId idRef, Credentials creds,
        LocalResourceVisibility vis, LocalizerEventType type) {
      this.vis = vis;
      this.type = type;
      this.creds = creds;
      this.idRef = idRef;
    }
    @Override
    public boolean matches(Object o) {
      if (!(o instanceof LocalizerResourceRequestEvent)) return false;
      LocalizerResourceRequestEvent evt = (LocalizerResourceRequestEvent) o;
      return idRef == evt.getContext().getContainerId()
          && type == evt.getType()
          && vis == evt.getVisibility()
          && creds == evt.getContext().getCredentials();
    }
  }

  static class ContainerEventMatcher extends ArgumentMatcher<ContainerEvent> {
    private final ContainerId idRef;
    private final ContainerEventType type;
    public ContainerEventMatcher(ContainerId idRef, ContainerEventType type) {
      this.idRef = idRef;
      this.type = type;
    }
    @Override
    public boolean matches(Object o) {
      if (!(o instanceof ContainerEvent)) return false;
      ContainerEvent evt = (ContainerEvent) o;
      return idRef == evt.getContainerID() && type == evt.getType();
    }
  }

}
TOP

Related Classes of org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.TestLocalizedResource$ContainerEventMatcher

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.