Package org.apache.hama.bsp.message

Source Code of org.apache.hama.bsp.message.TestMessageIO

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp.message;

import java.io.EOFException;
import java.io.File;
import java.io.RandomAccessFile;
import java.math.BigInteger;
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.security.SecureRandom;
import java.util.Iterator;

import junit.framework.TestCase;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.bsp.Combiner;
import org.apache.hama.bsp.message.io.CombineSpilledDataProcessor;
import org.apache.hama.bsp.message.io.DirectByteBufferInputStream;
import org.apache.hama.bsp.message.io.DirectByteBufferOutputStream;
import org.apache.hama.bsp.message.io.ReusableByteBuffer;
import org.apache.hama.bsp.message.io.SpilledByteBuffer;
import org.apache.hama.bsp.message.io.SpilledDataInputBuffer;
import org.apache.hama.bsp.message.io.SpilledDataProcessor;
import org.apache.hama.bsp.message.io.SpillingDataOutputBuffer;
import org.apache.hama.bsp.message.io.SyncFlushByteBufferOutputStream;
import org.apache.hama.bsp.message.io.SyncReadByteBufferInputStream;
import org.apache.hama.bsp.message.io.WriteSpilledDataProcessor;

public class TestMessageIO extends TestCase {

  public void testNonSpillBuffer() throws Exception {

    SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer();
    Text text = new Text("Testing the spillage of spilling buffer");

    for (int i = 0; i < 100; ++i) {
      text.write(outputBuffer);
    }
    assertTrue(outputBuffer != null);
    assertTrue(outputBuffer.size() == 4000);
    assertFalse(outputBuffer.hasSpilled());
    outputBuffer.close();
  }

  public void testSpillBuffer() throws Exception {

    Configuration conf = new HamaConfiguration();
    String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
        + new BigInteger(128, new SecureRandom()).toString(32);
    SpilledDataProcessor processor = new WriteSpilledDataProcessor(fileName);
    processor.init(conf);
    SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
        1024, 1024, true, processor);
    Text text = new Text("Testing the spillage of spilling buffer");
    for (int i = 0; i < 100; ++i) {
      text.write(outputBuffer);
    }

    assertTrue(outputBuffer != null);
    assertTrue(outputBuffer.size() == 4000);
    assertTrue(outputBuffer.hasSpilled());
    File f = new File(fileName);
    assertTrue(f.exists());
    assertTrue(f.delete());
    outputBuffer.close();

  }

  public static class SumCombiner extends Combiner<IntWritable> {

    @Override
    public IntWritable combine(Iterable<IntWritable> messages) {
      int sum = 0;
      for (IntWritable intObj : messages) {
        sum += intObj.get();
      }
      return new IntWritable(sum);
    }

  }

  public void testSpillingByteBuffer() throws Exception {
    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
    SpilledByteBuffer spillBuffer = new SpilledByteBuffer(buffer);
    for (int i = 0; i < 100; ++i) {
      spillBuffer.putInt(i);
      spillBuffer.markEndOfRecord();
    }
    spillBuffer.putInt(100);
    assertEquals(spillBuffer.getMarkofLastRecord(), 400);
    assertEquals(spillBuffer.remaining(), (512 - 404));
    spillBuffer.flip();
    assertEquals(spillBuffer.remaining(), 404);
    assertEquals(spillBuffer.getMarkofLastRecord(), 400);

  }

  public void testDirectByteBufferOutput() throws Exception {

    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
    DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
    stream.setBuffer(buffer);
    IntWritable intWritable = new IntWritable(1);

    for (int i = 0; i < 100; ++i) {
      intWritable.set(i);
      intWritable.write(stream);
    }

    stream.close();

    buffer.flip();
    for (int i = 0; i < 100; ++i) {
      assertEquals(i, buffer.getInt());
    }

    try {
      buffer.getInt();
      assertTrue(false);
    } catch (Exception e) {
      assertTrue(true);
    }

  }

  public void testDirectByteBufferInput() throws Exception {
    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
    DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
    stream.setBuffer(buffer);
    IntWritable intWritable = new IntWritable(1);

    for (int i = 0; i < 100; ++i) {
      intWritable.set(i);
      intWritable.write(stream);
    }
    intWritable.write(stream);

    stream.close();

    buffer.flip();

    DirectByteBufferInputStream inStream = new DirectByteBufferInputStream();

    inStream.setBuffer(new SpilledByteBuffer(buffer, 400));
    for (int i = 0; i < 100; ++i) {
      intWritable.readFields(inStream);
      assertEquals(i, intWritable.get());
    }

    assertFalse(inStream.hasDataToRead());
    assertTrue(inStream.hasUnmarkData());
    inStream.prepareForNext();

    // push in another buffer and check if the unmarked data could be read.

    buffer.clear();
    stream = new DirectByteBufferOutputStream();
    buffer = ByteBuffer.allocateDirect(2048);
    stream.setBuffer(buffer);

    for (int i = 0; i < 400; ++i) {
      intWritable.set(i);
      intWritable.write(stream);
    }
    stream.close();
    buffer.flip();

    inStream.setBuffer(new SpilledByteBuffer(buffer, 400));

    // Read previous data
    intWritable.readFields(inStream);
    assertEquals(99, intWritable.get());

    for (int i = 0; i < 100; ++i) {
      intWritable.readFields(inStream);
      assertEquals(i, intWritable.get());
    }

    assertFalse(inStream.hasDataToRead());
    assertTrue(inStream.hasUnmarkData());
    inStream.prepareForNext();

    buffer.clear();
    stream = new DirectByteBufferOutputStream();
    stream.setBuffer(buffer);

    for (int i = 0; i < 100; ++i) {
      intWritable.set(i);
      intWritable.write(stream);
    }
    stream.close();
    buffer.flip();

    inStream.setBuffer(new SpilledByteBuffer(buffer, 400));

    // Read previous data with resized intermediate buffer
    for (int i = 100; i < 400; ++i) {
      intWritable.readFields(inStream);
      assertEquals(i, intWritable.get());
    }

    for (int i = 0; i < 100; ++i) {
      intWritable.readFields(inStream);
      assertEquals(i, intWritable.get());
    }

    assertFalse(inStream.hasDataToRead());
    assertFalse(inStream.hasUnmarkData());

  }

  /**
   *
   * @throws Exception
   */
  public void testReusableByteBufferIter() throws Exception {

    ReusableByteBuffer<IntWritable> reuseByteBuffer = new ReusableByteBuffer<IntWritable>(
        new IntWritable());

    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
    DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
    stream.setBuffer(buffer);
    IntWritable intWritable = new IntWritable(1);

    for (int i = 0; i < 100; ++i) {
      intWritable.set(i);
      intWritable.write(stream);
    }
    intWritable.write(stream);
    stream.close();
    buffer.flip();
    reuseByteBuffer.set(new SpilledByteBuffer(buffer, 400));

    Iterator<IntWritable> iter = reuseByteBuffer.iterator();
    int j = 0;
    while (iter.hasNext()) {
      assertEquals(iter.next().get(), j++);
    }
    assertEquals(j, 100);
    reuseByteBuffer.prepareForNext();

    buffer.clear();

    stream = new DirectByteBufferOutputStream();
    stream.setBuffer(buffer);

    for (int i = 0; i < 101; ++i) {
      intWritable.set(i);
      intWritable.write(stream);
    }
    stream.close();
    buffer.flip();

    reuseByteBuffer.set(new SpilledByteBuffer(buffer, 404));
    iter = reuseByteBuffer.iterator();
    assertEquals(iter.next().get(), 99);

    j = 0;
    while (iter.hasNext()) {
      assertEquals(iter.next().get(), j++);
    }
    buffer.clear();
  }

  public void testCombineProcessor() throws Exception {
    String fileName = System.getProperty("java.io.tmpdir") + File.separatorChar
        + new BigInteger(128, new SecureRandom()).toString(32);

    ByteBuffer buffer = ByteBuffer.allocateDirect(512);
    DirectByteBufferOutputStream stream = new DirectByteBufferOutputStream();
    stream.setBuffer(buffer);
    IntWritable intWritable = new IntWritable(1);
    int sum = 0;
    for (int i = 0; i < 100; ++i) {
      intWritable.set(i);
      intWritable.write(stream);
      sum += i;
    }
    intWritable.write(stream);
    stream.close();
    buffer.flip();

    Configuration conf = new HamaConfiguration();

    conf.setClass(Constants.MESSAGE_CLASS, IntWritable.class, Writable.class);
    conf.setClass(Constants.COMBINER_CLASS, SumCombiner.class, Combiner.class);

    CombineSpilledDataProcessor<IntWritable> processor = new CombineSpilledDataProcessor<IntWritable>(
        fileName);
    assertTrue(processor.init(conf));
    File f = new File(fileName);
    try {
      assertTrue(processor.handleSpilledBuffer(new SpilledByteBuffer(buffer,
          400)));
      buffer.flip();
      assertTrue(processor.handleSpilledBuffer(new SpilledByteBuffer(buffer,
          400)));
      assertTrue(processor.close());

      assertTrue(f.exists());
      assertEquals(f.length(), 8);

      RandomAccessFile raf = new RandomAccessFile(fileName, "r");
      FileChannel fileChannel = raf.getChannel();
      ByteBuffer readBuff = ByteBuffer.allocateDirect(16);
      fileChannel.read(readBuff);
      readBuff.flip();
      assertEquals(readBuff.getInt(), sum);
      assertEquals(readBuff.getInt(), sum + 99);
    } finally {
      assertTrue(f.delete());
    }

  }

  public void testSpillInputStream() throws Exception {

    File f = null;
    try {
      String fileName = System.getProperty("java.io.tmpdir")
          + File.separatorChar + "testSpillInputStream.txt";
      Configuration conf = new HamaConfiguration();
      SpilledDataProcessor processor = new WriteSpilledDataProcessor(fileName);
      processor.init(conf);
      SpillingDataOutputBuffer outputBuffer = new SpillingDataOutputBuffer(2,
          1024, 1024, true, processor);
      Text text = new Text("Testing the spillage of spilling buffer");
      for (int i = 0; i < 100; ++i) {
        text.write(outputBuffer);
        outputBuffer.markRecordEnd();
      }

      assertTrue(outputBuffer != null);
      assertTrue(outputBuffer.size() == 4000);
      assertTrue(outputBuffer.hasSpilled());
      f = new File(fileName);
      assertTrue(f.exists());
      outputBuffer.close();
      assertTrue(f.length() == 4000);// + (4000 / 1024 + 1) * 4));

      SpilledDataInputBuffer inputBuffer = outputBuffer
          .getInputStreamToRead(fileName);

      for (int i = 0; i < 100; ++i) {
        text.readFields(inputBuffer);
        assertTrue("Testing the spillage of spilling buffer".equals(text
            .toString()));
        text.clear();
      }

      try {
        text.readFields(inputBuffer);
        assertTrue(false);
      } catch (EOFException eof) {
        assertTrue(true);
      }

      inputBuffer.close();
      inputBuffer.completeReading(false);
      assertTrue(f.exists());
      inputBuffer.completeReading(true);
      assertFalse(f.exists());
    } finally {
      if (f != null) {
        if (f.exists()) {
          f.delete();
        }
      }
    }

  }

  public void testSyncFlushByteBufferOutputStream() throws Exception {

    File f = null;
    try {
      String fileName = System.getProperty("java.io.tmpdir")
          + File.separatorChar + "testSyncFlushByteBufferOutputStream.txt";
      SyncFlushByteBufferOutputStream stream = new SyncFlushByteBufferOutputStream(
          fileName);
      DirectByteBufferOutputStream syncFlushStream = new DirectByteBufferOutputStream(
          stream);
      ByteBuffer buffer = ByteBuffer.allocateDirect(512);
      syncFlushStream.setBuffer(buffer);
      IntWritable intWritable = new IntWritable(1);

      for (int i = 0; i < 200; ++i) {
        intWritable.set(i);
        intWritable.write(syncFlushStream);
      }
      intWritable.write(syncFlushStream);
      syncFlushStream.close();

      f = new File(fileName);
      assertTrue(f.exists());
      assertTrue(f.length() == 804);
      assertTrue(f.delete());
    } finally {
      if (f != null) {
        f.delete();
      }
    }

  }

  public void testSyncFlushBufferInputStream() throws Exception {
    File f = null;
    try {
      String fileName = System.getProperty("java.io.tmpdir")
          + File.separatorChar + "testSyncFlushBufferInputStream.txt";
      SyncFlushByteBufferOutputStream stream = new SyncFlushByteBufferOutputStream(
          fileName);
      DirectByteBufferOutputStream syncFlushStream = new DirectByteBufferOutputStream(
          stream);
      ByteBuffer buffer = ByteBuffer.allocateDirect(512);
      syncFlushStream.setBuffer(buffer);
      IntWritable intWritable = new IntWritable(1);

      for (int i = 0; i < 200; ++i) {
        intWritable.set(i);
        intWritable.write(syncFlushStream);
      }
      intWritable.write(syncFlushStream);
      syncFlushStream.close();

      f = new File(fileName);
      assertTrue(f.exists());
      assertEquals(f.length(), 804);

      SyncReadByteBufferInputStream syncReadStream = new SyncReadByteBufferInputStream(
          stream.isSpilled(), fileName);
      DirectByteBufferInputStream inStream = new DirectByteBufferInputStream(
          syncReadStream);
      buffer.clear();
      inStream.setBuffer(buffer);

      for (int i = 0; i < 200; ++i) {
        intWritable.readFields(inStream);
        assertEquals(intWritable.get(), i);
      }

      intWritable.readFields(inStream);
      assertEquals(intWritable.get(), 199);

      try {
        intWritable.readFields(inStream);
        assertFalse(true);
      } catch (Exception e) {
        assertTrue(true);
      }

      inStream.close();
      syncFlushStream.close();

    } finally {
      if (f != null) {
        f.delete();
      }
    }
  }

}
TOP

Related Classes of org.apache.hama.bsp.message.TestMessageIO

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.