// Package com.twitter.pycascading
//
// Source code of com.twitter.pycascading.Util

/**
* Copyright 2011 Twitter, Inc.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.twitter.pycascading;

import java.io.IOException;
import java.net.URISyntaxException;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;

import cascading.flow.Flow;
import cascading.flow.FlowConnector;
import cascading.flow.FlowListener;
import cascading.pipe.Pipe;
import cascading.tap.Tap;

/**
* Helper cass that sets up the MR environment and runs a Cascading Flow.
*
* @author Gabor Szabo
*/
public class Util {
  // http://www.velocityreviews.com/forums/t147526-how-to-get-jar-file-name.html
  /**
   * Get the temporary folder where the job jar was extracted to by Hadoop.
   *
   * TODO: This only works if we distribute PyCascading as classes. If I switch
   * to using jars, I need to remove the last part of the path which is the jar
   * file.
   *
   * @return the temporary folder with the contents of the job jar
   */
  public static String getJarFolder() {
    try {
      return Util.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath();
    } catch (URISyntaxException e) {
      throw new RuntimeException("Could not get temporary job folder");
    }
  }

  /**
   * Get the Cascading jar file on the local file system.
   *
   * @return the file location on the Hadoop worker for the Cascading jar
   */
  public static String getCascadingJar() {
    try {
      return cascading.pipe.Pipe.class.getProtectionDomain().getCodeSource().getLocation().toURI()
              .getPath();
    } catch (URISyntaxException e) {
      throw new RuntimeException("Could not get the location of the Cascading jar");
    }
  }

  /**
   * We use the "pycascading.root" Java system property to store the location of
   * the Python sources for PyCascading. This is only used in local mode. This
   * is needed so that we know where to set the import path when we start up the
   * mappers and reducers.
   *
   * @param root
   *          the location of the PyCascading sources on the local file system
   */
  public static void setPycascadingRoot(String root) {
    System.setProperty("pycascading.root", root);
  }

  public static void run(int numReducers, Map<String, Object> config, Map<String, Tap> sources,
          Map<String, Tap> sinks, Pipe... tails) throws IOException, URISyntaxException {
    // String strClassPath = System.getProperty("java.class.path");
    // System.out.println("Classpath is " + strClassPath);

    Properties properties = new Properties();
    properties.put("mapred.reduce.tasks", numReducers);
    // Set this to change the default block size that is routed to one mapper
    // It won't help if the files are smaller than this as each file will go to
    // one mapper
    // properties.put("mapred.min.split.size", 20 * 1024 * 1024 * 1024L);
    // properties.put("mapred.map.tasks", 4000);
    // So that Thrift classes can be serialized
    // We need to add WritableSerialization otherwise sometimes Cascading and
    // Hadoop don't pick it up, and BigInteger serializations fail
    // See https://github.com/twitter/pycascading/issues/2
    // TODO: find the reason for this
    properties.put("io.serializations",
            "com.twitter.pycascading.bigintegerserialization.BigIntegerSerialization,"
                    + "org.apache.hadoop.io.serializer.WritableSerialization,"
                    + "com.twitter.pycascading.pythonserialization.PythonSerialization");
    properties.put("mapred.jobtracker.completeuserjobs.maximum", 50000);
    properties.put("mapred.input.dir.recursive", "true");

    // Set the running mode in the jobconf so that the mappers/reducers can
    // easily check this.
    String runningMode = (String) config.get("pycascading.running_mode");
    properties.setProperty("pycascading.running_mode", runningMode);
    properties.setProperty("pycascading.main_file", (String) config.get("pycascading.main_file"));

    Configuration conf = new Configuration();
    TemporaryHdfs tempDir = null;
    if ("hadoop".equals(runningMode)) {
      tempDir = new TemporaryHdfs();
      // We put the files to be distributed into the distributed cache
      // The pycascading.distributed_cache.archives variable was set by
      // bootstrap.py, based on the command line parameters where we specified
      // the PyCascading & source archives
      Object archives = config.get("pycascading.distributed_cache.archives");
      if (archives != null) {
        tempDir = new TemporaryHdfs();
        String tempDirLocation = tempDir.createTmpFolder(conf);
        String dests = null;
        for (String archive : (Iterable<String>) archives) {
          String dest = tempDir.copyFromLocalFileToHDFS(archive);
          dests = (dests == null ? dest : dests + "," + dest);
        }
        // Set the distributed cache to the files we just copied to HDFS
        //
        // This is an ugly hack, we should use DistributedCache.
        // DistributedCache however operates on a JobConf, and since
        // Cascading expects a Map, we cannot directly pass
        // in the parameters set into a JobConf.
        // TODO: see if a later version of Cascading can update its properties
        // using a JobConf
        properties.setProperty("mapred.cache.archives", dests);
        // This creates a symlink for each of the mappers/reducers to the
        // localized files, instead of copying them for each one. This way we
        // reduce the overhead for copying on one worker machine.
        // TODO: see the one just above
        properties.setProperty("mapred.create.symlink", "yes");
      }
    }

    FlowConnector.setApplicationJarClass(properties, Main.class);
    FlowConnector flowConnector = new FlowConnector(properties);
    Flow flow = flowConnector.connect(sources, sinks, tails);
    if ("hadoop".equals(runningMode)) {
      try {
        flow.addListener(tempDir);
      } catch (Exception e) {
        e.printStackTrace();
      }
    } else {
      try {
        flow.addListener(new FlowListener() {

          @Override
          public void onStarting(Flow flow) {
          }

          @Override
          public void onStopping(Flow flow) {
          }

          @Override
          public void onCompleted(Flow flow) {
          }

          @Override
          public boolean onThrowable(Flow flow, Throwable throwable) {
            throwable.printStackTrace();
            return false;
          }
        });
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
    flow.complete();
  }
}
// TOP
//
// Related Classes of com.twitter.pycascading.Util
//
// TOP
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.