Source code of com.backtype.hadoop.Consolidator (package com.backtype.hadoop)

package com.backtype.hadoop;

import com.backtype.hadoop.formats.RecordInputStream;
import com.backtype.hadoop.formats.RecordOutputStream;
import com.backtype.hadoop.formats.RecordStreamFactory;
import com.backtype.support.SubsetSum;
import com.backtype.support.SubsetSum.Value;
import com.backtype.support.Utils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.NullOutputFormat;
import org.apache.log4j.Logger;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.Serializable;
import java.util.*;


public class Consolidator {
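    // Consolidator merges many small files into fewer files of roughly
    // targetSizeBytes each (127 MB by default) using a map-only MapReduce
    // job: each map task streams one group of source files into a single
    // target file and then deletes the originals.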
    public static final long DEFAULT_CONSOLIDATION_SIZE = 1024*1024*127; //127 MB
    private static final String ARGS = "consolidator_args";

    private static Thread shutdownHook;
    private static RunningJob job = null;

    public static class ConsolidatorArgs implements Serializable {
        public String fsUri;
        public RecordStreamFactory streams;
        public PathLister pathLister;
        public List<String> dirs;
        public long targetSizeBytes;
        public String extension;


        public ConsolidatorArgs(String fsUri, RecordStreamFactory streams, PathLister pathLister,
            List<String> dirs, long targetSizeBytes, String extension) {
            this.fsUri = fsUri;
            this.streams = streams;
            this.pathLister = pathLister;
            this.dirs = dirs;
            this.targetSizeBytes = targetSizeBytes;
            this.extension = extension;
        }
    }

    public static void consolidate(FileSystem fs, String targetDir, RecordStreamFactory streams,
        PathLister pathLister, long targetSizeBytes) throws IOException {
        consolidate(fs, targetDir, streams, pathLister, targetSizeBytes, "");
    }

    public static void consolidate(FileSystem fs, String targetDir, RecordStreamFactory streams,
        PathLister pathLister, String extension) throws IOException {
        consolidate(fs, targetDir, streams, pathLister, DEFAULT_CONSOLIDATION_SIZE, extension);
    }

    public static void consolidate(FileSystem fs, String targetDir, RecordStreamFactory streams,
        PathLister pathLister) throws IOException {
        consolidate(fs, targetDir, streams, pathLister, DEFAULT_CONSOLIDATION_SIZE, "");
    }
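
    /*
     * Example usage (a sketch, not part of the original file): consolidate a
     * directory with the default 127 MB target size. `myStreams` and
     * `myLister` stand for application-provided RecordStreamFactory and
     * PathLister implementations; they are assumptions, not classes defined
     * here.
     *
     *   FileSystem fs = FileSystem.get(new Configuration());
     *   Consolidator.consolidate(fs, "/data/events", myStreams, myLister);
     */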

    public static void consolidate(FileSystem fs, String targetDir, RecordStreamFactory streams,
        PathLister pathLister, long targetSizeBytes, String extension) throws IOException {
        List<String> dirs = new ArrayList<String>();
        dirs.add(targetDir);
        consolidate(fs, streams, pathLister, dirs, targetSizeBytes, extension);
    }

    private static String getDirsString(List<String> targetDirs) {
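        // Build a short, human-readable job-name suffix from at most the
        // first three target directories.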
        String ret = "";
        for(int i=0; i < 3 && i < targetDirs.size(); i++) {
            ret+=(targetDirs.get(i) + " ");
        }
        if(targetDirs.size()>3) {
            ret+="...";
        }
        return ret;
    }

    public static void consolidate(FileSystem fs, RecordStreamFactory streams, PathLister lister, List<String> dirs,
        long targetSizeBytes, String extension) throws IOException {
        JobConf conf = new JobConf(fs.getConf(), Consolidator.class);
        String fsUri = fs.getUri().toString();
        ConsolidatorArgs args = new ConsolidatorArgs(fsUri, streams, lister, dirs, targetSizeBytes, extension);
        Utils.setObject(conf, ARGS, args);

        conf.setJobName("Consolidator: " + getDirsString(dirs));

        // Map-only job: the mappers do all the work and emit nothing, so
        // reduces are disabled and the output format is Null.
        conf.setInputFormat(ConsolidatorInputFormat.class);
        conf.setOutputFormat(NullOutputFormat.class);
        conf.setMapperClass(ConsolidatorMapper.class);

        // Speculative execution must stay off: duplicate map attempts would
        // race on renaming and deleting the same files.
        conf.setSpeculativeExecution(false);

        conf.setNumReduceTasks(0);

        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputValueClass(NullWritable.class);

        try {
            registerShutdownHook();
            job = new JobClient(conf).submitJob(conf);

            while(!job.isComplete()) {
                Thread.sleep(100);
            }
            if(!job.isSuccessful()) throw new IOException("Consolidator failed");
            deregisterShutdownHook();
        } catch(IOException e) {
            IOException ret = new IOException("Consolidator failed");
            ret.initCause(e);
            throw ret;
        } catch(InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    private static void registerShutdownHook() {
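        // Kill the running job if this client JVM exits early, so orphaned
        // map tasks don't keep renaming and deleting files behind our back.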
        shutdownHook = new Thread()
        {
            @Override
            public void run()
            {
                try {
                    if(job != null)
                        job.killJob();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        };
        Runtime.getRuntime().addShutdownHook( shutdownHook );
    }

    private static void deregisterShutdownHook()
    {
        Runtime.getRuntime().removeShutdownHook( shutdownHook );
    }

    public static class ConsolidatorMapper extends MapReduceBase implements Mapper<ArrayWritable, Text, NullWritable, NullWritable> {
        public static Logger LOG = Logger.getLogger(ConsolidatorMapper.class);

        FileSystem fs;
        ConsolidatorArgs args;

        public void map(ArrayWritable sourcesArr, Text target, OutputCollector<NullWritable, NullWritable> oc, Reporter rprtr) throws IOException {
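            // One input record per split: the key holds the group of source
            // paths to merge, the value the target path chosen by the input
            // format.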

            Path finalFile = new Path(target.toString());

            List<Path> sources = new ArrayList<Path>();
            for(int i=0; i<sourcesArr.get().length; i++) {
                sources.add(new Path(((Text)sourcesArr.get()[i]).toString()));
            }
            // If the target already exists, a previous attempt completed the
            // rename but died before the task finished deleting the sources.
            // Target paths are UUID-based and never reused, so the merge can
            // safely be skipped and only the deletes rerun.
            if(!fs.exists(finalFile)) {
                Path tmpFile = new Path("/tmp/consolidator/" + UUID.randomUUID().toString());
                fs.mkdirs(tmpFile.getParent());

                String status = "Consolidating " + sources.size() + " files into " + tmpFile.toString();
                LOG.info(status);
                rprtr.setStatus(status);

                RecordStreamFactory fact = args.streams;
                fs.mkdirs(finalFile.getParent());

                RecordOutputStream os = fact.getOutputStream(fs, tmpFile);
                for(Path i: sources) {
                    LOG.info("Opening " + i.toString() + " for consolidation");
                    RecordInputStream is = fact.getInputStream(fs, i);
                    byte[] record;
                    while((record = is.readRawRecord()) != null) {
                        os.writeRaw(record);
                    }
                    is.close();
                    rprtr.progress();
                }
                os.close();

                status = "Renaming " + tmpFile.toString() + " to " + finalFile.toString();
                LOG.info(status);
                rprtr.setStatus(status);

                if(!fs.rename(tmpFile, finalFile))
                    throw new IOException("could not rename " + tmpFile.toString() + " to " + finalFile.toString());
            }

            String status = "Deleting " + sources.size() + " original files";
            LOG.info(status);
            rprtr.setStatus(status);

            for(Path p: sources) {
                fs.delete(p, false);
                rprtr.progress();
            }

        }

        @Override
        public void configure(JobConf conf) {
            args = (ConsolidatorArgs) Utils.getObject(conf, ARGS);
            try {
                fs = Utils.getFS(args.fsUri, conf);
            } catch(IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    public static class ConsolidatorSplit implements InputSplit {
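        // One unit of work: the source files to merge and the target file
        // they are merged into.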
        public String[] sources;
        public String target;

        public ConsolidatorSplit() {
            // No-arg constructor required for Writable deserialization.
        }

        public ConsolidatorSplit(String[] sources, String target) {
            this.sources = sources;
            this.target = target;
        }


        public long getLength() throws IOException {
            // Length is not meaningful here; every split is one consolidation task.
            return 1;
        }

        public String[] getLocations() throws IOException {
            // No locality preference: the sources may live on many nodes.
            return new String[] {};
        }

        public void write(DataOutput d) throws IOException {
            WritableUtils.writeString(d, target);
            WritableUtils.writeStringArray(d, sources);
        }

        public void readFields(DataInput di) throws IOException {
            target = WritableUtils.readString(di);
            sources = WritableUtils.readStringArray(di);
        }

    }

    public static class ConsolidatorRecordReader implements RecordReader<ArrayWritable, Text> {
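        // Emits exactly one record per split: the source paths as the key
        // and the target path as the value.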
        private ConsolidatorSplit split;
        boolean finished = false;

        public ConsolidatorRecordReader(ConsolidatorSplit split) {
            this.split = split;
        }

        public boolean next(ArrayWritable k, Text v) throws IOException {
            if(finished) return false;
            Writable[] sources = new Writable[split.sources.length];
            for(int i=0; i<sources.length; i++) {
                sources[i] = new Text(split.sources[i]);
            }
            k.set(sources);
            v.set(split.target);

            finished = true;
            return true;
        }

        public ArrayWritable createKey() {
            return new ArrayWritable(Text.class);
        }

        public Text createValue() {
            return new Text();
        }

        public long getPos() throws IOException {
            if(finished) return 1;
            else return 0;
        }

        public void close() throws IOException {
        }

        public float getProgress() throws IOException {
            if(finished) return 1;
            else return 0;
        }

    }


    public static class ConsolidatorInputFormat implements InputFormat<ArrayWritable, Text> {

        private static class PathSizePair implements Value {
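            // Pairs a path with its size in bytes; getValue() feeds the size
            // to the SubsetSum bin-packing below.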
            public Path path;
            public long size;

            public PathSizePair(Path p, long s) {
                this.path = p;
                this.size = s;
            }

            public long getValue() {
                return size;
            }

        }

        private List<PathSizePair> getFileSizePairs(FileSystem fs, List<Path> files) throws IOException {
            List<PathSizePair> results = new ArrayList<PathSizePair>();
            for(Path p: files) {
                long size = fs.getContentSummary(p).getLength();
                results.add(new PathSizePair(p, size));
            }
            return results;
        }


        private String[] pathsToStrs(List<PathSizePair> pairs) {
            String[] ret = new String[pairs.size()];
            for(int i=0; i<pairs.size(); i++) {
                ret[i] = pairs.get(i).path.toString();
            }
            return ret;
        }

        private List<InputSplit> createSplits(FileSystem fs, List<Path> files,
            String target, long targetSize, String extension) throws IOException {
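            // Pair each file with its size, then bin-pack the files into
            // groups whose combined size approaches targetSize.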
            List<PathSizePair> working = getFileSizePairs(fs, files);
            List<InputSplit> ret = new ArrayList<InputSplit>();
            List<List<PathSizePair>> splits = SubsetSum.split(working, targetSize);
            for(List<PathSizePair> c: splits) {
                // A group of one file is already as consolidated as it can get.
                if(c.size()>1) {
                    String rand = UUID.randomUUID().toString();
                    String targetFile = new Path(target,
                        "" + rand.charAt(0) + rand.charAt(1) + "/cons" +
                        rand + extension).toString();
                    ret.add(new ConsolidatorSplit(pathsToStrs(c), targetFile));
                }
            }
            // Largest groups first, so the longest-running map tasks are
            // scheduled earliest.
            Collections.sort(ret, new Comparator<InputSplit>() {
                public int compare(InputSplit o1, InputSplit o2) {
                    return ((ConsolidatorSplit)o2).sources.length - ((ConsolidatorSplit)o1).sources.length;
                }
            });
            return ret;
        }

        public InputSplit[] getSplits(JobConf conf, int ignored) throws IOException {
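            // One split (and thus one map task) per consolidated output
            // file, gathered across all target directories.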
            ConsolidatorArgs args = (ConsolidatorArgs) Utils.getObject(conf, ARGS);
            PathLister lister = args.pathLister;
            List<String> dirs = args.dirs;
            List<InputSplit> ret = new ArrayList<InputSplit>();
            for(String dir: dirs) {
                FileSystem fs = Utils.getFS(dir, conf);
                ret.addAll(createSplits(fs, lister.getFiles(fs,dir),
                    dir, args.targetSizeBytes, args.extension));
            }
            return ret.toArray(new InputSplit[ret.size()]);
        }

        public RecordReader<ArrayWritable, Text> getRecordReader(InputSplit is, JobConf jc, Reporter rprtr) throws IOException {
            return new ConsolidatorRecordReader((ConsolidatorSplit) is);
        }
    }
}