// Package org.apache.hama.bsp

// Source Code of org.apache.hama.bsp.GroomServer

/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*     http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hama.bsp;

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.PrintStream;
import java.lang.reflect.Constructor;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSError;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.DNS;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.RunJar;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hama.Constants;
import org.apache.hama.HamaConfiguration;
import org.apache.hama.ipc.MasterProtocol;
import org.apache.hama.ipc.WorkerProtocol;
import org.apache.log4j.LogManager;
import org.apache.zookeeper.KeeperException;

/**
* A Groom Server (referred to as a "groom" for short) is a process that
* performs the BSP tasks assigned to it by the BSPMaster. Each groom contacts
* the BSPMaster, takes the tasks assigned to it and reports its status by
* means of periodic piggybacked reports to the BSPMaster. Each groom is
* designed to run with HDFS or another distributed storage system. Typically,
* a groom server and a data node should run on the same physical node.
*/
public class GroomServer implements Runnable, WorkerProtocol, BSPPeerProtocol {

  public static final Log LOG = LogFactory.getLog(GroomServer.class);
  // Peer that the send/put/sync/... protocol methods of this class delegate
  // to; created in initialize().
  private BSPPeer bspPeer;
  // Name of the per-groom sub directory under which job.xml / job.jar are
  // localized (see localizeJob and TaskInProgress.localizeTask).
  static final String SUBDIR = "groomServer";

  // Pause, in milliseconds, passed to Thread.sleep() between status reports
  // in offerService().
  private volatile static int REPORT_INTERVAL = 1 * 1000;

  Configuration conf;

  // Constants
  // Result codes of offerService(); only NORMAL, STALE and DENIED are
  // returned by the code in this class.
  static enum State {
    NORMAL, COMPUTE, SYNC, BARRIER, STALE, INTERRUPTED, DENIED
  };

  // Running States and its related things
  volatile boolean initialized = false;
  volatile boolean running = true;
  volatile boolean shuttingDown = false;
  // True until offerService() has fetched the system directory once.
  boolean justInited = true;
  GroomServerStatus status = null;

  // Attributes
  String groomServerName;
  String localHostname;
  InetSocketAddress bspMasterAddr;
  // Worker thread that executes directives queued by dispatch().
  private Instructor instructor;

  // Filesystem
  // private LocalDirAllocator localDirAllocator;
  Path systemDirectory = null;
  FileSystem systemFS = null;

  // Job
  private int failures;
  private int maxCurrentTasks = 1;
  // Every task handed to this groom since the last initialize().
  Map<TaskAttemptID, TaskInProgress> tasks = new HashMap<TaskAttemptID, TaskInProgress>();
  /** Map from taskId -> TaskInProgress. */
  Map<TaskAttemptID, TaskInProgress> runningTasks = null;
  // Tasks moved out of runningTasks by updateTaskStatus() once they are
  // reported as SUCCEEDED or FAILED.
  Map<TaskAttemptID, TaskInProgress> finishedTasks = null;
  Map<BSPJobID, RunningJob> runningJobs = null;

  // new nexus between GroomServer and BSPMaster
  // holds/ manage all tasks
  // List<TaskInProgress> tasksList = new
  // CopyOnWriteArrayList<TaskInProgress>();

  // "host:port" of the worker RPC server, as sent to the BSPMaster.
  private String rpcServer;
  private Server workerServer;
  MasterProtocol masterClient;

  InetSocketAddress taskReportAddress;
  Server taskReportServer = null;

//  private BlockingQueue<GroomServerAction> tasksToCleanup = new LinkedBlockingQueue<GroomServerAction>();

  private class DispatchTasksHandler implements DirectiveHandler {

    public void handle(Directive directive) throws DirectiveException {
      GroomServerAction[] actions = ((DispatchTasksDirective) directive)
          .getActions();
      synchronized (bspPeer) {
        bspPeer.setAllPeerNames(((DispatchTasksDirective) directive)
            .getGroomServerPeers().values());
      }
      if (LOG.isDebugEnabled()) {
        LOG.debug("Got Response from BSPMaster with "
            + ((actions != null) ? actions.length : 0) + " actions");
      }
      if (actions != null) {
        for (GroomServerAction action : actions) {
          if (action instanceof LaunchTaskAction) {
            startNewTask((LaunchTaskAction) action);
          } else {

            // TODO Use the cleanup thread
            // tasksToCleanup.put(action);

            KillTaskAction killAction = (KillTaskAction) action;
            if (tasks.containsKey(killAction.getTaskID())) {
              TaskInProgress tip = tasks.get(killAction.getTaskID());
              tip.taskStatus.setRunState(TaskStatus.State.FAILED);
              try {
                tip.killAndCleanup(true);
              } catch (IOException ioe) {
                throw new DirectiveException("Error when killing a "
                    + "TaskInProgress.", ioe);
              }
            }
          }
        }
      }
    }
  }

  private class Instructor extends Thread {
    final BlockingQueue<Directive> buffer = new LinkedBlockingQueue<Directive>();
    final ConcurrentMap<Class<? extends Directive>, DirectiveHandler> handlers = new ConcurrentHashMap<Class<? extends Directive>, DirectiveHandler>();

    public void bind(Class<? extends Directive> instruction,
        DirectiveHandler handler) {
      handlers.putIfAbsent(instruction, handler);
    }

    public void put(Directive directive) {
      try {
        buffer.put(directive);
      } catch (InterruptedException ie) {
        LOG.error("Unable to put directive into queue.", ie);
        Thread.currentThread().interrupt();
      }
    }

    public void run() {
      while (true) {
        try {
          Directive directive = buffer.take();
          if (directive instanceof DispatchTasksDirective) {
            ((DirectiveHandler) handlers.get(DispatchTasksDirective.class))
                .handle(directive);
          } else {
            throw new RuntimeException("Directive is not supported."
                + directive);
          }
        } catch (InterruptedException ie) {
          LOG.error("Unable to retrieve directive from the queue.", ie);
          Thread.currentThread().interrupt();
        } catch (Exception e) {
          LOG.error("Fail to execute directive.", e);
        }
      }
    }
  }

  public GroomServer(Configuration conf) throws IOException {
    LOG.info("groom start");
    this.conf = conf;

    bspMasterAddr = BSPMaster.getAddress(conf);
    // FileSystem local = FileSystem.getLocal(conf);
    // this.localDirAllocator = new LocalDirAllocator("bsp.local.dir");
  }

  public synchronized void initialize() throws IOException {
    if (this.conf.get(Constants.PEER_HOST) != null) {
      this.localHostname = conf.get(Constants.PEER_HOST);
    }

    if (localHostname == null) {
      this.localHostname = DNS.getDefaultHost(conf.get("bsp.dns.interface",
          "default"), conf.get("bsp.dns.nameserver", "default"));
    }
    // check local disk
    checkLocalDirs(conf.getStrings("bsp.local.dir"));
    deleteLocalFiles("groomserver");

    // Clear out state tables
    this.tasks.clear();
    this.runningJobs = new TreeMap<BSPJobID, RunningJob>();
    this.runningTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
    this.finishedTasks = new LinkedHashMap<TaskAttemptID, TaskInProgress>();
    this.conf.set(Constants.PEER_HOST, localHostname);
    this.conf.set(Constants.GROOM_RPC_HOST, localHostname);
    bspPeer = new BSPPeer(conf);

    int rpcPort = -1;
    String rpcAddr = null;
    if (false == this.initialized) {
      rpcAddr = conf.get(Constants.GROOM_RPC_HOST,
          Constants.DEFAULT_GROOM_RPC_HOST);
      rpcPort = conf.getInt(Constants.GROOM_RPC_PORT,
          Constants.DEFAULT_GROOM_RPC_PORT);
      if (-1 == rpcPort || null == rpcAddr)
        throw new IllegalArgumentException("Error rpc address " + rpcAddr
            + " port" + rpcPort);
      this.workerServer = RPC.getServer(this, rpcAddr, rpcPort, conf);
      this.workerServer.start();
      this.rpcServer = rpcAddr + ":" + rpcPort;

      LOG.info("Worker rpc server --> " + rpcServer);
    }

    @SuppressWarnings("deprecation")
    String address = NetUtils.getServerAddress(conf,
        "bsp.groom.report.bindAddress", "bsp.groom.report.port",
        "bsp.groom.report.address");
    InetSocketAddress socAddr = NetUtils.createSocketAddr(address);
    String bindAddress = socAddr.getHostName();
    int tmpPort = socAddr.getPort();

    // RPC initialization
    // TODO numHandlers should be a ..
    this.taskReportServer = RPC.getServer(this, bindAddress, tmpPort, 10,
        false, this.conf);

    this.taskReportServer.start();

    // get the assigned address
    this.taskReportAddress = taskReportServer.getListenerAddress();
    this.conf.set("bsp.groom.report.address", taskReportAddress.getHostName()
        + ":" + taskReportAddress.getPort());
    LOG.info("GroomServer up at: " + this.taskReportAddress);

    this.groomServerName = "groomd_" + bspPeer.getPeerName().replace(':', '_');
    LOG.info("Starting groom: " + this.groomServerName);

    DistributedCache.purgeCache(this.conf);

    // establish the communication link to bsp master
    this.masterClient = (MasterProtocol) RPC.waitForProxy(MasterProtocol.class,
        MasterProtocol.versionID, bspMasterAddr, conf);

    // enroll in bsp master
    if (-1 == rpcPort || null == rpcAddr)
      throw new IllegalArgumentException("Error rpc address " + rpcAddr
          + " port" + rpcPort);
    if (!this.masterClient.register(new GroomServerStatus(groomServerName,
        bspPeer.getPeerName(), cloneAndResetRunningTaskStatuses(), failures,
        maxCurrentTasks, this.rpcServer))) {
      LOG.error("There is a problem in establishing communication"
          + " link with BSPMaster");
      throw new IOException("There is a problem in establishing"
          + " communication link with BSPMaster.");
    }

    this.instructor = new Instructor();
    this.instructor.bind(DispatchTasksDirective.class,
        new DispatchTasksHandler());
    instructor.start();
    this.running = true;
    this.initialized = true;
  }

  /**
   * Returns the address the task-report RPC server is listening on, as
   * recorded by {@link #initialize()}.
   *
   * @return the report server address; null until initialize() has set it.
   */
  public synchronized InetSocketAddress getTaskTrackerReportAddress() {
    return taskReportAddress;
  }

  @Override
  public void dispatch(Directive directive) throws IOException {
    if (!instructor.isAlive())
      throw new IOException();

    instructor.put(directive);
  }

  private static void checkLocalDirs(String[] localDirs)
      throws DiskErrorException {
    boolean writable = false;

    LOG.info(localDirs);

    if (localDirs != null) {
      for (int i = 0; i < localDirs.length; i++) {
        try {
          LOG.info(localDirs[i]);
          DiskChecker.checkDir(new File(localDirs[i]));
          writable = true;
        } catch (DiskErrorException e) {
          LOG.warn("BSP Processor local " + e.getMessage());
        }
      }
    }

    if (!writable)
      throw new DiskErrorException("all local directories are not writable");
  }

  /**
   * Returns the value of the "bsp.local.dir" property as returned by
   * Configuration.getStrings.
   */
  public String[] getLocalDirs() {
    return conf.getStrings("bsp.local.dir");
  }

  public void deleteLocalFiles() throws IOException {
    String[] localDirs = getLocalDirs();
    for (int i = 0; i < localDirs.length; i++) {
      FileSystem.getLocal(this.conf).delete(new Path(localDirs[i]), true);
    }
  }

  public void deleteLocalFiles(String subdir) throws IOException {
    try {
      String[] localDirs = getLocalDirs();
      for (int i = 0; i < localDirs.length; i++) {
        FileSystem.getLocal(this.conf).delete(new Path(localDirs[i], subdir),
            true);
      }
    } catch (NullPointerException e) {
      LOG.info(e);
    }
  }

  /**
   * Removes all local directories of this groom by delegating to
   * {@link #deleteLocalFiles()}.
   */
  public void cleanupStorage() throws IOException {
    deleteLocalFiles();
  }

  /**
   * Placeholder called from {@link #run()} right after initialization; it
   * currently does nothing.
   */
  private void startCleanupThreads() throws IOException {

  }

  public State offerService() throws Exception {
    while (running && !shuttingDown) {
      try {

        // Reports to a BSPMaster
        for (Map.Entry<TaskAttemptID, TaskInProgress> e : runningTasks
            .entrySet()) {
          Thread.sleep(REPORT_INTERVAL);
          TaskInProgress tip = e.getValue();
          TaskStatus taskStatus = tip.getStatus();
          taskStatus.setProgress(bspPeer.getSuperstepCount());

          if (bspPeer.getLocalQueueSize() == 0
              && bspPeer.getOutgoingQueueSize() == 0 && !tip.runner.isAlive()) {
            if (taskStatus.getRunState() != TaskStatus.State.FAILED) {
              taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
            }
            taskStatus.setPhase(TaskStatus.Phase.CLEANUP);
          }

          doReport(taskStatus);
        }

        Thread.sleep(REPORT_INTERVAL);
      } catch (InterruptedException ie) {
      }

      try {
        if (justInited) {
          String dir = masterClient.getSystemDir();
          if (dir == null) {
            LOG.error("Fail to get system directory.");
            throw new IOException("Fail to get system directory.");
          }
          systemDirectory = new Path(dir);
          systemFS = systemDirectory.getFileSystem(conf);
        }
        justInited = false;
      } catch (DiskErrorException de) {
        String msg = "Exiting groom server for disk error:\n"
            + StringUtils.stringifyException(de);
        LOG.error(msg);

        return State.STALE;
      } catch (RemoteException re) {
        return State.DENIED;
      } catch (Exception except) {
        String msg = "Caught exception: "
            + StringUtils.stringifyException(except);
        LOG.error(msg);
      }
    }
    return State.NORMAL;
  }

  private void startNewTask(LaunchTaskAction action) {
    Task t = action.getTask();
    BSPJob jobConf = null;
    try {
      jobConf = new BSPJob(t.getJobID(), t.getJobFile());
    } catch (IOException e1) {
      LOG.error(e1);
    }

    TaskInProgress tip = new TaskInProgress(t, jobConf, this.groomServerName);

    synchronized (this) {
      tasks.put(t.getTaskID(), tip);
      runningTasks.put(t.getTaskID(), tip);
    }

    try {
      localizeJob(tip);
    } catch (Throwable e) {
      String msg = ("Error initializing " + tip.getTask().getTaskID() + ":\n" + StringUtils
          .stringifyException(e));
      LOG.warn(msg);
    }
  }

  /**
   * Update and report refresh status back to BSPMaster.
   */
  public void doReport(TaskStatus taskStatus) {
    GroomServerStatus groomStatus = new GroomServerStatus(groomServerName,
        bspPeer.getPeerName(), updateTaskStatus(taskStatus), failures,
        maxCurrentTasks, rpcServer);
    try {
      boolean ret = masterClient.report(new ReportGroomStatusDirective(
          groomStatus));
      if (!ret) {
        LOG.warn("Fail to renew BSPMaster's GroomServerStatus. "
            + " groom name: " + groomStatus.getGroomName() + " peer name:"
            + groomStatus.getPeerName() + " rpc server:" + rpcServer);
      }
    } catch (IOException ioe) {
      LOG.error("Fail to communicate with BSPMaster for reporting.", ioe);
    }
  }

  public List<TaskStatus> updateTaskStatus(TaskStatus taskStatus) {
    List<TaskStatus> tlist = new ArrayList<TaskStatus>();
    synchronized (runningTasks) {

      if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED
          || taskStatus.getRunState() == TaskStatus.State.FAILED) {
        synchronized (finishedTasks) {
          TaskInProgress tip = runningTasks.remove(taskStatus.getTaskId());
          tlist.add((TaskStatus) taskStatus.clone());
          finishedTasks.put(taskStatus.getTaskId(), tip);
        }
      } else if (taskStatus.getRunState() == TaskStatus.State.RUNNING) {
        tlist.add((TaskStatus) taskStatus.clone());
      }

    }
    return tlist;
  }

  private void localizeJob(TaskInProgress tip) throws IOException {
    Task task = tip.getTask();
    conf.addResource(task.getJobFile());
    BSPJob defaultJobConf = new BSPJob((HamaConfiguration) conf);
    Path localJobFile = defaultJobConf.getLocalPath(SUBDIR + "/"
        + task.getTaskID() + "/" + "job.xml");

    RunningJob rjob = addTaskToJob(task.getJobID(), localJobFile, tip);
    BSPJob jobConf = null;

    synchronized (rjob) {
      if (!rjob.localized) {
        Path localJarFile = defaultJobConf.getLocalPath(SUBDIR + "/"
            + task.getTaskID() + "/" + "job.jar");
        systemFS.copyToLocalFile(new Path(task.getJobFile()), localJobFile);

        HamaConfiguration conf = new HamaConfiguration();
        conf.addResource(localJobFile);
        jobConf = new BSPJob(conf, task.getJobID().toString());

        Path jarFile = new Path(jobConf.getJar());
        jobConf.setJar(localJarFile.toString());

        if (jarFile != null) {
          systemFS.copyToLocalFile(jarFile, localJarFile);

          // also unjar the job.jar files in workdir
          File workDir = new File(
              new File(localJobFile.toString()).getParent(), "work");
          if (!workDir.mkdirs()) {
            if (!workDir.isDirectory()) {
              throw new IOException("Mkdirs failed to create "
                  + workDir.toString());
            }
          }
          RunJar.unJar(new File(localJarFile.toString()), workDir);
        }
        rjob.localized = true;
      }
    }

    launchTaskForJob(tip, jobConf);
  }

  private void launchTaskForJob(TaskInProgress tip, BSPJob jobConf) {
    try {
      tip.setJobConf(jobConf);
      tip.launchTask();
    } catch (Throwable ie) {
      tip.taskStatus.setRunState(TaskStatus.State.FAILED);
      String error = StringUtils.stringifyException(ie);
      LOG.info(error);
    }
  }

  private RunningJob addTaskToJob(BSPJobID jobId, Path localJobFile,
      TaskInProgress tip) {
    synchronized (runningJobs) {
      RunningJob rJob = null;
      if (!runningJobs.containsKey(jobId)) {
        rJob = new RunningJob(jobId, localJobFile);
        rJob.localized = false;
        rJob.tasks = new HashSet<TaskInProgress>();
        rJob.jobFile = localJobFile;
        runningJobs.put(jobId, rJob);
      } else {
        rJob = runningJobs.get(jobId);
      }
      rJob.tasks.add(tip);
      return rJob;
    }
  }

  /**
   * The datastructure for initializing a job
   */
  static class RunningJob {
    private BSPJobID jobid;
    // Local job file path passed in when the entry was created.
    private Path jobFile;
    // keep this for later use
    Set<TaskInProgress> tasks;
    // Set to true by localizeJob() once the job files have been copied.
    boolean localized;
    boolean keepJobFiles;

    /** Creates an entry with no tasks that is not yet localized. */
    RunningJob(BSPJobID jobid, Path jobFile) {
      this.jobid = jobid;
      localized = false;
      tasks = new HashSet<TaskInProgress>();
      this.jobFile = jobFile;
      keepJobFiles = false;
    }

    Path getJobFile() {
      return jobFile;
    }

    BSPJobID getJobId() {
      return jobid;
    }
  }

  private synchronized List<TaskStatus> cloneAndResetRunningTaskStatuses() {
    List<TaskStatus> result = new ArrayList<TaskStatus>(runningTasks.size());
    for (TaskInProgress tip : runningTasks.values()) {
      TaskStatus status = tip.getStatus();
      result.add((TaskStatus) status.clone());
    }
    return result;
  }

  /**
   * Thread entry point: initializes the groom and then keeps calling
   * {@link #offerService()} until the groom stops running, is shutting down
   * or is denied by the master. After the inner loop ends without a
   * shutdown, the groom is initialized again.
   */
  public void run() {
    try {
      initialize();
      startCleanupThreads();
      boolean denied = false;
      while (running && !shuttingDown && !denied) {

        boolean staleState = false;
        try {
          // Inner loop: serve until the state goes stale, the master denies
          // this groom, or the groom is stopped.
          while (running && !staleState && !shuttingDown && !denied) {
            try {
              State osState = offerService();
              if (osState == State.STALE) {
                staleState = true;
              } else if (osState == State.DENIED) {
                denied = true;
              }
            } catch (Exception e) {
              // Any failure of offerService() is treated as a lost
              // connection and retried after a 5 second pause.
              if (!shuttingDown) {
                LOG.info("Lost connection to BSP Master [" + bspMasterAddr
                    + "].  Retrying...", e);
                try {
                  Thread.sleep(5000);
                } catch (InterruptedException ie) {
                }
              }
            }
          }
        } finally {
          // close();
        }
        if (shuttingDown) {
          return;
        }
        // Reached for a stale or denied state as well as when running was
        // cleared without a shutdown.
        LOG.warn("Reinitializing local state");
        initialize();
      }
    } catch (IOException ioe) {
      // Only IOException is handled here; unchecked exceptions thrown by
      // initialize() propagate out of run().
      LOG.error("Got fatal exception while reinitializing GroomServer: "
          + StringUtils.stringifyException(ioe));
      return;
    }
  }

  /**
   * Flags the groom as shutting down, which makes {@link #run()} return
   * instead of re-initializing, and then releases its resources via
   * {@link #close()}.
   */
  public synchronized void shutdown() throws IOException {
    shuttingDown = true;
    close();
  }

  public synchronized void close() throws IOException {
    this.running = false;
    this.initialized = false;
    bspPeer.close();
    cleanupStorage();
    this.workerServer.stop();
    RPC.stopProxy(masterClient);

    if (taskReportServer != null) {
      taskReportServer.stop();
      taskReportServer = null;
    }
  }

  /**
   * Starts the given groom server in a new thread named "regionserver"
   * followed by the groom's current name.
   *
   * @return the started thread.
   */
  public static Thread startGroomServer(final GroomServer hrs) {
    return startGroomServer(hrs, "regionserver" + hrs.groomServerName);
  }

  public static Thread startGroomServer(final GroomServer hrs, final String name) {
    Thread t = new Thread(hrs);
    t.setName(name);
    t.start();
    return t;
  }

  // /////////////////////////////////////////////////////
  // TaskInProgress maintains all the info for a Task that
  // lives at this GroomServer. It maintains the Task object,
  // its TaskStatus, and the BSPTaskRunner.
  // /////////////////////////////////////////////////////
  class TaskInProgress {
    Task task;
    BSPJob jobConf;
    BSPJob localJobConf;
    BSPTaskRunner runner;
    volatile boolean done = false;
    volatile boolean wasKilled = false;
    private TaskStatus taskStatus;

    public TaskInProgress(Task task, BSPJob jobConf, String groomServer) {
      this.task = task;
      this.jobConf = jobConf;
      this.localJobConf = null;
      this.taskStatus = new TaskStatus(task.getJobID(), task.getTaskID(), 0,
          TaskStatus.State.UNASSIGNED, "running", groomServer,
          TaskStatus.Phase.STARTING);
    }

    private void localizeTask(Task task) throws IOException {
      Path localJobFile = this.jobConf.getLocalPath(SUBDIR + "/"
          + task.getTaskID() + "/job.xml");
      Path localJarFile = this.jobConf.getLocalPath(SUBDIR + "/"
          + task.getTaskID() + "/job.jar");

      String jobFile = task.getJobFile();
      systemFS.copyToLocalFile(new Path(jobFile), localJobFile);
      task.setJobFile(localJobFile.toString());

      localJobConf = new BSPJob(task.getJobID(), localJobFile.toString());
      localJobConf.set("bsp.task.id", task.getTaskID().toString());
      String jarFile = localJobConf.getJar();

      if (jarFile != null) {
        systemFS.copyToLocalFile(new Path(jarFile), localJarFile);
        localJobConf.setJar(localJarFile.toString());
      }

      LOG.debug("localizeTask : " + localJobConf.getJar());
      LOG.debug("localizeTask : " + localJobFile.toString());

      task.setConf(localJobConf);
    }

    public synchronized void setJobConf(BSPJob jobConf) {
      this.jobConf = jobConf;
    }

    public synchronized BSPJob getJobConf() {
      return localJobConf;
    }

    public void launchTask() throws IOException {
      localizeTask(task);
      taskStatus.setRunState(TaskStatus.State.RUNNING);
      bspPeer.setJobConf(jobConf);
      bspPeer.setCurrentTaskStatus(taskStatus);
      this.runner = task.createRunner(GroomServer.this);
      this.runner.start();
    }

    /**
     * This task has run on too long, and should be killed.
     */
    public synchronized void killAndCleanup(boolean wasFailure)
        throws IOException {
      runner.kill();
    }

    /**
     */
    public Task getTask() {
      return task;
    }

    /**
     */
    public synchronized TaskStatus getStatus() {
      return taskStatus;
    }

    /**
     */
    public TaskStatus.State getRunState() {
      return taskStatus.getRunState();
    }

    public boolean wasKilled() {
      return wasKilled;
    }

    @Override
    public boolean equals(Object obj) {
      return (obj instanceof TaskInProgress)
          && task.getTaskID().equals(
              ((TaskInProgress) obj).getTask().getTaskID());
    }

    @Override
    public int hashCode() {
      return task.getTaskID().hashCode();
    }
  }

  /**
   * @return the current value of the {@code running} flag, which
   *         initialize() sets and close() clears.
   */
  public boolean isRunning() {
    return running;
  }

  public static GroomServer constructGroomServer(
      Class<? extends GroomServer> groomServerClass, final Configuration conf2) {
    try {
      Constructor<? extends GroomServer> c = groomServerClass
          .getConstructor(Configuration.class);
      return c.newInstance(conf2);
    } catch (Exception e) {
      throw new RuntimeException("Failed construction of " + "Master: "
          + groomServerClass.toString(), e);
    }
  }

  @Override
  public long getProtocolVersion(String protocol, long clientVersion)
      throws IOException {
    if (protocol.equals(WorkerProtocol.class.getName())) {
      return WorkerProtocol.versionID;
    } else if (protocol.equals(BSPPeerProtocol.class.getName())) {
      return BSPPeerProtocol.versionID;
    } else {
      throw new IOException("Unknown protocol to GroomServer: " + protocol);
    }
  }

  /**
   * GroomServer address information.
   *
   * @return bsp peer information in the form of "address:port".
   */
  public String getBspPeerName() {
    if (null != bspPeer)
      return bspPeer.getPeerName();
    return null;
  }

  /**
   * The main() for child processes.
   */
  public static class Child {

    public static void main(String[] args) throws Throwable {
      LOG.debug("Child starting");

      HamaConfiguration defaultConf = new HamaConfiguration();
      // report address
      String host = args[0];
      int port = Integer.parseInt(args[1]);
      InetSocketAddress address = new InetSocketAddress(host, port);
      TaskAttemptID taskid = TaskAttemptID.forName(args[2]);

      // //////////////////
      BSPPeerProtocol umbilical = (BSPPeerProtocol) RPC.getProxy(
          BSPPeerProtocol.class, BSPPeerProtocol.versionID, address,
          defaultConf);

      Task task = umbilical.getTask(taskid);

      defaultConf.addResource(new Path(task.getJobFile()));
      BSPJob job = new BSPJob(task.getJobID(), task.getJobFile());

      try {
        // use job-specified working directory
        FileSystem.get(job.getConf()).setWorkingDirectory(
            job.getWorkingDirectory());

        task.run(job, umbilical); // run the task
      } catch (FSError e) {
        LOG.fatal("FSError from child", e);
        umbilical.fsError(taskid, e.getMessage());
      } catch (Throwable throwable) {
        LOG.warn("Error running child", throwable);
        // Report back any failures, for diagnostic purposes
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        throwable.printStackTrace(new PrintStream(baos));
      } finally {
        RPC.stopProxy(umbilical);
        // Shutting down log4j of the child-vm...
        // This assumes that on return from Task.run()
        // there is no more logging done.
        LogManager.shutdown();
      }
    }
  }

  @Override
  public Task getTask(TaskAttemptID taskid) throws IOException {
    TaskInProgress tip = tasks.get(taskid);
    if (tip != null) {
      return tip.getTask();
    } else {
      return null;
    }
  }

  /** Not implemented yet: always returns false. */
  @Override
  public boolean ping(TaskAttemptID taskid) throws IOException {
    // TODO Auto-generated method stub
    return false;
  }

  /** Not implemented yet: the call is accepted and ignored. */
  @Override
  public void done(TaskAttemptID taskid, boolean shouldBePromoted)
      throws IOException {
    // TODO Auto-generated method stub

  }

  /**
   * Not implemented yet: the error reported by the child (see Child.main)
   * is accepted and ignored.
   */
  @Override
  public void fsError(TaskAttemptID taskId, String message) throws IOException {
    // TODO Auto-generated method stub

  }

  /** Delegates to the local BSP peer's send(peerName, msg). */
  @Override
  public void send(String peerName, BSPMessage msg) throws IOException {
    bspPeer.send(peerName, msg);
  }

  /** Delegates to the local BSP peer's put(msg). */
  @Override
  public void put(BSPMessage msg) throws IOException {
    bspPeer.put(msg);
  }

  /** Delegates to the local BSP peer's put(messages) for a message bundle. */
  @Override
  public void put(BSPMessageBundle messages) throws IOException {
    bspPeer.put(messages);
  }

  /** Delegates to the local BSP peer's getCurrentMessage(). */
  @Override
  public BSPMessage getCurrentMessage() throws IOException {
    return bspPeer.getCurrentMessage();
  }

  /** Delegates to the local BSP peer's getNumCurrentMessages(). */
  @Override
  public int getNumCurrentMessages() {
    return bspPeer.getNumCurrentMessages();
  }

  /** Delegates to the local BSP peer's sync(). */
  @Override
  public void sync() throws IOException, KeeperException, InterruptedException {
    bspPeer.sync();
  }

  /** Delegates to the local BSP peer's getSuperstepCount(). */
  @Override
  public long getSuperstepCount() {
    return bspPeer.getSuperstepCount();
  }

  /**
   * Delegates to the local BSP peer's getPeerName(). Unlike
   * {@link #getBspPeerName()} this does not check for a missing peer.
   */
  @Override
  public String getPeerName() {
    return bspPeer.getPeerName();
  }

  /** Delegates to the local BSP peer's getAllPeerNames(). */
  @Override
  public String[] getAllPeerNames() {
    return bspPeer.getAllPeerNames();
  }

  /** Delegates to the local BSP peer's clear(). */
  @Override
  public void clear() {
    bspPeer.clear();
  }
}
// TOP
//
// Related Classes of org.apache.hama.bsp.GroomServer
//
// TOP
// Copyright © 2018 www.massapi.com. All rights reserved.
// All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.