Package org.apache.pig.test.pigmix.datagen

Source Code of org.apache.pig.test.pigmix.datagen.DataGenerator$UniformRandomGenerator

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.pig.test.pigmix.datagen;

import java.io.*;
import java.lang.SecurityException;
import java.text.ParseException;
import java.util.*;

import sdsu.algorithms.data.Zipf;

import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.MRConfiguration;
import org.apache.pig.tools.cmdline.CmdLineParser;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.util.*;

/**
* A tool to generate data for performance testing.
*/
public class DataGenerator extends Configured implements Tool {
    // One parsed column specification per generated column, in output order.
    ColSpec[] colSpecs;
    // Seed for the random generator; -1 means "not given", in which case
    // run() falls back to the current time in milliseconds.
    long seed = -1;
    // Number of rows to generate (-r); -1 means "not given".
    long numRows = -1;
    // Number of mappers (-m); values <= 0 select local (non-Hadoop) mode.
    int numMappers = -1;
    // Output location (-f).
    String outputFile;
    // Optional input file (-i) whose lines get generated columns appended.
    String inFile;
    // Field separator (-s); defaults to ^A (U+0001).
    char separator = '\u0001' ;
    // Shared source of randomness, created in run() or by the seed constructor.
    Random rand;

    // Keys used, in this order, for the entries of generated MAP columns.
    private String[] mapkey = { "a", "b", "c", "d", "e", "f", "g", "h", "i",
        "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w",
        "x", "y", "z"};

    public static void main(String[] args) throws Exception {
        DataGenerator dg = new DataGenerator();
        try {
            ToolRunner.run(new Configuration(), dg, args);
           }catch(Exception e) {
            throw new IOException (e);
        }
        dg.go();
    }

    protected DataGenerator(long seed) {
        System.out.println("Using seed " + seed);
        rand = new Random(seed);
    }

    protected DataGenerator() {

    }

    protected DataGenerator(String[] args) {

    }

    public int run(String[] args) throws Exception {
        CmdLineParser opts = new CmdLineParser(args);
        opts.registerOpt('e', "seed", CmdLineParser.ValueExpected.REQUIRED);
        opts.registerOpt('f', "file", CmdLineParser.ValueExpected.REQUIRED);
        opts.registerOpt('r', "rows", CmdLineParser.ValueExpected.REQUIRED);
        opts.registerOpt('s', "separator", CmdLineParser.ValueExpected.REQUIRED);
        opts.registerOpt('i', "input", CmdLineParser.ValueExpected.REQUIRED);
        opts.registerOpt('m', "mappers", CmdLineParser.ValueExpected.OPTIONAL);

        char opt;
        try {
            while ((opt = opts.getNextOpt()) != CmdLineParser.EndOfOpts) {
                switch (opt) {
                case 'e':
                    seed = Long.valueOf(opts.getValStr());
                    break;

                case 'f':
                    outputFile = opts.getValStr();
                    break;

                case 'i':
                    inFile = opts.getValStr();
                    break;

                case 'r':
                    numRows = Long.valueOf(opts.getValStr());
                    break;

                case 's':
                    separator = opts.getValStr().charAt(0);
                    break;

                case 'm':
                    numMappers = Integer.valueOf(opts.getValStr());
                    break;

                default:
                    usage();
                    break;
                }
            }
        } catch (ParseException pe) {
            System.err.println("Couldn't parse the command line arguments, " +
                pe.getMessage());
            usage();
        }

        if (numRows < 1 && inFile == null) usage();

        if (numRows > 0 && inFile != null) usage();

        if (numMappers > 0 && seed != -1) usage();

        if (seed == -1){
            seed = System.currentTimeMillis();
        }

        String remainders[] = opts.getRemainingArgs();
        colSpecs = new ColSpec[remainders.length];
        for (int i = 0; i < remainders.length; i++) {
            colSpecs[i] = new ColSpec(remainders[i]);
        }
        System.err.println("Using seed " + seed);
        rand = new Random(seed);

        return 0;
    }

    private void go() throws IOException {
        long t1 = System.currentTimeMillis();
        if (numMappers <= 0) {
            System.out.println("Generate data in local mode.");
            goLocal();
        }else{
            System.out.println("Generate data in hadoop mode.");
            HadoopRunner runner = new HadoopRunner();
            runner.goHadoop();
        }

        long t2 = System.currentTimeMillis();
        System.out.println("Job is successful! It took " + (t2-t1)/1000 + " seconds.");
    }

    public void goLocal() throws IOException {

        PrintWriter out = null;
        try {
            out = new PrintWriter(outputFile);
        } catch (FileNotFoundException fnfe) {
            System.err.println("Could not find file " + outputFile +
                ", " + fnfe.getMessage());
            return;
        } catch (SecurityException se) {
            System.err.println("Could not write to file " + outputFile +
                ", " + se.getMessage());
            return;
        }

        BufferedReader in = null;
        if (inFile != null) {
            try {
                    in = new BufferedReader(new FileReader(inFile));
            } catch (FileNotFoundException fnfe) {
                System.err.println("Unable to find input file " + inFile);
                return;
            }
        }

        if (numRows > 0) {
            for (int i = 0; i < numRows; i++) {
                writeLine(out);
                out.println();
            }
        } else if (in != null) {
            String line;
            while ((line = in.readLine()) != null) {
                out.print(line);
                writeLine(out);
                out.println();
            }
        }
        out.close();
    }

    protected void writeLine(PrintWriter out) {
        for (int j = 0; j < colSpecs.length; j++) {
            if (j != 0) out.print(separator);
            // First, decide if it's going to be null
            if (rand.nextInt(100) < colSpecs[j].pctNull) {
                continue;
            }
            writeCol(colSpecs[j], out);
        }
    }

    private void writeCol(ColSpec colspec, PrintWriter out) {
        switch (colspec.datatype) {
        case INT:
            out.print(colspec.nextInt());
            break;

        case LONG:
            out.print(colspec.nextLong());
            break;

        case FLOAT:
            out.print(colspec.nextFloat());
            break;

        case DOUBLE:
            out.print(colspec.nextDouble());
            break;

        case STRING:
            out.print(colspec.nextString());
            break;

        case MAP:
            int len = rand.nextInt(20) + 6;
            for (int k = 0; k < len; k++) {
                if (k != 0) out.print('');
                out.print(mapkey[k] + '');
                out.print(colspec.gen.randomString());
            }
            break;

        case BAG:
            int numElements = rand.nextInt(5) + 5;
            for (int i = 0; i < numElements; i++) {
                if (i != 0) out.print('');
                switch(colspec.contained.datatype) {
                    case INT: out.print("i"); break;
                    case LONG: out.print("l"); break;
                    case FLOAT: out.print("f"); break;
                    case DOUBLE: out.print("d"); break;
                    case STRING: out.print("s"); break;
                    case MAP: out.print("m"); break;
                    case BAG: out.print("b"); break;
                    default: throw new RuntimeException("should never be here");
                }
                writeCol(colspec.contained, out);
            }
        }
    }

    private void usage() {
        System.err.println("Usage: datagen -rows numrows [options] colspec ...");
        System.err.println("\tOptions:");
        System.err.println("\t-e -seed seed value for random numbers");
        System.err.println("\t-f -file output file, default is stdout");
        System.err.println("\t-i -input input file, lines will be read from");
        System.err.println("\t\tthe file and additional columns appended.");
        System.err.println("\t\tMutually exclusive with -r.");
        System.err.println("\t-r -rows number of rows to output");
        System.err.println("\t-s -separator character, default is ^A");
        System.err.println("\t-m -number of mappers to run concurrently to generate data. " +
                "If not specified, DataGenerator runs locally. This option can NOT be used with -e.");
        System.err.println();
        System.err.print("\tcolspec: columntype:average_size:cardinality:");
        System.err.println("distribution_type:percent_null");
        System.err.println("\tcolumntype:");
        System.err.println("\t\ti = int");
        System.err.println("\t\tl = long");
        System.err.println("\t\tf = float");
        System.err.println("\t\td = double");
        System.err.println("\t\ts = string");
        System.err.println("\t\tm = map");
        System.err.println("\t\tbx = bag of x, where x is a columntype");
        System.err.println("\tdistribution_type:");
        System.err.println("\t\tu = uniform");
        System.err.println("\t\tz = zipf");

        throw new RuntimeException();
    }



    /** Column types a colspec can request; BAG wraps another column type. */
    static enum Datatype { INT, LONG, FLOAT, DOUBLE, STRING, MAP, BAG };
    /** Supported distributions for picking values: uniform or Zipf. */
    static enum DistributionType { UNIFORM, ZIPF };
    protected class ColSpec {
        String arg;
        Datatype datatype;
        DistributionType distype;
        int avgsz;
        int card;
        RandomGenerator gen;
        int pctNull;
        ColSpec contained;
        String mapfile;
        Map<Integer, Object> map;

        public ColSpec(String arg) {
            this.arg = arg;

            String[] parts = arg.split(":");
            if (parts.length != 5 && parts.length != 6) {
                System.err.println("Colspec [" + arg + "] format incorrect");
                usage();
            }

            switch (parts[0].charAt(0)) {
                case 'i': datatype = Datatype.INT; break;
                case 'l': datatype = Datatype.LONG; break;
                case 'f': datatype = Datatype.FLOAT; break;
                case 'd': datatype = Datatype.DOUBLE; break;
                case 's': datatype = Datatype.STRING; break;
                case 'm': datatype = Datatype.MAP; break;
                case 'b':
                    datatype = Datatype.BAG;
                    contained = new ColSpec(arg.substring(1));
                    return;
                default:
                    System.err.println("Don't know column type " +
                        parts[0].charAt(0));
                    usage();
                    break;
            }
            avgsz = Integer.valueOf(parts[1]);
            card = Integer.valueOf(parts[2]);
            switch (parts[3].charAt(0)) {
                case 'u':
                    gen = new UniformRandomGenerator(avgsz, card);
                    distype = DistributionType.UNIFORM;
                    break;

                case 'z':
                    gen = new ZipfRandomGenerator(avgsz, card);
                    distype = DistributionType.ZIPF;
                    break;

                default:
                    System.err.println("Don't know generator type " +
                        parts[3].charAt(0));
                    usage();
                    break;
            }

            pctNull = Integer.valueOf(parts[4]);
            if (pctNull > 100) {
                System.err.println("Percentage null must be between 0-100, "
                    + "you gave" + pctNull);
                usage();
            }
            contained = null;

            // if config has 6 columns, the last col is the file name
            // of the mapping file from random number to field value
            if (parts.length == 6) {
                mapfile = parts[5];
                gen.hasMapFile = true;
            }

            map = new HashMap<Integer, Object>();
        }

        public int nextInt() {
            return gen.nextInt(map);
        }

        public long nextLong() {
            return gen.nextInt(map);
        }

        public double nextDouble() {
            return gen.nextDouble(map);
        }

        public float nextFloat() {
            return gen.nextFloat(map);
        }

        public String nextString() {
            return gen.nextString(map);
        }
    }

    abstract class RandomGenerator {

        protected int avgsz;
        protected boolean hasMapFile; // indicating whether a map file from
                                      // random number to the field value is pre-defined

        abstract public int nextInt(Map<Integer, Object> map);
        abstract public long nextLong(Map<Integer, Object> map);
        abstract public float nextFloat(Map<Integer, Object> map);
        abstract public double nextDouble(Map<Integer, Object> map);
        abstract public String nextString(Map<Integer, Object> map);

        public String randomString() {
            int var = (int)((double)avgsz * 0.3);
            StringBuffer sb = new StringBuffer(avgsz + var);
            if (var < 1) var = 1;
            int len = rand.nextInt(2 * var) + avgsz - var;
            for (int i = 0; i < len; i++) {
                int n = rand.nextInt(122 - 65) + 65;
                sb.append(Character.toChars(n));
            }
            return sb.toString();
        }

        public float randomFloat() {
            return rand.nextFloat() * rand.nextInt();
        }

        public double randomDouble() {
            return rand.nextDouble() * rand.nextInt();
        }
    }

    class UniformRandomGenerator extends RandomGenerator {
        int card;

        public UniformRandomGenerator(int a, int c) {
            avgsz = a;
            card = c;
        }

        public int nextInt(Map<Integer, Object> map) {
            return rand.nextInt(card);
        }

        public long nextLong(Map<Integer, Object> map) {
            return rand.nextLong() % card;
        }

        public float nextFloat(Map<Integer, Object> map) {
            int seed = rand.nextInt(card);
            Float f = (Float)map.get(seed);
            if (f == null) {
                if (!hasMapFile) {
                    f = randomFloat();
                    map.put(seed, f);
                }else{
                    throw new IllegalStateException("Number " + seed + " is not found in map file");
                }
            }
            return f;
        }

        public double nextDouble(Map<Integer, Object> map) {
            int seed = rand.nextInt(card);
            Double d = (Double)map.get(seed);
            if (d == null) {
                if (!hasMapFile) {
                    d = randomDouble();
                    map.put(seed, d);
                }else{
                    throw new IllegalStateException("Number " + seed + " is not found in map file");
                }
            }
            return d;
        }

        public String nextString(Map<Integer, Object> map) {
            int seed = rand.nextInt(card);
            String s = (String)map.get(seed);
            if (s == null) {
                if (!hasMapFile) {
                    s = randomString();
                    map.put(seed, s);
                }else{
                    throw new IllegalStateException("Number " + seed + " is not found in map file");
                }
            }
            return s;
        }

    }

    class ZipfRandomGenerator extends RandomGenerator {
        Zipf z;

        public ZipfRandomGenerator(int a, int c) {
            avgsz = a;
            z = new Zipf(c);
        }


        // the Zipf library returns a random number [1..cardinality], so we substract by 1
        // to get [0..cardinality)
        // the randome number returned by zipf library is an integer, but converted into double
        private double next() {
            return z.nextElement()-1;
        }

        public int nextInt(Map<Integer, Object> map) {
            return (int)next();
        }

        public long nextLong(Map<Integer, Object> map) {
            return (long)next();
        }

        public float nextFloat(Map<Integer, Object> map) {
            int seed = (int)next();
            Float d = (Float)map.get(seed);
            if (d == null) {
                if (!hasMapFile) {
                    d = randomFloat();
                    map.put(seed, d);
                }else{
                    throw new IllegalStateException("Number " + seed + " is not found in map file");
                }
            }
            return d;
        }

        public double nextDouble(Map<Integer, Object> map) {
             int seed = (int)next();
             Double d = (Double)map.get(seed);
             if (d == null) {
                 if (!hasMapFile) {
                     d = randomDouble();
                     map.put(seed, d);
                 }else{
                     throw new IllegalStateException("Number " + seed + " is not found in map file");
                 }
             }
             return d;
        }

        public String nextString(Map<Integer, Object> map) {
            int seed = (int)next();
            String s = (String)map.get(seed);
            if (s == null) {
                if (!hasMapFile) {
                    s = randomString();
                    map.put(seed, s);
                }else{
                    throw new IllegalStateException("Number " + seed + " is not found in map file");
                }
            }
            return s;
        }
    }

    //  launch hadoop job
    class HadoopRunner {
        Random r;
        FileSystem fs;
        Path tmpHome;

        public HadoopRunner() {
            r = new Random();
        }

        public void goHadoop() throws IOException {
            // Configuration processed by ToolRunner
            Configuration conf = getConf();

            // Create a JobConf using the processed conf
            JobConf job = new JobConf(conf);
            fs = FileSystem.get(job);

            tmpHome = createTempDir(null);

            String config = genMapFiles().toUri().getRawPath();
            // set config properties into job conf
            job.set("fieldconfig", config);
            job.set("separator", String.valueOf((int)separator));


            job.setJobName("data-gen");
            job.setNumMapTasks(numMappers);
            job.setNumReduceTasks(0);
            job.setMapperClass(DataGenMapper.class);
            job.setJarByClass(DataGenMapper.class);

            // if inFile is specified, use it as input
            if (inFile != null) {
                FileInputFormat.setInputPaths(job, inFile);
                job.set("hasinput", "true");
           } else {
               job.set("hasinput", "false");
               Path input = genInputFiles();
               FileInputFormat.setInputPaths(job, input);
           }
           FileOutputFormat.setOutputPath(job, new Path(outputFile));

            // Submit the job, then poll for progress until the job is complete
            System.out.println("Submit hadoop job...");
            RunningJob j = JobClient.runJob(job);
            if (!j.isSuccessful()) {
                throw new IOException("Job failed");
            }

            if (fs.exists(tmpHome)) {
                fs.delete(tmpHome, true);
            }
         }

         private Path genInputFiles() throws IOException {
             long avgRows = numRows/numMappers;

             // create a temp directory as mappers input
             Path input = createTempDir(tmpHome);
             System.out.println("Generating input files into " + input.toString());

             long rowsLeft = numRows;

             // create one input file per mapper, which contains
             // the number of rows
             for(int i=0; i<numMappers; i++) {
                 Object[] tmp = createTempFile(input, false);
                 PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);

                 if (i < numMappers-1) {
                     pw.println(avgRows);
                 }else{
                     // last mapper takes all the rows left
                     pw.println(rowsLeft);
                 }

                 pw.close();
                 rowsLeft -= avgRows;
             }

             return input;
         }

        // generate map files for all the fields that need to pre-generate map files
        // return a config file which contains config info for each field, including
        // the path to their map file
         private Path genMapFiles() throws IOException {
             Object[] tmp = createTempFile(tmpHome, false);

             System.out.println("Generating column config file in " + tmp[0].toString());
             PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);
             for(int i=0; i<colSpecs.length; i++) {
                 DataGenerator.Datatype datatype = colSpecs[i].datatype;
                 pw.print(colSpecs[i].arg);

                 if ( datatype == DataGenerator.Datatype.FLOAT || datatype == DataGenerator.Datatype.DOUBLE ||
                         datatype == DataGenerator.Datatype.STRING)      {
                     Path p = genMapFile(colSpecs[i]);
                     pw.print(':');
                     pw.print(p.toUri().getRawPath());
                 }

                 pw.println();
             }

             pw.close();

             return (Path)tmp[0];
         }

         // genereate a map file between random number to field value
         // return the path of the map file
         private Path genMapFile(DataGenerator.ColSpec col) throws IOException {
             int card = col.card;
             Object[] tmp = createTempFile(tmpHome, false);

             System.out.println("Generating mapping file for column " + col.arg + " into " + tmp[0].toString());
             PrintWriter pw = new PrintWriter((OutputStream)tmp[1]);
             HashSet<Object> hash = new HashSet<Object>(card);
             for(int i=0; i<card; i++) {
                 pw.print(i);
                 pw.print("\t");
                 Object next = null;
                 do {
                     if (col.datatype == DataGenerator.Datatype.DOUBLE) {
                         next = col.gen.randomDouble();
                     }else if (col.datatype == DataGenerator.Datatype.FLOAT) {
                         next = col.gen.randomFloat();
                     }else if (col.datatype == DataGenerator.Datatype.STRING) {
                         next = col.gen.randomString();
                     }
                 }while(hash.contains(next));

                 hash.add(next);

                 pw.println(next);

                 if ( (i>0 && i%300000 == 0) || i == card-1 ) {
                     System.out.println("processed " + i*100/card + "%." );
                     pw.flush();
                 }
             }

             pw.close();

             return (Path)tmp[0];

         }

         private Path createTempDir(Path parentDir) throws IOException {
             Object[] obj = createTempFile(parentDir, true);
             return (Path)obj[0];
         }

         private Object[] createTempFile(Path parentDir, boolean isDir) throws IOException {
             Path tmp_home = parentDir;

             if (tmp_home == null) {
                 tmp_home = new Path(fs.getHomeDirectory(), "tmp");
             }

             if (!fs.exists(tmp_home)) {
                 fs.mkdirs(tmp_home);
             }

             int id = r.nextInt();
             Path f = new Path(tmp_home, "tmp" + id);
             while (fs.exists(f)) {
                 id = r.nextInt();
                 f = new Path(tmp_home, "tmp" + id);
             }

             // return a 2-element array. first element is PATH,
             // second element is OutputStream
             Object[] r = new Object[2];
             r[0] = f;
             if (!isDir) {
                 r[1] = fs.create(f);
             }else{
                 fs.mkdirs(f);
             }

             return r;
         }
    }

    public static class DataGenMapper extends MapReduceBase implements Mapper<LongWritable, Text, String, String> {
         private JobConf jobConf;
         private DataGenerator dg;
         private boolean hasInput;

         public void configure(JobConf jobconf) {
             this.jobConf = jobconf;

             int id = Integer.parseInt(jobconf.get(MRConfiguration.TASK_PARTITION));
             long time = System.currentTimeMillis() - id*3600*24*1000;

             dg = new DataGenerator( ((time-id*3600*24*1000) | (id << 48)));

             dg.separator = (char)Integer.parseInt(jobConf.get("separator"));

             if (jobConf.get("hasinput").equals("true")) {
                 hasInput = true;
             }

             String config = jobConf.get("fieldconfig");

             try {
                 FileSystem fs = FileSystem.get(jobconf);

                 // load in config file for each column
                 BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(new Path(config))));
                 String line = null;
                 List<DataGenerator.ColSpec> cols = new ArrayList<DataGenerator.ColSpec>();
                 while((line = reader.readLine()) != null) {
                     cols.add(dg.new ColSpec(line));
                 }
                 reader.close();
                 dg.colSpecs = cols.toArray(new DataGenerator.ColSpec[0]);

                 // load in mapping files
                 for(int i=0; i<dg.colSpecs.length; i++) {
                     DataGenerator.ColSpec col = dg.colSpecs[i];
                     if (col.mapfile != null) {
                         reader = new BufferedReader(new InputStreamReader(fs.open(new Path(col.mapfile))));
                         Map<Integer, Object> map = dg.colSpecs[i].map;
                         while((line = reader.readLine()) != null) {
                             String[] fields = line.split("\t");
                             int key = Integer.parseInt(fields[0]);
                             if (col.datatype == DataGenerator.Datatype.DOUBLE) {
                                 map.put(key, Double.parseDouble(fields[1]));
                             }else if (col.datatype == DataGenerator.Datatype.FLOAT) {
                                 map.put(key, Float.parseFloat(fields[1]));
                             }else {
                                 map.put(key, fields[1]);
                             }
                         }

                         reader.close();
                     }
                 }
             }catch(IOException e) {
                 throw new RuntimeException("Failed to load config file. " + e);
             }
         }

         public void map(LongWritable key, Text value, OutputCollector<String, String> output, Reporter reporter) throws IOException {
             int intialsz = dg.colSpecs.length * 50;

             if (!hasInput) {
                 long numRows = Long.parseLong(value.toString().trim());
                  dg.numRows = numRows;

                  for (int i = 0; i < numRows; i++) {
                      StringWriter str = new StringWriter(intialsz);
                      PrintWriter pw = new PrintWriter(str);
                      dg.writeLine(pw);
                      output.collect(null, str.toString());

                      if ((i+1) % 10000 == 0) {
                          reporter.progress();
                          reporter.setStatus("" + (i+1) + " tuples generated.");
                      }
                  }
             } else {
                 StringWriter str = new StringWriter(intialsz);
                 PrintWriter pw = new PrintWriter(str);
                 pw.write(value.toString());
                 dg.writeLine(pw);
                 output.collect(null, str.toString());
             }
         }
     }
}
TOP

Related Classes of org.apache.pig.test.pigmix.datagen.DataGenerator$UniformRandomGenerator

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.