Package hitune.analysis.mapreduce.processor

Source Code of hitune.analysis.mapreduce.processor.InstrumentDataflow$MapClass

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package hitune.analysis.mapreduce.processor;


import hitune.analysis.mapreduce.AnalysisProcessorConfiguration;
import hitune.analysis.mapreduce.CSVFileOutputFormat;
import hitune.analysis.mapreduce.HiTuneKey;
import hitune.analysis.mapreduce.HiTuneRecord;
import hitune.analysis.mapreduce.MultiSequenceFileInputFormat;
import hitune.analysis.mapreduce.TextArrayWritable;


import java.io.IOException;
import java.io.StringBufferInputStream;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;


import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecord;
import org.apache.hadoop.chukwa.extraction.engine.ChukwaRecordKey;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.UTF8;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.record.Record;
import org.apache.log4j.Logger;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;

import org.xml.sax.SAXException;

import org.xml.sax.SAXException;



/**
* Re-organize the instrumented data for Map tasks, and get the statistics
*
*
*/
public class InstrumentDataflow extends AnalysisProcessor {

    static Logger log = Logger.getLogger(InstrumentDataflow.class);

    /**
     * Get each phase's metrics including :
     * 1. its function list, function sampling count
     * 2. its start,end time
     * 3. its status list and status count
     * 4. its function-status count
     * <br> Each output record represents one sampling point.
     */
    public static class MapClass<K extends Record, V extends Record> extends MapReduceBase implements
    Mapper<K, V, K, V>{
        JobConf conf = null;
        List <String> nodelist = new ArrayList<String>();
        Map <String,List<String>> phases = new HashMap<String,List<String>>();
        Map <String,String> phasealias = new HashMap<String,String>();
        List <String> statuslist = new ArrayList<String>();
        @Override
        public void configure(JobConf jobConf) {
            super.configure(jobConf);
            this.conf = jobConf;
            init();
        }

        void parsePhase(){
            DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
            // Now use the factory to create a DOM parser (a.k.a. a DocumentBuilder)
            DocumentBuilder parser;
            try {
                parser = factory.newDocumentBuilder();
                // Parse the file and build a Document tree to represent its content
                Document document = parser.parse(new StringBufferInputStream("<root>"+conf.get("phases") + "</root>"));
                // Ask the document for a list of all phases
                NodeList rows = document.getElementsByTagName(AnalysisProcessorConfiguration.phase);
                int phasenumber = rows.getLength();
                for (int i = 0; i< phasenumber; i++){
                    Node phase = rows.item( i );
                    NodeList fields = phase.getChildNodes();
                    String phasename = null;
                    String stacks = null;
                    String funcs = null;
                    List<String> functionlist = new ArrayList<String>();
                    for (int j = 0; j < fields.getLength(); j++) {
                        Node fieldNode = fields.item(j);
                        if (!(fieldNode instanceof Element))
                            continue;
                        Element field = (Element)fieldNode;
                        if ("phasename".equals(field.getTagName()) && field.hasChildNodes())
                            phasename =  ((org.w3c.dom.Text)field.getFirstChild()).getData().trim();
                        else if ("stack".equals(field.getTagName()) && field.hasChildNodes())
                            stacks = ((org.w3c.dom.Text)field.getFirstChild()).getData();
                        else if ("functions".equals(field.getTagName()) && field.hasChildNodes())
                            funcs = ((org.w3c.dom.Text)field.getFirstChild()).getData();
                    }
                    if(stacks!=null && stacks.length()!=0) stacks = stacks.replace(" ", "");
                    else stacks="";
                    phasealias.put(stacks, phasename);

                    if(funcs == null){
                        continue;
                    }
                    for(String func: funcs.split(SEPERATOR_COMMA)){
                        functionlist.add(func);
                    }
                    this.phases.put(stacks, functionlist);
                }
            } catch (ParserConfigurationException e) {
                // TODO Auto-generated catch block
                log.warn(e);
                e.printStackTrace();
            } catch (SAXException e) {
                // TODO Auto-generated catch block
                log.warn(e);
                e.printStackTrace();
            } catch (IOException e) {
                // TODO Auto-generated catch block
                log.warn(e);
                e.printStackTrace();
            }
        }

        private void init(){
            String nodes = conf.get(AnalysisProcessorConfiguration.nodes);
            this.nodelist = String2List(nodes, SEPERATOR_COMMA);
            String status = conf.get("status");
            this.statuslist = String2List(status, SEPERATOR_COMMA);
            parsePhase();
        }

        private String count(String dest, List<String> patternList){
            StringBuilder results = new StringBuilder();
            if(dest==null || patternList==null || patternList.size()<=0){
                return "";
            }

            for(String pattern : patternList){
                Pattern p = Pattern.compile(pattern);
                Matcher matcher = p.matcher(dest);
                if(matcher.find()){
                    results.append("1").append(SEPERATOR_COMMA);
                }
                else{
                    results.append("0").append(SEPERATOR_COMMA);
                }
            }
            log.debug("results:" + results.toString());
            return results.toString().substring(0, results.length()-1);
        }

        @Override
        public void map(K key, V value,
                OutputCollector<K, V> output,
                Reporter reporter) throws IOException {
            // TODO Auto-generated method stub
            //doing the filter

            //<key,value>
            //<[AttemptID/PhaseStack/PhaseAlias], [ThreadName,ThreadId,starttime,endtime,funlist,funcountlist,statelist,statecountlist,funStateMatric]>
            HiTuneRecord valproxy = new HiTuneRecord(value);
            String hostname = valproxy.getHost();
            String status  = valproxy.getValue("ThreadState");
            String stack = valproxy.getValue("CallStack");
            String attemptID = valproxy.getValue("TaskID");
            log.debug("hostname:" + hostname + " ThreadState:" + status + " stack:" + stack + " attemptID:" + attemptID);
            if(isMatched(this.nodelist,hostname)){
                for(String s : phasealias.keySet()){
                    log.debug("phasealias:" +s);
                    if(s==null || s.length()==0)s="";
                    Pattern p = Pattern.compile(s);
                    if(stack!=null && stack.length()!=0)stack=stack.replace(" ", "");
                    else stack="";
                    Matcher matcher = p.matcher(stack);
                    if(matcher.find()){
                        try{
                            log.debug("find pattern");
                            K newkey = (K) key.getClass().getConstructor().newInstance();
                            V newval = (V) value.getClass().getConstructor().newInstance();

                            HiTuneKey newkeyproxy = new HiTuneKey(newkey);
                            HiTuneRecord newvalproxy = new HiTuneRecord(newval);

                            newkeyproxy.setKey(attemptID + "/" + s + "/" + phasealias.get(s));
                            newkeyproxy.setDataType(new HiTuneKey(key).getDataType());
                            newvalproxy.copyCommonFields(value);


                            newvalproxy.add("thread_id", valproxy.getValue("ThreadID"));
                            newvalproxy.add("thread_name", valproxy.getValue("ThreadName"));
                            newvalproxy.add("attempt_id", attemptID);
                            newvalproxy.add("phase_stack", s);
                            newvalproxy.add("phase_name", phasealias.get(s));
                            newvalproxy.add("start", "" + newvalproxy.getTime());
                            newvalproxy.add("count" , "1");
                            log.debug("status:" + conf.get("status"));
                            newvalproxy.add("statusList", conf.get("status"));
                            newvalproxy.add("statusCount", count(status, this.statuslist));

                            log.debug("funList:" + this.phases.get(s));
                            newvalproxy.add("funList", List2String(this.phases.get(s),SEPERATOR_COMMA));
                            newvalproxy.add("funCount", count(stack, this.phases.get(s)));
                            newvalproxy.add(AnalysisProcessorConfiguration.jobid, conf.get(AnalysisProcessorConfiguration.jobid));

                            log.debug("Key:" + newkeyproxy.toString() + " Record" + newkeyproxy.toString());
                            output.collect((K)newkeyproxy.getObject(), (V)newvalproxy.getObject());
                        } catch (IllegalArgumentException e) {
                            // TODO Auto-generated catch block
                            log.warn(e);
                            e.printStackTrace();
                        } catch (SecurityException e) {
                            // TODO Auto-generated catch block
                            log.warn(e);
                            e.printStackTrace();
                        } catch (InstantiationException e) {
                            // TODO Auto-generated catch block
                            log.warn(e);
                            e.printStackTrace();
                        } catch (IllegalAccessException e) {
                            // TODO Auto-generated catch block
                            log.warn(e);
                            e.printStackTrace();
                        } catch (InvocationTargetException e) {
                            // TODO Auto-generated catch block
                            log.warn(e);
                            e.printStackTrace();
                        } catch (NoSuchMethodException e) {
                            // TODO Auto-generated catch block
                            log.warn(e);
                            e.printStackTrace();
                        }

                    }
                }

            }


        }

    }

    /**
     * Calculate each phase's statistics:
     * 1. choose minimum start time as the start time.
     * 2. choose maximum end time as the end time.
     * 3. sum the sampling count with certain status.
     * 4. sum the function sampling count
     * 5. sum the phase's count
     * The analyzer won't tell that if the phase is  continuous or not in the time sequence.
     */
    public static class ReduceClass<K extends Record, V extends Record> extends MapReduceBase implements
    Reducer<K, V, Text, TextArrayWritable>{

        static boolean initialized = false;

        /**
         * Add two vectors
         * @param a
         * @param b
         * @param seperator
         * @return
         */
        String vectorAdd(String a, String b, String seperator){
            StringBuilder result = new StringBuilder();
            List<String> list_a = String2List(a,seperator);
            List<String> list_b = String2List(b,seperator);

            if(list_a == null || list_b == null||list_a.size()!=list_b.size() || list_a.size()==0 || list_b.size()==0){
                return "";
            }
            for( int i =0; i< list_a.size(); i++){
                int _a = Integer.parseInt(list_a.get(i));
                int _b = Integer.parseInt(list_b.get(i));
                int sum = _a + _b;
                result.append(sum).append(seperator);
            }
            return result.toString().substring(0, result.length()-seperator.length());
        }

        @Override
        public void reduce(K key, Iterator<V> values,
                OutputCollector<Text, TextArrayWritable> output, Reporter reporter)
        throws IOException {
            // TODO Auto-generated method stub
            //organizing into csv format
            Map<String, String> newRecord = new HashMap<String,String>();
            String []headers = new String[]{"attempt_id","breakdown_count", "breakdown_name", "breakdown_type","host",
                    "job_id", "phase_count", "phase_end", "phase_name", "phase_stack", "phase_start",
                    "thread_id", "thread_name"};
            for(String head:headers){
                newRecord.put(head, "");
            }



            long start = -1, end = -1;
            long phaseCount=0;
            String funcCount="", statusCount="";
            String funcList="", statusList="";
            while(values.hasNext()){
                HiTuneRecord valproxy = new HiTuneRecord(values.next());
                long phaseStart = Long.parseLong(valproxy.getValue("start"));
                long phaseEnd = Long.parseLong(valproxy.getValue("start"));
                start = start == -1 ? phaseStart : Math.min(start,phaseStart);
                end = end == -1 ?phaseEnd : Math.max(end,phaseEnd);
                phaseCount++;
                funcCount = funcCount == "" ? valproxy.getValue("funCount"):vectorAdd(valproxy.getValue("funCount"),funcCount, SEPERATOR_COMMA);
                statusCount = statusCount == "" ? valproxy.getValue("statusCount"):vectorAdd(valproxy.getValue("statusCount"),statusCount, SEPERATOR_COMMA);
                newRecord.put("host", valproxy.getHost());
                newRecord.put("job_id", valproxy.getValue(AnalysisProcessorConfiguration.jobid));
                newRecord.put("phase_stack", valproxy.getValue("phase_stack"));
                newRecord.put("phase_name", valproxy.getValue("phase_name"));
                newRecord.put("attempt_id", valproxy.getValue("attempt_id"));
                newRecord.put("thread_id", valproxy.getValue("thread_id"));
                newRecord.put("thread_name", valproxy.getValue("thread_name"));
                funcList = valproxy.getValue("funList");
                statusList = valproxy.getValue("statusList");
            }

            newRecord.put("phase_start", ""+start);
            newRecord.put("phase_end", ""+end);
            newRecord.put("phase_count", ""+phaseCount);

            if(!initialized){
                TextArrayWritable newValue = new TextArrayWritable(newRecord.keySet().toArray(new String[0]));
                output.collect(null, newValue);
                initialized = true;
            }

            if(!funcCount.equals("")){
                newRecord.put("breakdown_type", "function");
                log.debug("funcList: " + funcList);
                log.debug("funcCount: " + funcCount);
                List <String> tmp = String2List(funcList,SEPERATOR_COMMA );
                List <String> counts = String2List(funcCount,SEPERATOR_COMMA );
                for(int i = 0 ; i < tmp.size(); i++ ){
                    log.debug("function:" +tmp.get(i) + " count:" + counts.get(i));
                    newRecord.put("breakdown_name", tmp.get(i));
                    newRecord.put("breakdown_count", ""+counts.get(i));
                    String [] contents = new String[newRecord.keySet().size()];
                    int j = 0;
                    for (String index:  newRecord.keySet() ){
                        contents[j] = newRecord.get(index);
                        log.debug("content: " + index + "," +contents[j] );
                        j++;

                    }
                    TextArrayWritable newValue = new TextArrayWritable(contents);
                    output.collect(null, newValue);
                    contents=null;
                }
            }
            if(!statusCount.equals("")){
                newRecord.put("breakdown_type", "state");
                log.debug("statusList: " + statusList);
                log.debug("statusCount: " + statusCount);
                List <String> tmp = String2List(statusList,SEPERATOR_COMMA );
                List <String> counts = String2List(statusCount,SEPERATOR_COMMA );
                for(int i = 0 ; i < tmp.size(); i++ ){
                    log.debug("function:" +tmp.get(i) + " count:" + counts.get(i));
                    newRecord.put("breakdown_name", tmp.get(i));
                    newRecord.put("breakdown_count", ""+counts.get(i));
                    String [] contents = new String[newRecord.size()];
                    int j = 0;
                    for (String index:  newRecord.keySet() ){
                        contents[j] = newRecord.get(index);
                        j++;
                        log.debug("content: " + index + "," +contents[i] );
                    }
                    TextArrayWritable newValue = new TextArrayWritable(contents);
                    output.collect(null, newValue);
                    contents=null;
                }
            }
        }

    }
    /**
     * @param conf
     */
    public InstrumentDataflow(Configuration conf) {
        super(conf);
        // TODO Auto-generated constructor stub
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.chukwa.analysis.HiTune.AnalysisProcessor#run()
     */
    @Override
    public void run() {
        // TODO Auto-generated method stub

        long timestamp = System.currentTimeMillis();

        JobConf conf = new JobConf(this.conf,InstrumentDataflow.class);
        try{
            conf.setJobName(this.getClass().getSimpleName()+ timestamp);
            conf.setInputFormat(MultiSequenceFileInputFormat.class);
            conf.setMapperClass(InstrumentDataflow.MapClass.class);
            conf.setReducerClass(InstrumentDataflow.ReduceClass.class);
            conf.setOutputKeyClass(Text.class);
            Class<? extends WritableComparable> outputKeyClass =
                Class.forName(conf.get(AnalysisProcessorConfiguration.mapoutputKeyClass)).asSubclass(WritableComparable.class);
            Class<? extends Writable> outputValueClass =
                Class.forName(conf.get(AnalysisProcessorConfiguration.mapoutputValueClass)).asSubclass(Writable.class);
            conf.setMapOutputKeyClass(outputKeyClass);
            conf.setMapOutputValueClass(outputValueClass);

            conf.setOutputValueClass(TextArrayWritable.class);
            conf.setOutputFormat(CSVFileOutputFormat.class);


            String outputPaths = conf.get(AnalysisProcessorConfiguration.reportfolder) + "/" + conf.get(AnalysisProcessorConfiguration.reportfile);
            String temp_outputPaths = getTempOutputDir(outputPaths );


            if(this.inputfiles != null){
                log.debug("inputPaths:" + inputfiles);
                FileInputFormat.setInputPaths(conf,inputfiles);
                FileOutputFormat.setOutputPath(conf,new Path(temp_outputPaths));

                //FileInputFormat.setInputPathFilter(conf, evtFileFilter.class);
                //conf.setNumReduceTasks(1);

                try {
                    JobClient.runJob(conf);
                    moveResults(conf,outputPaths,temp_outputPaths);
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    log.warn("For " + getOutputFileName() + " :JOB fails!");
                    log.warn(e);
                    e.printStackTrace();
                    this.MOVE_DONE = false;
                }

            }
            else{
                log.warn( "For " + getOutputFileName() + " :No input path!");
            }
        }catch(Exception e){
            log.warn("Job preparation failure!");
            log.warn(e);
            e.printStackTrace();
        }
    }

    /**
     * @param args
     */
    public static void main(String[] args) {
        // TODO Auto-generated method stub

    }

}
TOP

Related Classes of hitune.analysis.mapreduce.processor.InstrumentDataflow$MapClass

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.