Package com.endgame.binarypig.loaders

Source Code of com.endgame.binarypig.loaders.AbstractExecutingLoader

/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*
*  Copyright 2013 Endgame Inc.
*
*/


package com.endgame.binarypig.loaders;

import java.io.File;
import java.io.IOException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.data.Tuple;

import com.endgame.binarypig.util.ProgramExector;
import com.endgame.binarypig.util.StreamUtils;
import com.google.common.base.Stopwatch;

/**
* Hello world!
*
*/
public abstract class AbstractExecutingLoader extends AbstractFileDroppingLoader
{
  String script = null;
  File programFile;
 
  Stopwatch tupleCreationOverhead = new Stopwatch();
 
  public AbstractExecutingLoader(String script){
    this(script, Long.toString(Long.MAX_VALUE));
  }
 
  public AbstractExecutingLoader(String script, String timeoutMS){
    this(script, timeoutMS, "false");   
  }
 
  public AbstractExecutingLoader(String script, String timeoutMS, String useDevShm){
    super(timeoutMS, useDevShm);
    this.script = script;
  }
 
  @Override
  public void prepareToRead(RecordReader reader, PigSplit split) throws IOException {
    super.prepareToRead(reader, split);
    ensureProgramFilePermissions();
  }
 
  public void ensureProgramFilePermissions()
  {
    programFile = new File(workingDir, this.script);
    if(programFile.exists()) {
      programFile.setExecutable(true);
      programFile.setReadable(true);
    }
    else{
      throw new RuntimeException("Program doesn't exist: "+programFile);
    }
  }
 
  String[] getCommand(File inputFile)
  {
    return new String[]{programFile.getPath(), inputFile.getPath()};
  }
 
  public void dumpStats()
  {
    System.err.println("script      = "+script);
    System.err.println("programFile = "+programFile);
    super.dumpStats();
  }

  @Override
  public Tuple processFile(Text key, BytesWritable value, File binaryFile) throws IOException{
   
    ProgramExector exec = new ProgramExector(getCommand(binaryFile), timeoutMS);
    exec.start();
    waitOnProgramExecutor(exec);
    String output = StreamUtils.drainInputStream(exec.getStdout());
    exec.closeStreams();
   
    tupleCreationOverhead.start();
    Tuple tuple = outputToTuple(key, value, output, exec.isTimedOut());
    tupleCreationOverhead.stop();
    return tuple;
  }
 
  public abstract Tuple outputToTuple(Text key, BytesWritable value, String output, boolean timedOut);

  void waitOnProgramExecutor(ProgramExector exec) throws IOException
  {
    while(exec.isAlive())
    { 
      try {
        Thread.sleep(100);
      } catch (InterruptedException e){
        throw new RuntimeException(e);
      }
      System.err.print(StreamUtils.drainInputStream(exec.getStderr()));
    }// end while
   
    System.err.print(StreamUtils.drainInputStream(exec.getStderr()));
    try {
      exec.join();
    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }
 
  public File getProgramFile() {
    return programFile;
  }
 
  public String getScript() {
    return script;
 
}
TOP

Related Classes of com.endgame.binarypig.loaders.AbstractExecutingLoader

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by ORACLE Inc. Contact coftware#gmail.com.