Source Code of upenn.junto.algorithm.parallel.LP_ZGL_Hadoop$LP_ZGL_Reduce

package upenn.junto.algorithm.parallel;

/**
* Copyright 2011 Partha Pratim Talukdar
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import gnu.trove.map.hash.TObjectDoubleHashMap;
import gnu.trove.iterator.TObjectDoubleIterator;

import java.io.IOException;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.Iterator;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

import upenn.junto.util.*;
import upenn.junto.config.*;

public class LP_ZGL_Hadoop {
 
   private static final String _kDelim = "\t";
 
   public static class LP_ZGL_Map extends MapReduceBase
         implements Mapper<LongWritable, Text, Text, Text> {
     private Text word = new Text();

     public void map(LongWritable key, Text value,
              OutputCollector<Text, Text> output,
              Reporter reporter) throws IOException {
       /////
       // Constructing the vertex from the string representation
       /////
       String line = value.toString();
       // id gold_label injected_labels estimated_labels neighbors rw_probabilities
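       // Hypothetical example line (placeholder values; the map-valued fields use
       // whatever encoding CollectionUtil.Map2String / String2Map agree on):
       //   n1 <TAB> L1 <TAB> L1 1.0 <TAB> L1 0.7 L2 0.3 <TAB> n2 0.5 n3 0.5 <TAB> <rw_probabilities>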
       String[] fields = line.split(_kDelim);      
       TObjectDoubleHashMap neighbors = CollectionUtil.String2Map(fields[4]);
      
       boolean isSeedNode = fields[2].length() > 0;
      
       // If the current node is a seed node but there is no estimated
       // label information yet, copy the seed labels into the estimated
       // label distribution. This typically happens in the map phase of
       // the very first iteration.
       if (isSeedNode && fields[3].length() == 0) {
         fields[3] = fields[2];
       }

       // Send two types of messages:
       //   -- self messages which will store the injection labels and
       //        random walk probabilities.
       //   -- messages to neighbors about current estimated scores
       //        of the node.
       //
       // message to self
       output.collect(new Text(fields[0]), new Text(line));

       // message to neighbors
       TObjectDoubleIterator neighIterator = neighbors.iterator();
       while (neighIterator.hasNext()) {
         neighIterator.advance();
        
         // message: (neighbor_node, current_node + DELIM + curr_node_label_scores)
         output.collect(new Text((String) neighIterator.key()),
                   new Text(fields[0] + _kDelim + fields[3]));
       }
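       // For a hypothetical node n1 with estimated labels "L1 0.7 L2 0.3" and
       // neighbors n2 and n3, the collect calls above would emit:
       //   (n1, <full input line of n1>)            -- self message
       //   (n2, "n1" + _kDelim + "L1 0.7 L2 0.3")   -- neighbor message
       //   (n3, "n1" + _kDelim + "L1 0.7 L2 0.3")   -- neighbor message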
     }
   }
    
   public static class LP_ZGL_Reduce extends MapReduceBase
         implements Reducer<Text, Text, Text, Text> {
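      // Hyperparameters read from the JobConf (set in main() below):
      //   mu1            -- weight applied to a seed node's injected label scores
      //   mu2            -- weight applied to edge-weighted neighbor label scores
      //   keepTopKLabels -- passed to ProbUtil.Normalize when combining neighbor
      //                     scores (presumably the number of top labels retained)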
      private static double mu1;
      private static double mu2;
      private static int keepTopKLabels;

      public void configure(JobConf conf) {
        mu1 = Double.parseDouble(conf.get("mu1"));
        mu2 = Double.parseDouble(conf.get("mu2"));
        keepTopKLabels = Integer.parseInt(conf.get("keepTopKLabels"));
      }
  
     public void reduce(Text key, Iterator<Text> values,
               OutputCollector<Text, Text> output, Reporter reporter) throws IOException {      
       // new scores estimated for the current node
       TObjectDoubleHashMap newEstimatedScores = new TObjectDoubleHashMap();
      
       // set to true only if the message sent to itself is found.
       boolean isSelfMessageFound = false;
      
       String vertexId = key.toString();
       String vertexString = "";
      
       TObjectDoubleHashMap neighbors = null;
       TObjectDoubleHashMap randWalkProbs = null;
      
       HashMap<String, String> neighScores =
                 new HashMap<String, String>();
      
       int totalMessagesReceived = 0;
       boolean isSeedNode = false;
      
       // iterate over all the messages received at the node
       while (values.hasNext()) {
         ++totalMessagesReceived;

         String val = values.next().toString();
         String[] fields = val.split(_kDelim);
        
         // System.out.println("src: " + fields[0] + " dest: " + vertexId +
         //       "MESSAGE>>" + val + "<<");

         // self-message check
         if (vertexId.equals(fields[0])) {
           isSelfMessageFound = true;
           vertexString = val;
          
           // System.out.println("Reduce: " + vertexId + " " + val + " " + fields.length);

           TObjectDoubleHashMap injLabels = CollectionUtil.String2Map(fields[2]);
           neighbors = CollectionUtil.String2Map(neighbors, fields[4]);
           randWalkProbs = CollectionUtil.String2Map(fields[5]);
          
           if (injLabels.size() > 0) {
             isSeedNode = true;

             // add injected labels to the estimated scores.
             ProbUtil.AddScores(newEstimatedScores,
                            mu1, injLabels);
           }
         } else {
           // if the message has no second field, the sending neighbor
           // has no label estimate to contribute yet.
           if (fields.length > 1) {
             neighScores.put(fields[0], fields[1]);
           }
         }
       }

       // terminate if message from self is not received.
       if (!isSelfMessageFound) {
         throw new RuntimeException("Self message not received for node " + vertexId);
       }
      
       // Add neighbor label scores to the current node's label estimate only if
       // the current node is not a seed node. For seed nodes, the injected label
       // distribution is clamped back instead, which was already done above while
       // processing the self message.
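       //
       // In symbols (v = current node, u ranges over neighbors that sent scores,
       // w(v,u) = edge weight, Y_u = neighbor's current estimate, I_v = injected
       // labels; Normalize(., k) presumably keeps only the top k labels):
       //   seed v:      new_v = Normalize(mu1 * I_v)
       //   non-seed v:  new_v = Normalize( Normalize(sum_u mu2 * w(v,u) * Y_u, keepTopKLabels) )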
       if (!isSeedNode) {
         // collect neighbors label distributions and create one single
         // label distribution
         TObjectDoubleHashMap weightedNeigLablDist = new TObjectDoubleHashMap();
         Iterator<String> neighIter = neighScores.keySet().iterator();
         while (neighIter.hasNext()) {
           String neighName = neighIter.next();
           ProbUtil.AddScores(weightedNeigLablDist, // newEstimatedScores,
                         mu2 * neighbors.get(neighName),
                         CollectionUtil.String2Map(neighScores.get(neighName)));
         }
         ProbUtil.Normalize(weightedNeigLablDist, keepTopKLabels);
        
         // now add the collective neighbor label distribution to
         // the estimate of the current node's labels.
         ProbUtil.AddScores(newEstimatedScores,
                        1.0, weightedNeigLablDist);
       }
      
       // normalize the scores
       ProbUtil.Normalize(newEstimatedScores);
      
       // now reconstruct the vertex representation (with the new estimated scores)
       // so that the output of the current reducer can be used as input to the
       // next iteration's mapper.
       String[] vertexFields = vertexString.split(_kDelim);
      
       // Drop the id field (it is re-emitted below as the output key) and replace
       // the estimated-scores field (index 2 after the shift) with the new ones.
       String[] newVertexFields = new String[vertexFields.length - 1];
       for (int i = 1; i < vertexFields.length; ++i) {
         newVertexFields[i - 1] = vertexFields[i];
       }
       newVertexFields[2] = CollectionUtil.Map2String(newEstimatedScores);

       output.collect(key, new Text(CollectionUtil.Join(newVertexFields, _kDelim)));
     }
   }
  
   public static void main(String[] args) throws Exception {
      Hashtable config = ConfigReader.read_config(args);
  
     String baseInputFilePat = Defaults.GetValueOrDie(config, "hdfs_input_pattern");
     String baseOutputFilePat = Defaults.GetValueOrDie(config, "hdfs_output_base");
     int numIterations = Integer.parseInt(Defaults.GetValueOrDie(config, "iters"));
    
     String currInputFilePat = baseInputFilePat;
     String currOutputFilePat = "";
     for (int iter = 1; iter <= numIterations; ++iter) {
       JobConf conf = new JobConf(LP_ZGL_Hadoop.class);
       conf.setJobName("lp_zgl_hadoop");
 
       conf.setOutputKeyClass(Text.class);
       conf.setOutputValueClass(Text.class);
 
       conf.setMapperClass(LP_ZGL_Map.class);
       // conf.setCombinerClass(LP_ZGL_Reduce.class);
       conf.setReducerClass(LP_ZGL_Reduce.class);
 
       conf.setInputFormat(TextInputFormat.class);
       conf.setOutputFormat(TextOutputFormat.class);
      
       // hyperparameters
       conf.set("mu1", Defaults.GetValueOrDie(config, "mu1"));
       conf.set("mu2", Defaults.GetValueOrDie(config, "mu2"));
       conf.set("keepTopKLabels",
            Defaults.GetValueOrDefault((String) config.get("keep_top_k_labels"),
                           Integer.toString(Integer.MAX_VALUE)));

       if (iter > 1) {
         // output from last iteration is the input for current iteration
         currInputFilePat = currOutputFilePat + "/*";
       }
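       // With hypothetical config values hdfs_input_pattern=/in/graph*,
       // hdfs_output_base=/out/lp_zgl and iters=3, the loop reads and writes:
       //   iter 1: input /in/graph*,       output /out/lp_zgl_1
       //   iter 2: input /out/lp_zgl_1/*,  output /out/lp_zgl_2
       //   iter 3: input /out/lp_zgl_2/*,  output /out/lp_zgl_3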
       FileInputFormat.setInputPaths(conf, new Path(currInputFilePat));
       currOutputFilePat = baseOutputFilePat + "_" + iter;
       FileOutputFormat.setOutputPath(conf, new Path(currOutputFilePat));

       JobClient.runJob(conf);
     }
   }
}
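
Note: main() expects a configuration (parsed by ConfigReader.read_config from the
command-line arguments) that supplies hdfs_input_pattern, hdfs_output_base, iters,
mu1 and mu2; keep_top_k_labels is optional and defaults to Integer.MAX_VALUE. Each
input line must follow the tab-separated vertex format documented in LP_ZGL_Map.map
above.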