Package com.datasalt.pangool.tuplemr.serialization

Source Code of com.datasalt.pangool.tuplemr.serialization.TestTupleFieldSerialization

/**
* Copyright [2012] [Datasalt Systems S.L.]
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datasalt.pangool.tuplemr.serialization;

import static org.junit.Assert.assertEquals;

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.junit.Test;

import com.datasalt.pangool.io.Fields;
import com.datasalt.pangool.io.ITuple;
import com.datasalt.pangool.io.Schema;
import com.datasalt.pangool.io.Schema.Field;
import com.datasalt.pangool.io.Schema.Field.Type;
import com.datasalt.pangool.io.Tuple;
import com.datasalt.pangool.tuplemr.TupleMRBuilder;
import com.datasalt.pangool.tuplemr.TupleMRException;
import com.datasalt.pangool.tuplemr.TupleMapper;
import com.datasalt.pangool.tuplemr.TupleReducer;
import com.datasalt.pangool.tuplemr.mapred.lib.input.HadoopInputFormat;
import com.datasalt.pangool.tuplemr.mapred.lib.output.HadoopOutputFormat;
import com.datasalt.pangool.utils.test.AbstractHadoopTestLibrary;

/**
* This unit test checks that it is possible to serialize a Tuple inside a Tuple by using stateful serialization
* {@link TupleFieldSerialization}. It performs a kind of join where two different data sources with no common fields
* at all are joined by a new field (partitionId).
*/
@SuppressWarnings("serial")
public class TestTupleFieldSerialization extends AbstractHadoopTestLibrary implements Serializable {

  public final static String INPUT1 = "in-1-" + TestTupleFieldSerialization.class.getName();
  public final static String INPUT2 = "in-2-" + TestTupleFieldSerialization.class.getName();
  public final static String OUTPUT = "out-" + TestTupleFieldSerialization.class.getName();

  @SuppressWarnings("deprecation")
  @Test
  public void test() throws Exception {
    initHadoop();
    trash(INPUT1, INPUT2, OUTPUT);

    // Prepare input
    BufferedWriter writer;

    // INPUT1
    writer = new BufferedWriter(new FileWriter(INPUT1));
    writer.write("foo1" + "\t" + "30" + "\n");
    writer.write("foo2" + "\t" + "20" + "\n");
    writer.write("foo3" + "\t" + "140" + "\n");
    writer.write("foo4" + "\t" + "110" + "\n");
    writer.write("foo5" + "\t" + "220" + "\n");
    writer.write("foo6" + "\t" + "260" + "\n");
    writer.close();

    // INPUT2
    writer = new BufferedWriter(new FileWriter(INPUT2));
    writer.write("4.5" + "\t" + "true" + "\n");
    writer.write("4.6" + "\t" + "false" + "\n");
    writer.close();

    TupleMRBuilder builder = new TupleMRBuilder(getConf());

    final Schema tupleSchema1 = new Schema("tupleSchema1", Fields.parse("a:string, b:int"));
    final Schema tupleSchema2 = new Schema("tupleSchema2", Fields.parse("c:double, d:boolean"));

    List<Field> fields = new ArrayList<Field>();
    fields.add(Field.create("partitionId", Type.INT));
    fields.add(Fields.createTupleField("tuple1", tupleSchema1));
    final Schema schema1 = new Schema("tupleInTuple1", fields);

    fields.clear();
    fields.add(Field.create("partitionId", Type.INT));
    fields.add(Fields.createTupleField("tuple2", tupleSchema2));
    final Schema schema2 = new Schema("tupleInTuple2", fields);

    builder.addIntermediateSchema(schema1);
    builder.addIntermediateSchema(schema2);

    builder.addInput(new Path(INPUT1), new HadoopInputFormat(TextInputFormat.class),
        new TupleMapper<LongWritable, Text>() {

          ITuple tupleInTuple1 = new Tuple(schema1);
          ITuple tuple1 = new Tuple(tupleSchema1);

          @Override
          public void map(LongWritable key, Text value, TupleMRContext context, Collector collector)
              throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            tuple1.set("a", split[0]);
            tuple1.set("b", Integer.parseInt(split[1]));

            tupleInTuple1.set("partitionId", 0);
            tupleInTuple1.set("tuple1", tuple1);
            collector.write(tupleInTuple1);
          }
        });

    builder.addInput(new Path(INPUT2), new HadoopInputFormat(TextInputFormat.class),
        new TupleMapper<LongWritable, Text>() {

          ITuple tupleInTuple2 = new Tuple(schema2);
          ITuple tuple2 = new Tuple(tupleSchema2);

          @Override
          public void map(LongWritable key, Text value, TupleMRContext context, Collector collector)
              throws IOException, InterruptedException {
            String[] split = value.toString().split("\t");
            tuple2.set("c", Double.parseDouble(split[0]));
            tuple2.set("d", Boolean.parseBoolean(split[1]));

            tupleInTuple2.set("partitionId", 0);
            tupleInTuple2.set("tuple2", tuple2);
            collector.write(tupleInTuple2);
          }
        });

    builder.setTupleReducer(new TupleReducer<Text, NullWritable>() {

      public void reduce(ITuple group, Iterable<ITuple> tuples, TupleMRContext context, Collector collector)
          throws IOException, InterruptedException, TupleMRException {
       
        Iterator<ITuple> iterator = tuples.iterator();
        ITuple currentTuple;
       
        assertEquals(0, group.get("partitionId"));
       
        currentTuple = iterator.next();
        assertEquals("foo1", ((ITuple)currentTuple.get("tuple1")).get("a").toString());
        assertEquals(30, ((ITuple)currentTuple.get("tuple1")).get("b"));
       
        currentTuple = iterator.next();
        assertEquals("foo2", ((ITuple)currentTuple.get("tuple1")).get("a").toString());
        assertEquals(20, ((ITuple)currentTuple.get("tuple1")).get("b"));

        currentTuple = iterator.next();
        assertEquals("foo3", ((ITuple)currentTuple.get("tuple1")).get("a").toString());
        assertEquals(140, ((ITuple)currentTuple.get("tuple1")).get("b"));

        currentTuple = iterator.next();
        assertEquals("foo4", ((ITuple)currentTuple.get("tuple1")).get("a").toString());
        assertEquals(110, ((ITuple)currentTuple.get("tuple1")).get("b"));

        currentTuple = iterator.next();
        assertEquals("foo5", ((ITuple)currentTuple.get("tuple1")).get("a").toString());
        assertEquals(220, ((ITuple)currentTuple.get("tuple1")).get("b"));

        currentTuple = iterator.next();
        assertEquals("foo6", ((ITuple)currentTuple.get("tuple1")).get("a").toString());
        assertEquals(260, ((ITuple)currentTuple.get("tuple1")).get("b"));

        // Second data source BEGINS
        currentTuple = iterator.next();
        assertEquals(4.5, ((ITuple)currentTuple.get("tuple2")).get("c"));
        assertEquals(true, ((ITuple)currentTuple.get("tuple2")).get("d"));
       
        currentTuple = iterator.next();
        assertEquals(4.6, ((ITuple)currentTuple.get("tuple2")).get("c"));
        assertEquals(false, ((ITuple)currentTuple.get("tuple2")).get("d"));
      };
    });
    builder.setGroupByFields("partitionId");
    builder.setOutput(new Path(OUTPUT), new HadoopOutputFormat(TextOutputFormat.class), Text.class, NullWritable.class);
    Job job = builder.createJob();
    try {
      job.waitForCompletion(true);
    } finally {
      builder.cleanUpInstanceFiles();
    }
    trash(INPUT1, INPUT2, OUTPUT);
  }
}
TOP

Related Classes of com.datasalt.pangool.tuplemr.serialization.TestTupleFieldSerialization

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.