Package com.ikanow.infinit.e.processing.custom.utils

Source Code of com.ikanow.infinit.e.processing.custom.utils.InfiniteHadoopUtils

/*******************************************************************************
* Copyright 2012, The Infinit.e Open Source Project.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
******************************************************************************/
package com.ikanow.infinit.e.processing.custom.utils;

import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.LinkedList;
import java.util.List;

import javax.xml.parsers.ParserConfigurationException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.bson.types.ObjectId;
import org.xml.sax.SAXException;

import com.ikanow.infinit.e.data_model.store.DbManager;
import com.ikanow.infinit.e.data_model.store.MongoDbManager;
import com.ikanow.infinit.e.data_model.store.config.source.SourcePojo;
import com.ikanow.infinit.e.data_model.store.custom.mapreduce.CustomMapReduceJobPojo;
import com.ikanow.infinit.e.data_model.store.social.person.PersonCommunityPojo;
import com.ikanow.infinit.e.data_model.store.social.person.PersonPojo;
import com.ikanow.infinit.e.data_model.store.social.sharing.SharePojo;
import com.ikanow.infinit.e.data_model.store.social.sharing.SharePojo.ShareCommunityPojo;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import com.mongodb.gridfs.GridFSDBFile;

public class InfiniteHadoopUtils {

  /** One day expressed in milliseconds (24 * 60 * 60 * 1000). */
  final public static long MS_IN_DAY = 86400000;
  /** Sixty seconds expressed in milliseconds (60 * 1000). */
  final public static long SECONDS_60 = 60000;
 
  // file:// URL of the one local JAR that downloadJarFile is allowed to serve from the file system
  private static final String BUILT_IN_JOB_PATH = "file:///opt/infinite-home/lib/plugins/infinit.e.hadoop.prototyping_engine.jar";
  // File name of that built-in JAR (used as a fallback location and to protect it from removeTempFile)
  private static final String BUILT_IN_JOB_NAME = "infinit.e.hadoop.prototyping_engine.jar";

  /**
   * Takes the query argument from a CustomMapReduceJobPojo
   * and returns either the query or post processing part
   *
   * @param query
   * @param wantQuery
   * @return
   */
  public enum QuerySpec { QUERY, POSTPROC, INPUTFIELDS };
 
  public static String getQueryOrProcessing(String query, QuerySpec querySpec)
  {
    if ( query.equals("") || query.equals("null") || query == null )
      query = "{}";
    DBObject dbo = null;
    try
    {
      dbo = (DBObject) com.mongodb.util.JSON.parse(query);
      BasicDBList dbl = (BasicDBList)dbo;
      //is a list
      if ( querySpec == QuerySpec.QUERY )
      {
        return dbl.get(0).toString();
      }
      else if ( querySpec == QuerySpec.POSTPROC )
      {
        if ( dbl.size() > 1 ) {
          if (null == dbl.get(1)) // (only query and fields are specified)
            return null;
          else
            return dbl.get(1).toString();
        }
        else
          return null;
      }
      else if ( querySpec == QuerySpec.INPUTFIELDS )
      {
        if ( dbl.size() > 2 )
          return dbl.get(2).toString();
        else
          return null;
      }
      else
        return null;
    }
    catch (Exception ex)
    {
      try
      {
        //is just a an object
        if ( querySpec == QuerySpec.QUERY )
          return dbo.toString();
        else if ( querySpec == QuerySpec.INPUTFIELDS )
          return ((BasicDBObject) dbo.get("$fields")).toString();
        else if ( querySpec == QuerySpec.POSTPROC )
          return ((BasicDBObject) dbo.get("$output")).toString();
        else
          return null;
      }
      catch (Exception e) // (malformed query gets you here)
      {
        if ( querySpec == QuerySpec.QUERY )
          throw new RuntimeException("Malformed query: " + query);
        else
          return null;
      }
    }
  }
 
  /**
   * Downloads jar file from web using URL call.  Typically
   * the jar files we be kept in our /share store so we will
   * be calling our own api.
   *
   * @param jarURL
   * @return
   * @throws Exception
   */
  public static String downloadJarFile(String jarURL, List<ObjectId> communityIds, PropertiesManager prop_custom, ObjectId submitterId) throws Exception
  {   
    String shareStringOLD = "$infinite/share/get/";
    String shareStringNEW = "$infinite/social/share/get/";
    //jar is local use id to grab jar (skips authentication)
    String shareid = null;
    try {
      new ObjectId(jarURL);
      shareid = jarURL;
    }
    catch (Exception e) {} // that's fine it's just not a raw ObjectId

    if ( jarURL.startsWith(shareStringOLD) || jarURL.startsWith(shareStringNEW) || (null != shareid) )
    {
      if (null == shareid) {
        if ( jarURL.startsWith(shareStringOLD) )
        {
          shareid = jarURL.substring(shareStringOLD.length());
        }
        else
        {
          shareid = jarURL.substring(shareStringNEW.length());
        }
      }
      BasicDBObject query = new BasicDBObject(SharePojo._id_, new ObjectId(shareid));
      query.put(ShareCommunityPojo.shareQuery_id_, new BasicDBObject(MongoDbManager.in_, communityIds));

      SharePojo share = SharePojo.fromDb(DbManager.getSocial().getShare().findOne(query),SharePojo.class);
     
      if (null == share) {
        throw new RuntimeException("Can't find JAR file or share or custom table or source, or insufficient permissions");
      }
     
      // The JAR owner needs to be an admin:
      //TODO (INF-2118): At some point would like there to be a choice ... if not admin then must inherit the Infinit.e sandbox version
      // ... there seemed to be some issues with that however so for now will just allow all admin jars and no non-admin jars
      // (see other INF-2118 branch)
      if (prop_custom.getHarvestSecurity()) {
        if (!AuthUtils.isAdmin(share.getOwner().get_id())) {
          throw new RuntimeException("Permissions error: only administrators can upload custom JARs");
        }
      }//TESTED (by hand)
           
      String extension = ".cache";
      if ((null != share.getMediaType()) && (share.getMediaType().contains("java-archive"))) {
        extension = ".cache.jar";
      }
      else if ((null != share.getMediaType()) && (share.getMediaType().contains("gzip"))) {
        extension = ".cache.tgz";
      }
      else if ((null != share.getMediaType()) && (share.getMediaType().contains("zip"))) {
        extension = ".cache.zip";
      }
      String tempFileName = assignNewJarLocation(prop_custom, shareid + extension);
      File tempFile = new File(tempFileName);
     
      // Compare dates (if it exists) to see if we need to update the cache)
     
      if (!tempFile.exists() || (tempFile.lastModified() < share.getModified().getTime())) {
        OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName));
        if ( share.getBinaryId() != null )
        {     
          GridFSDBFile file = DbManager.getSocial().getShareBinary().find(share.getBinaryId());           
          file.writeTo(out);       
        }
        else
        {
          out.write(share.getBinaryData());
        }
      }//TESTED
     
      return tempFileName;
    }
    else // Not an infinit.e share - either a local file or served externally
    {
      if (jarURL.startsWith("$infinite")) { // Local web server
        if (prop_custom.getHarvestSecurity()) {
          if (!AuthUtils.isAdmin(submitterId)) {
            throw new RuntimeException("Permissions error: only administrators can run custom JARs served from a web server (users can run custom JARs when uploaded by an admin to the share store)");
          }
        }//TOTEST       
        jarURL = jarURL.replace("$infinite", "http://localhost:8080");
      }//TESTED (by hand)
      else if (!jarURL.startsWith("http")) {
        // Can't access the file system, except for this one nominated file:
        if (!jarURL.equals(BUILT_IN_JOB_PATH)) {
          throw new RuntimeException("Can't find JAR file or insufficient permissions");
        }
        jarURL = BUILT_IN_JOB_PATH.substring(7);
        if (!(new File(jarURL).exists())) { // (this is really only when debugging)
          // Try looking in temp path
          jarURL = assignNewJarLocation(prop_custom, BUILT_IN_JOB_NAME);
        }
        return jarURL;
      }//TESTED
      else { // Access a JAR from an external web server, can only do this if admin
        if (prop_custom.getHarvestSecurity()) {
          if (!AuthUtils.isAdmin(submitterId)) {
            throw new RuntimeException("Permissions error: only administrators can run custom JARs served from a web server (users can run custom JARs when uploaded by an admin to the share store)");
          }
        }//TOTEST       
      }//TESTED (by hand)
     
      String tempFileName = assignNewJarLocation(prop_custom, null);
      OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName));
     
      //download jar from external site
      URL url = new URL(jarURL);
     
      URLConnection ucon = url.openConnection();
      InputStream in = ucon.getInputStream();
      byte[] buf = new byte[1024];
      int byteRead = 0;
      while ((byteRead = in.read(buf)) != -1 )
      {
        out.write(buf,0,byteRead);       
      }
      in.close();
      out.close();
      return tempFileName;
    }//(end share - first clause - or served externally - second clause)
  }

  /**
   * Gets a user's communities from his user id
   *
   * @param submitterId
   * @return
   * @throws Exception
   */
  public static List<ObjectId> getUserCommunities(ObjectId submitterId) {
    // Set up the query
    PersonPojo personQuery = new PersonPojo();
    personQuery.set_id(submitterId);
   
    BasicDBObject dbo = (BasicDBObject) DbManager.getSocial().getPerson().findOne(personQuery.toDb());
    PersonPojo person = PersonPojo.fromDb(dbo, PersonPojo.class);
   
    if (null == person) {
      throw new RuntimeException("User no longer exists?");
    }
    if ((null == person.getCommunities()) || person.getCommunities().isEmpty()) {
      throw new RuntimeException("Corrupt user, no community access?");
    }
    ArrayList<ObjectId> retVal = new ArrayList<ObjectId>(person.getCommunities().size());
    for (PersonCommunityPojo personInfo: person.getCommunities()) {
      retVal.add(personInfo.get_id());
    }
    return retVal;
  }

  /**
   * Returns a new xml file name following the format
   * tempConfigXXXX.xml where XXXX is the next incrementing
   * number in the directory.
   *
   * @return a unique filename for the config file.
   */
  public static String assignNewConfigLocation(PropertiesManager prop_custom)
  {   
    String dirname = prop_custom.getHadoopConfigPath() + "/xmlFiles/";
    File dir = new File(dirname);
    if ( !dir.exists() )
      dir.mkdir();
    String prefix = "tempConfig";
    String suffix = ".xml";
   
    String lastFile = "tempConfig000000.xml";
    String[] filenames = dir.list();
    if ( filenames.length > 0 )
       lastFile = filenames[filenames.length-1];
    String increment = lastFile.replaceFirst(prefix, "");
    increment = increment.replaceFirst(suffix, "");
    //add 1 to increment, and add leading 0's to keep in order
    String nextNumber = (Integer.parseInt(increment) + 1) + "";
    String zeros = "000000" + nextNumber;
    String newincrement = zeros.substring(zeros.length()-6);
       
    return dirname + prefix + newincrement + suffix;
  }
 
  /**
   * Returns a new jar file name following the format
   * tempJarXXXX.jar where XXXX is the next incrementing
   * number in the directory.
   *
   * @return a unique filename for the jar file.
   */
  public static String assignNewJarLocation(PropertiesManager prop_custom, String nameOverride)
  {   
    String dirname = prop_custom.getHadoopConfigPath() + "/jars/";
    File dir = new File(dirname);
    if ( !dir.exists() )
      dir.mkdir();
   
    if (null == nameOverride) {
   
      String prefix = "tempJar";
      String suffix = ".jar";
     
      String lastFile = "tempJar000000.jar";
      String[] filenames = dir.list();
      if ( filenames.length > 0 )
         lastFile = filenames[filenames.length-1];
      String increment = lastFile.replaceFirst(prefix, "");
      increment = increment.replaceFirst(suffix, "");
      //add 1 to increment, and add leading 0's to keep in order
      String nextNumber = (Integer.parseInt(increment) + 1) + "";
      String zeros = "000000" + nextNumber;
      String newincrement = zeros.substring(zeros.length()-6);
     
      return dirname + prefix + newincrement + suffix;
    }
    else {
      return dirname + nameOverride;     
    }
  }
  /**
   * Removes the config file that is not being used anymore.
   *
   * @param file
   */
  public static void removeTempFile(String file)
  {
    if (( file != null ) && !file.contains(".cache") && !file.endsWith(BUILT_IN_JOB_NAME))
    {
      File f = new File(file);
      f.delete();
    }
  }
 
  /**
   * Exception message generation
   *
   */
  public static StringBuffer createExceptionMessage(Exception e) {
    return createExceptionMessage(null, e);
  }
  public static StringBuffer createExceptionMessage(String prefix, Exception e) {
    StackTraceElement[] st = e.getStackTrace();
    StringBuffer errMessage = new StringBuffer();
    if (null != prefix) {
      errMessage.append(prefix).append(':');
    }
    errMessage.append((e.getMessage()==null?"NullPointerException":e.getMessage())).append(':');
    if (st.length > 0) {
      errMessage.append(st[0].getClassName()).append('.').append(st[0].getMethodName()).append(':').append(st[0].getLineNumber());
    }           
    return errMessage;
  }
 
  /**
   * Calls the XML Parser to grab the job client address and opens a connection to
   * the server.  The parameters must be in the hadoopconfig/mapred-site.xml file
   * under the property "mapred.job.tracker"
   *
   * @return Connection to the job client
   * @throws SAXException
   * @throws IOException
   * @throws ParserConfigurationException
   */
  public static InetSocketAddress getJobClientConnection(PropertiesManager prop_custom) throws SAXException, IOException, ParserConfigurationException
  {
    String jobclientAddress = HadoopUtils.getXMLProperty(prop_custom.getHadoopConfigPath() + "/hadoop/mapred-site.xml", "mapred.job.tracker");
    String[] parts = jobclientAddress.split(":");
    String hostname = parts[0];
    int port = Integer.parseInt(parts[1]);   
    return new InetSocketAddress(hostname, port);
  }
 
  /////////////////////////////////////////////////////////////////////////////
  /////////////////////////////////////////////////////////////////////////////
 
  // Some HDFS utilities
 
  // Ensure the input directory is allowed, exceptions out if not allowed
 
  public static String authenticateInputDirectory(CustomMapReduceJobPojo cmr, String path) {
    if (path.startsWith("hdfs://")) {
      path = path.substring(7);
    }
    if (path.startsWith("hdfs:")) {
      path = path.substring(5);
    }
    if (path.startsWith("/user/tomcat/")) { // the home directory from which everything is run     
      path = path.substring(13);
    }
    if (path.startsWith("/")) {     
      path = path.substring(1);
    }
    String[] pathCheck = path.split("/", 3);
    if (pathCheck.length > 1) {
      for (ObjectId communityId: cmr.communityIds) { 
        if (pathCheck[1].contains(communityId.toString())) {       
          return path;
        }
      }
    }
    throw new RuntimeException("Access to this directory is not authenticated: the second directory level must contain a matching community ID - eg 'completed/50bcd6fffbf0fd0b27875a7c/', 'input/52b1be6145ce02c2c6fbab9e,50bcd6fffbf0fd0b27875a7c/' etc");
  }
 
  // Create an output directory
 
  public static Path ensureOutputDirectory(CustomMapReduceJobPojo cmr, PropertiesManager prop_custom) throws IOException, SAXException, ParserConfigurationException {
    Configuration config = HadoopUtils.getConfiguration(prop_custom);
    Path path = HadoopUtils.getPathForJob(cmr, config, true);
   
    FileSystem fs = FileSystem.get(config);
    if (fs.exists(path)) { // delete it
      fs.delete(path, true); // (might be dir => recursive)
    }
    // (don't create the dir, this all gets sorted out by the reducer)
    return path;
  }
 
  public static void bringTempOutputToFront(CustomMapReduceJobPojo cmr, PropertiesManager prop_custom) throws IOException, SAXException, ParserConfigurationException {
    // Get the names:
    Configuration config = HadoopUtils.getConfiguration(prop_custom);
    FileSystem fs = FileSystem.get(config);
    Path pathTmp = HadoopUtils.getPathForJob(cmr, config, true);
    Path pathFinal = HadoopUtils.getPathForJob(cmr, config, false);
   
    // OK don't do anything if pathTmp doesn't exist...
    if (fs.exists(pathTmp)) {
      // If the final path exists, delete it
     
      if (!fs.exists(pathFinal)) { // create it, which guarantees the parent path also exists
        //(otherwise the rename fails sigh)
        fs.mkdirs(pathFinal);
      }
      fs.delete(pathFinal, true);
      fs.rename(pathTmp, pathFinal);
    }
  }
 
  // Cache a local file so can be used in a distributed cache
 
  public static Path cacheLocalFile(String localPath, String localName, Configuration config) throws IOException {
    FileSystem fs = FileSystem.get(config);
    Path toDir = new Path("cache");
    Path destFile = new Path("cache/" + localName);
    File fromFile = new File(localPath + "/" + localName);
    if (!fromFile.exists()) {
      throw new IOException("Source file does not exist: " + fromFile.toString());
    }
    boolean needToCopyFile = true;
    if (!fs.exists(toDir)) { // (ie relative to WD)
      fs.mkdirs(toDir);
    }
    else {
      // Now check if the file already exists
      if (fs.exists(destFile)) {
        FileStatus fsStat = fs.getFileStatus(destFile);
        if ((fsStat.getLen() == fromFile.length())
            &&
            (fromFile.lastModified() <= fsStat.getModificationTime()))
        {
          needToCopyFile = false;
        }
      }
    }
    if (needToCopyFile) {
      fs.copyFromLocalFile(false, true, new Path(localPath + "/" + localName), destFile);
    }
    return new Path(fs.getFileStatus(destFile).getPath().toUri().getPath());
      // (apparently the path has to be in absolute format without even the hdfs:// at the front?!)
  }//TESTED
 
  /**
   * Handle a list of cache files of various types:
   * - shares
   * - files
   * (returns a list of cached JAR files for local mode running - you're on your own for other cached files though)
   *
   * Each entry is registered with the DistributedCache of the supplied configuration:
   * names ending ".jar" are added to the classpath, names ending ".zip" or "gz" are added
   * as archives, anything else as a plain cache file.  Entries that match a custom job
   * (by _id or title) or a source (by _id or key) visible to the job's communities are skipped.
   *
   * @param cacheFileList either a comma separated String or a Collection of Strings; may be null
   * @param job the job, supplying communityIds and submitterID for the permission checks
   * @param config the Hadoop configuration to which the cache entries are added
   * @param prop_custom configuration passed on to downloadJarFile
   * @return the local URLs of the downloaded ".jar" entries, or null if there were none
   *         (or if cacheFileList was null)
   */
  @SuppressWarnings("rawtypes")
  public static List<URL> handleCacheList(Object cacheFileList, CustomMapReduceJobPojo job, Configuration config, PropertiesManager prop_custom) throws MalformedURLException, IOException, Exception {
    if (null == cacheFileList) {
      return null;
    }
    // (created lazily, so stays null unless at least one downloaded JAR is found)
    LinkedList<URL> localJarCache = null;
   
    Collection cacheFiles = null;
    if (cacheFileList instanceof String) { // comma separated list
      String[] cacheFilesArray = ((String) cacheFileList).split("\\s*,\\s*");
      cacheFiles = Arrays.asList(cacheFilesArray);
    }
    else {
      cacheFiles = (Collection) cacheFileList;
    }   
    for (Object cache: cacheFiles) {
      String cacheStr = (String) cache;
      // Non-null only if the entry is a raw ObjectId
      ObjectId cacheId = null;
      try {
        cacheId = new ObjectId(cacheStr);
      }
      catch (Exception e) {} // fine
     
      // (declared per iteration, so the null check in the file branch below always passes)
      FileSystem fs = null;
     
      if ((null != cacheId) || cacheStr.startsWith("http:") || cacheStr.startsWith("https:") || cacheStr.startsWith("$")) {
        if (null != cacheId) { // this might be a custom cache in which case just bypass all this, handled in the main list
          BasicDBObject query = new BasicDBObject(CustomMapReduceJobPojo._id_, cacheId);
          query.put(CustomMapReduceJobPojo.communityIds_, new BasicDBObject(DbManager.in_, job.communityIds));
          if (null != DbManager.getCustom().getLookup().findOne(query)) {
            continue; // carry on...)
          }//TESTED
          query = new BasicDBObject(SourcePojo._id_, cacheId);
          query.put(SourcePojo.communityIds_, new BasicDBObject(DbManager.in_, job.communityIds));
          if (null != DbManager.getIngest().getSource().findOne(query)) {
            continue; // carry on...)
          }//TESTED
        }       
       
        // Use existing code to cache to local fs (and then onwards to HDFS!)
        URL localPathURL = new File(downloadJarFile(cacheStr, job.communityIds, prop_custom, job.submitterID)).toURI().toURL();
        String localPath = localPathURL.getPath();
        // Split into directory (keeping the trailing '/') and file name
        String pathMinusName = localPath.substring(0, localPath.lastIndexOf('/') + 1);
        String name = localPath.substring(localPath.lastIndexOf('/') + 1);
        Path distPath = cacheLocalFile(pathMinusName, name, config);
        if (name.endsWith(".jar")) {
          if (null == localJarCache) {
            localJarCache = new LinkedList<URL>();
          }
          localJarCache.add(localPathURL);
          DistributedCache.addFileToClassPath(distPath, config);
        }//TESTED
        else if (name.endsWith(".zip") || name.endsWith("gz")) {
          DistributedCache.addCacheArchive(distPath.toUri(), config);
        }//TESTED
        else {
          DistributedCache.addCacheFile(distPath.toUri(), config);
        }//TESTED
      }
      else { // this is the location of a file (it is almost certainly an input/output path)
        // Oh could also be sourceKey or custom job title
        BasicDBObject query = new BasicDBObject(CustomMapReduceJobPojo.jobtitle_, cacheStr);
        query.put(CustomMapReduceJobPojo.communityIds_, new BasicDBObject(DbManager.in_, job.communityIds));
        if (null != DbManager.getCustom().getLookup().findOne(query)) {
          continue; // carry on...)
        }//TESTED
        query = new BasicDBObject(SourcePojo.key_, cacheStr);
        query.put(SourcePojo.communityIds_, new BasicDBObject(DbManager.in_, job.communityIds));
        if (null != DbManager.getIngest().getSource().findOne(query)) {
          continue; // carry on...)
        }//TESTED       
       
        // Throws if the job's communities are not allowed to read this path
        String path = authenticateInputDirectory(job, cacheStr);
        if (null == fs) {
          fs = FileSystem.get(config);
        }
        // The file is used where it is; note these JARs are not added to the returned list
        Path distPath = new Path(fs.getFileStatus(new Path(path)).getPath().toUri().getPath());
        if (path.endsWith(".jar")) {
          DistributedCache.addFileToClassPath(distPath, config);
        }//TESTED
        else if (path.endsWith(".zip") || path.endsWith("gz")) {
          DistributedCache.addCacheArchive(distPath.toUri(), config);
        }//TESTED
        else {
          DistributedCache.addCacheFile(distPath.toUri(), config);
        }//TESTED
      }//TESTED
    }   
    return localJarCache;
  }//TOTEST (logically TESTED but needs local testing)
 
}
TOP

Related Classes of com.ikanow.infinit.e.processing.custom.utils.InfiniteHadoopUtils

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.