Source Code of org.apache.spark.network.shuffle.RetryingBlockFetcherSuite

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.network.shuffle;


import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedHashSet;
import java.util.Map;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.mockito.stubbing.Stubber;

import static org.junit.Assert.*;
import static org.mockito.Mockito.*;

import org.apache.spark.network.buffer.ManagedBuffer;
import org.apache.spark.network.buffer.NioManagedBuffer;
import org.apache.spark.network.util.SystemPropertyConfigProvider;
import org.apache.spark.network.util.TransportConf;
import static org.apache.spark.network.shuffle.RetryingBlockFetcher.BlockFetchStarter;

/**
 * Tests retry logic by throwing IOExceptions and ensuring that subsequent attempts are made to
 * fetch the lost blocks.
 */
public class RetryingBlockFetcherSuite {

  ManagedBuffer block0 = new NioManagedBuffer(ByteBuffer.wrap(new byte[13]));
  ManagedBuffer block1 = new NioManagedBuffer(ByteBuffer.wrap(new byte[7]));
  ManagedBuffer block2 = new NioManagedBuffer(ByteBuffer.wrap(new byte[19]));

  @Before
  public void beforeEach() {
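    // With maxRetries=2, each block may be retried at most twice after its initial fetch
    // attempt; retryWaitMs=0 removes the backoff between attempts so the tests run quickly.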
    System.setProperty("spark.shuffle.io.maxRetries", "2");
    System.setProperty("spark.shuffle.io.retryWaitMs", "0");
  }

  @After
  public void afterEach() {
    System.clearProperty("spark.shuffle.io.maxRetries");
    System.clearProperty("spark.shuffle.io.retryWaitMs");
  }

  @Test
  public void testNoFailures() throws IOException {
    BlockFetchingListener listener = mock(BlockFetchingListener.class);

    Map[] interactions = new Map[] {
      // Immediately return both blocks successfully.
      ImmutableMap.<String, Object>builder()
        .put("b0", block0)
        .put("b1", block1)
        .build(),
    };

    performInteractions(interactions, listener);

    verify(listener).onBlockFetchSuccess("b0", block0);
    verify(listener).onBlockFetchSuccess("b1", block1);
    verifyNoMoreInteractions(listener);
  }

  @Test
  public void testUnrecoverableFailure() throws IOException {
    BlockFetchingListener listener = mock(BlockFetchingListener.class);

    Map[] interactions = new Map[] {
      // b0 throws a non-IOException error, so it will be failed without retry.
      ImmutableMap.<String, Object>builder()
        .put("b0", new RuntimeException("Ouch!"))
        .put("b1", block1)
        .build(),
    };

    performInteractions(interactions, listener);

    verify(listener).onBlockFetchFailure(eq("b0"), (Throwable) any());
    verify(listener).onBlockFetchSuccess("b1", block1);
    verifyNoMoreInteractions(listener);
  }

  @Test
  public void testSingleIOExceptionOnFirst() throws IOException {
    BlockFetchingListener listener = mock(BlockFetchingListener.class);

    Map[] interactions = new Map[] {
      // IOException will cause a retry. Since b0 fails, we will retry both.
      ImmutableMap.<String, Object>builder()
        .put("b0", new IOException("Connection failed or something"))
        .put("b1", block1)
        .build(),
      ImmutableMap.<String, Object>builder()
        .put("b0", block0)
        .put("b1", block1)
        .build(),
    };

    performInteractions(interactions, listener);

    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
    verifyNoMoreInteractions(listener);
  }

  @Test
  public void testSingleIOExceptionOnSecond() throws IOException {
    BlockFetchingListener listener = mock(BlockFetchingListener.class);

    Map[] interactions = new Map[] {
      // IOException will cause a retry. Since b1 fails, we will not retry b0.
      ImmutableMap.<String, Object>builder()
        .put("b0", block0)
        .put("b1", new IOException("Connection failed or something"))
        .build(),
      ImmutableMap.<String, Object>builder()
        .put("b1", block1)
        .build(),
    };

    performInteractions(interactions, listener);

    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
    verifyNoMoreInteractions(listener);
  }

  @Test
  public void testTwoIOExceptions() throws IOException {
    BlockFetchingListener listener = mock(BlockFetchingListener.class);

    Map[] interactions = new Map[] {
      // b0's IOException will trigger retry, b1's will be ignored.
      ImmutableMap.<String, Object>builder()
        .put("b0", new IOException())
        .put("b1", new IOException())
        .build(),
      // Next, b0 is successful and b1 errors again, so we just request that one.
      ImmutableMap.<String, Object>builder()
        .put("b0", block0)
        .put("b1", new IOException())
        .build(),
      // b1 returns successfully within 2 retries.
      ImmutableMap.<String, Object>builder()
        .put("b1", block1)
        .build(),
    };

    performInteractions(interactions, listener);

    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
    verify(listener, timeout(5000)).onBlockFetchSuccess("b1", block1);
    verifyNoMoreInteractions(listener);
  }

  @Test
  public void testThreeIOExceptions() throws IOException {
    BlockFetchingListener listener = mock(BlockFetchingListener.class);

    Map[] interactions = new Map[] {
      // b0's IOException will trigger retry, b1's will be ignored.
      ImmutableMap.<String, Object>builder()
        .put("b0", new IOException())
        .put("b1", new IOException())
        .build(),
      // Next, b0 is successful and b1 errors again, so we just request that one.
      ImmutableMap.<String, Object>builder()
        .put("b0", block0)
        .put("b1", new IOException())
        .build(),
      // b1 errors again, but this was the last retry
      ImmutableMap.<String, Object>builder()
        .put("b1", new IOException())
        .build(),
      // This is not reached -- b1 has failed.
      ImmutableMap.<String, Object>builder()
        .put("b1", block1)
        .build(),
    };

    performInteractions(interactions, listener);

    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any());
    verifyNoMoreInteractions(listener);
  }

  @Test
  public void testRetryAndUnrecoverable() throws IOException {
    BlockFetchingListener listener = mock(BlockFetchingListener.class);

    Map[] interactions = new Map[] {
      // b0's IOException will trigger retry, subsequent messages will be ignored.
      ImmutableMap.<String, Object>builder()
        .put("b0", new IOException())
        .put("b1", new RuntimeException())
        .put("b2", block2)
        .build(),
      // Next, b0 is successful, b1 errors unrecoverably, and b2 triggers a retry.
      ImmutableMap.<String, Object>builder()
        .put("b0", block0)
        .put("b1", new RuntimeException())
        .put("b2", new IOException())
        .build(),
      // b2 succeeds in its last retry.
      ImmutableMap.<String, Object>builder()
        .put("b2", block2)
        .build(),
    };

    performInteractions(interactions, listener);

    verify(listener, timeout(5000)).onBlockFetchSuccess("b0", block0);
    verify(listener, timeout(5000)).onBlockFetchFailure(eq("b1"), (Throwable) any());
    verify(listener, timeout(5000)).onBlockFetchSuccess("b2", block2);
    verifyNoMoreInteractions(listener);
  }

  /**
   * Performs a set of interactions in response to block requests from a RetryingBlockFetcher.
   * Each interaction is a Map from BlockId to either ManagedBuffer or Exception. This interaction
   * means "respond to the next block fetch request with these Successful buffers and these Failure
   * exceptions". We verify that the expected block ids are exactly the ones requested.
   *
   * If multiple interactions are supplied, they will be used in order. This is useful for encoding
   * retries -- the first interaction may include an IOException, which causes a retry of some
   * subset of the original blocks in a second interaction.
   */
  @SuppressWarnings("unchecked")
  private void performInteractions(final Map[] interactions, BlockFetchingListener listener)
    throws IOException {

    TransportConf conf = new TransportConf(new SystemPropertyConfigProvider());
    BlockFetchStarter fetchStarter = mock(BlockFetchStarter.class);

    Stubber stub = null;

    // Contains all blockIds that are referenced across all interactions.
    final LinkedHashSet<String> blockIds = Sets.newLinkedHashSet();

    for (final Map<String, Object> interaction : interactions) {
      blockIds.addAll(interaction.keySet());

      Answer<Void> answer = new Answer<Void>() {
        @Override
        public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
          try {
            // Verify that the RetryingBlockFetcher requested the expected blocks.
            String[] requestedBlockIds = (String[]) invocationOnMock.getArguments()[0];
            String[] desiredBlockIds = interaction.keySet().toArray(new String[interaction.size()]);
            assertArrayEquals(desiredBlockIds, requestedBlockIds);

            // Now actually invoke the success/failure callbacks on each block.
            BlockFetchingListener retryListener =
              (BlockFetchingListener) invocationOnMock.getArguments()[1];
            for (Map.Entry<String, Object> block : interaction.entrySet()) {
              String blockId = block.getKey();
              Object blockValue = block.getValue();

              if (blockValue instanceof ManagedBuffer) {
                retryListener.onBlockFetchSuccess(blockId, (ManagedBuffer) blockValue);
              } else if (blockValue instanceof Exception) {
                retryListener.onBlockFetchFailure(blockId, (Exception) blockValue);
              } else {
                fail("Can only handle ManagedBuffers and Exceptions, got " + blockValue);
              }
            }
            return null;
          } catch (Throwable e) {
            e.printStackTrace();
            throw e;
          }
        }
      };

      // This is either the first stub, or should be chained behind the prior ones.
      if (stub == null) {
        stub = doAnswer(answer);
      } else {
        stub.doAnswer(answer);
      }
    }

    assert stub != null;
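    // Mockito serves chained doAnswer() stubs in order, so the Nth invocation of
    // createAndStart() (the initial fetch plus each retry round) replays the Nth interaction.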
    stub.when(fetchStarter).createAndStart((String[]) any(), (BlockFetchingListener) anyObject());
    String[] blockIdArray = blockIds.toArray(new String[blockIds.size()]);
    new RetryingBlockFetcher(conf, fetchStarter, blockIdArray, listener).start();
  }
}
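
For context, every test above drives the fetcher through a Mockito-mocked BlockFetchStarter. A minimal hand-written starter can make the contract that performInteractions stubs more concrete. The sketch below is not part of the Spark sources: it assumes only the createAndStart(String[], BlockFetchingListener) method exercised by the stubbing above, and the class name ImmediateSuccessStarter is hypothetical.

// A minimal sketch: a starter that answers every requested block immediately from a map,
// assuming the BlockFetchStarter interface exposes only the createAndStart method used above.
class ImmediateSuccessStarter implements BlockFetchStarter {
  private final Map<String, ManagedBuffer> blocks;  // java.util.Map, as imported above

  ImmediateSuccessStarter(Map<String, ManagedBuffer> blocks) {
    this.blocks = blocks;
  }

  @Override
  public void createAndStart(String[] blockIds, BlockFetchingListener listener) {
    // Report every requested block as an immediate success, mirroring the testNoFailures case.
    for (String blockId : blockIds) {
      listener.onBlockFetchSuccess(blockId, blocks.get(blockId));
    }
  }
}

Such a starter could stand in for the mock in a no-failure scenario, at the cost of the per-call argument verification performed by the stubbed Answer in performInteractions.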