Package org.apache.trevni.avro.mapreduce

Source Code of org.apache.trevni.avro.mapreduce.TestKeyWordCount$Counter

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.trevni.avro.mapreduce;

import static org.junit.Assert.assertEquals;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericData.Record;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyInputFormat;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapred.Pair;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
import org.apache.trevni.avro.WordCountUtil;
import org.apache.trevni.avro.mapreduce.AvroTrevniKeyOutputFormat;
import org.junit.Test;

public class TestKeyWordCount {
 
  private static long total = 0;

  static final Schema STRING = Schema.create(Schema.Type.STRING);
  static { GenericData.setStringType(STRING, GenericData.StringType.String); }
  static final Schema LONG = Schema.create(Schema.Type.LONG);
 
 
  private static class WordCountMapper extends
      Mapper<AvroKey<String>, NullWritable, Text, LongWritable> {
    private LongWritable mCount = new LongWritable();
    private Text mText = new Text();

    @Override
    protected void setup(Context context) {
      mCount.set(1);
    }

    @Override
    protected void map(AvroKey<String> key, NullWritable value, Context context)
        throws IOException, InterruptedException {

      try {
        StringTokenizer tokens = new StringTokenizer(key.datum());
        while (tokens.hasMoreTokens()) {
          mText.set(tokens.nextToken());
          context.write(mText, mCount);
        }
      } catch (Exception e) {
        throw new RuntimeException(key + " "  + key.datum() , e);
      }

    }
  }
 
  private static class WordCountReducer extends Reducer< Text, LongWritable, AvroKey<GenericData.Record>, NullWritable> {
   
    private AvroKey<GenericData.Record> result ;
   
    @Override
    protected void setup(Context context) {
      result = new AvroKey<GenericData.Record>();
      result.datum(new Record(Pair.getPairSchema(STRING,LONG)));
    }
   
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
      long count = 0;
      for (LongWritable value: values) {
        count += value.get();
      }
     
      result.datum().put("key", key.toString());
      result.datum().put("value", count);
     
      context.write(result, NullWritable.get());
    }
  }
  

 
  public static class Counter extends
  Mapper<AvroKey<GenericData.Record>, NullWritable, NullWritable, NullWritable> {
    @Override
    protected void map(AvroKey<GenericData.Record> key, NullWritable value, Context context)
        throws IOException, InterruptedException {
      total += (Long)key.datum().get("value");
    }
  }
 
 
  @Test public void testIOFormat() throws Exception {
    checkOutputFormat();
    checkInputFormat();
  }

  public void checkOutputFormat() throws Exception {
    Job job = new Job();
   
    WordCountUtil wordCountUtil = new WordCountUtil("trevniMapReduceKeyTest", "part-r-00000");
   
    wordCountUtil.writeLinesFile();
   
    AvroJob.setInputKeySchema(job, STRING);
    AvroJob.setOutputKeySchema(job, Pair.getPairSchema(STRING,LONG));
   
    job.setMapperClass(WordCountMapper.class);
    job.setReducerClass(WordCountReducer.class);
   
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(LongWritable.class);
   
    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/in"));
    FileOutputFormat.setOutputPath(job, new Path(wordCountUtil.getDir().toString() + "/out"));
    FileOutputFormat.setCompressOutput(job, true);
   
    job.setInputFormatClass(AvroKeyInputFormat.class);
    job.setOutputFormatClass(AvroTrevniKeyOutputFormat.class);

    job.waitForCompletion(true);
   
    wordCountUtil.validateCountsFile();
  }
 
  public void checkInputFormat() throws Exception {
    Job job = new Job();
   
    WordCountUtil wordCountUtil = new WordCountUtil("trevniMapReduceKeyTest");
   
    job.setMapperClass(Counter.class);

    Schema subSchema = Schema.parse("{\"type\":\"record\"," +
                                    "\"name\":\"PairValue\","+
                                    "\"fields\": [ " +
                                    "{\"name\":\"value\", \"type\":\"long\"}" +
                                    "]}");
    AvroJob.setInputKeySchema(job, subSchema);
   
    FileInputFormat.setInputPaths(job, new Path(wordCountUtil.getDir().toString() + "/out/*"));
    job.setInputFormatClass(AvroTrevniKeyInputFormat.class);
   
    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
   
    total = 0;
    job.waitForCompletion(true);
    assertEquals(WordCountUtil.TOTAL, total);
   
  }
 
}
TOP

Related Classes of org.apache.trevni.avro.mapreduce.TestKeyWordCount$Counter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.