Package org.apache.hadoop.fs

Source Code of org.apache.hadoop.fs.TestEnhancedByteBufferAccess$RestrictedAllocatingByteBufferPool

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;

import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeoutException;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Random;

import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.client.ClientMmap;
import org.apache.hadoop.hdfs.client.ClientMmapManager;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.TemporarySocketDirectory;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;

/**
* This class tests if EnhancedByteBufferAccess works correctly.
*/
public class TestEnhancedByteBufferAccess {
  // Class-wide logger; handed to IOUtils.cleanup() in
  // testIndirectFallbackReads().
  private static final Log LOG =
      LogFactory.getLog(TestEnhancedByteBufferAccess.class.getName());

  // Created once in init(); initZeroCopyTest() places the domain socket
  // file used by the tests inside this directory.
  static TemporarySocketDirectory sockDir;

  /**
   * One-time class setup: creates the temporary directory that will hold
   * the domain socket file and switches off DomainSocket bind-path
   * validation.
   */
  @BeforeClass
  public static void init() {
    sockDir = new TemporarySocketDirectory();
    // NOTE(review): presumably needed because the temporary directory would
    // not pass the bind-path checks - confirm against DomainSocket.
    DomainSocket.disableBindPathValidation();
  }

  private static byte[] byteBufferToArray(ByteBuffer buf) {
    byte resultArray[] = new byte[buf.remaining()];
    buf.get(resultArray);
    buf.flip();
    return resultArray;
  }
 
  public static HdfsConfiguration initZeroCopyTest() {
    Assume.assumeTrue(NativeIO.isAvailable());
    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
    HdfsConfiguration conf = new HdfsConfiguration();
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY, true);
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
    conf.setInt(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_SIZE, 3);
    conf.setLong(DFSConfigKeys.DFS_CLIENT_MMAP_CACHE_TIMEOUT_MS, 100);
    conf.set(DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY,
        new File(sockDir.getDir(),
          "TestRequestMmapAccess._PORT.sock").getAbsolutePath());
    conf.setBoolean(DFSConfigKeys.
        DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY, true);
    return conf;
  }

  @Test
  public void testZeroCopyReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 12345;
   
    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      ByteBuffer result = fsIn.read(null, 4096,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(4096, result.remaining());
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      Assert.assertEquals(4096,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(4096,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
          byteBufferToArray(result));
      fsIn.releaseBuffer(result);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }
 
  @Test
  public void testShortZeroCopyReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 12345;
   
    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);

      // Try to read 8192, but only get 4096 because of the block size.
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      ByteBuffer result =
        dfsIn.read(null, 8192, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(4096, result.remaining());
      Assert.assertEquals(4096,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(4096,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
          byteBufferToArray(result));
      dfsIn.releaseBuffer(result);
     
      // Try to read 4097, but only get 4096 because of the block size.
      result =
          dfsIn.read(null, 4097, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(4096, result.remaining());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 4096, 8192),
          byteBufferToArray(result));
      dfsIn.releaseBuffer(result);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  @Test
  public void testZeroCopyReadsNoFallback() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    FSDataInputStream fsIn = null;
    final int TEST_FILE_LENGTH = 12345;
   
    FileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, 7567L);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      HdfsDataInputStream dfsIn = (HdfsDataInputStream)fsIn;
      ByteBuffer result;
      try {
        result = dfsIn.read(null, 4097, EnumSet.noneOf(ReadOption.class));
        Assert.fail("expected UnsupportedOperationException");
      } catch (UnsupportedOperationException e) {
        // expected
      }
      result = dfsIn.read(null, 4096, EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      Assert.assertEquals(4096, result.remaining());
      Assert.assertEquals(4096,
          dfsIn.getReadStatistics().getTotalBytesRead());
      Assert.assertEquals(4096,
          dfsIn.getReadStatistics().getTotalZeroCopyBytesRead());
      Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 4096),
          byteBufferToArray(result));
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  private static class CountingVisitor
      implements ClientMmapManager.ClientMmapVisitor {
    int count = 0;

    @Override
    public void accept(ClientMmap mmap) {
      count++;
    }

    public void reset() {
      count = 0;
    }
  }

  @Test
  public void testZeroCopyMmapCache() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FSDataInputStream fsIn = null;
    ByteBuffer results[] = { null, null, null, null, null };
   
    DistributedFileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      final ClientMmapManager mmapManager = fs.getClient().getMmapManager();
      final CountingVisitor countingVisitor = new CountingVisitor();
      mmapManager.visitMmaps(countingVisitor);
      Assert.assertEquals(0, countingVisitor.count);
      mmapManager.visitEvictable(countingVisitor);
      Assert.assertEquals(0, countingVisitor.count);
      results[0] = fsIn.read(null, 4096,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      fsIn.seek(0);
      results[1] = fsIn.read(null, 4096,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      mmapManager.visitMmaps(countingVisitor);
      Assert.assertEquals(1, countingVisitor.count);
      countingVisitor.reset();
      mmapManager.visitEvictable(countingVisitor);
      Assert.assertEquals(0, countingVisitor.count);
      countingVisitor.reset();

      // The mmaps should be of the first block of the file.
      final ExtendedBlock firstBlock = DFSTestUtil.getFirstBlock(fs, TEST_PATH);
      mmapManager.visitMmaps(new ClientMmapManager.ClientMmapVisitor() {
        @Override
        public void accept(ClientMmap mmap) {
          Assert.assertEquals(firstBlock, mmap.getBlock());
        }
      });

      // Read more blocks.
      results[2] = fsIn.read(null, 4096,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      results[3] = fsIn.read(null, 4096,
          EnumSet.of(ReadOption.SKIP_CHECKSUMS));
      try {
        results[4] = fsIn.read(null, 4096,
            EnumSet.of(ReadOption.SKIP_CHECKSUMS));
        Assert.fail("expected UnsupportedOperationException");
      } catch (UnsupportedOperationException e) {
        // expected
      }

      // we should have 3 mmaps, 0 evictable
      mmapManager.visitMmaps(countingVisitor);
      Assert.assertEquals(3, countingVisitor.count);
      countingVisitor.reset();
      mmapManager.visitEvictable(countingVisitor);
      Assert.assertEquals(0, countingVisitor.count);

      // After we close the cursors, the mmaps should be evictable for
      // a brief period of time.  Then, they should be closed (we're
      // using a very quick timeout)
      for (ByteBuffer buffer : results) {
        if (buffer != null) {
          fsIn.releaseBuffer(buffer);
        }
      }
      GenericTestUtils.waitFor(new Supplier<Boolean>() {
        public Boolean get() {
          countingVisitor.reset();
          try {
            mmapManager.visitEvictable(countingVisitor);
          } catch (InterruptedException e) {
            e.printStackTrace();
            return false;
          }
          return (0 == countingVisitor.count);
        }
      }, 10, 10000);
      countingVisitor.reset();
      mmapManager.visitMmaps(countingVisitor);
      Assert.assertEquals(0, countingVisitor.count);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  /**
   * Test HDFS fallback reads.  HDFS streams support the ByteBufferReadable
   * interface.
   */
  @Test
  public void testHdfsFallbackReads() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FSDataInputStream fsIn = null;
   
    DistributedFileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      testFallbackImpl(fsIn, original);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  private static class RestrictedAllocatingByteBufferPool
      implements ByteBufferPool {
    private final boolean direct;

    RestrictedAllocatingByteBufferPool(boolean direct) {
      this.direct = direct;
    }
    @Override
    public ByteBuffer getBuffer(boolean direct, int length) {
      Preconditions.checkArgument(this.direct == direct);
      return direct ? ByteBuffer.allocateDirect(length) :
        ByteBuffer.allocate(length);
    }
    @Override
    public void putBuffer(ByteBuffer buffer) {
    }
  }
 
  private static void testFallbackImpl(InputStream stream,
      byte original[]) throws Exception {
    RestrictedAllocatingByteBufferPool bufferPool =
        new RestrictedAllocatingByteBufferPool(
            stream instanceof ByteBufferReadable);

    ByteBuffer result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
    Assert.assertEquals(10, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 0, 10),
        byteBufferToArray(result));

    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 5000);
    Assert.assertEquals(5000, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 10, 5010),
        byteBufferToArray(result));

    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 9999999);
    Assert.assertEquals(11375, result.remaining());
    Assert.assertArrayEquals(Arrays.copyOfRange(original, 5010, 16385),
        byteBufferToArray(result));

    result = ByteBufferUtil.fallbackRead(stream, bufferPool, 10);
    Assert.assertNull(result);
  }

  /**
   * Test the {@link ByteBufferUtil#fallbackRead} function directly.
   */
  @Test
  public void testFallbackRead() throws Exception {
    HdfsConfiguration conf = initZeroCopyTest();
    MiniDFSCluster cluster = null;
    final Path TEST_PATH = new Path("/a");
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FSDataInputStream fsIn = null;
   
    DistributedFileSystem fs = null;
    try {
      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
      cluster.waitActive();
      fs = cluster.getFileSystem();
      DFSTestUtil.createFile(fs, TEST_PATH,
          TEST_FILE_LENGTH, (short)1, RANDOM_SEED);
      try {
        DFSTestUtil.waitReplication(fs, TEST_PATH, (short)1);
      } catch (InterruptedException e) {
        Assert.fail("unexpected InterruptedException during " +
            "waitReplication: " + e);
      } catch (TimeoutException e) {
        Assert.fail("unexpected TimeoutException during " +
            "waitReplication: " + e);
      }
      fsIn = fs.open(TEST_PATH);
      byte original[] = new byte[TEST_FILE_LENGTH];
      IOUtils.readFully(fsIn, original, 0, TEST_FILE_LENGTH);
      fsIn.close();
      fsIn = fs.open(TEST_PATH);
      testFallbackImpl(fsIn, original);
    } finally {
      if (fsIn != null) fsIn.close();
      if (fs != null) fs.close();
      if (cluster != null) cluster.shutdown();
    }
  }

  /**
   * Test fallback reads on a stream which does not support the
   * ByteBufferReadable * interface.
   */
  @Test
  public void testIndirectFallbackReads() throws Exception {
    final File TEST_DIR = new File(
      System.getProperty("test.build.data","build/test/data"));
    final String TEST_PATH = TEST_DIR + File.separator +
        "indirectFallbackTestFile";
    final int TEST_FILE_LENGTH = 16385;
    final int RANDOM_SEED = 23453;
    FileOutputStream fos = null;
    FileInputStream fis = null;
    try {
      fos = new FileOutputStream(TEST_PATH);
      Random random = new Random(RANDOM_SEED);
      byte original[] = new byte[TEST_FILE_LENGTH];
      random.nextBytes(original);
      fos.write(original);
      fos.close();
      fos = null;
      fis = new FileInputStream(TEST_PATH);
      testFallbackImpl(fis, original);
    } finally {
      IOUtils.cleanup(LOG, fos, fis);
      new File(TEST_PATH).delete();
    }
  }
}
TOP

Related Classes of org.apache.hadoop.fs.TestEnhancedByteBufferAccess$RestrictedAllocatingByteBufferPool

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.