Package org.apache.uima.ducc.cli

Source Code of org.apache.uima.ducc.cli.DuccMonitor$Killer

/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.uima.ducc.cli;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.management.ManagementFactory;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.atomic.AtomicBoolean;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.PosixParser;
import org.apache.uima.ducc.cli.IUiOptions.UiOption;
import org.apache.uima.ducc.common.json.MonitorInfo;
import org.apache.uima.ducc.common.utils.DuccPropertiesResolver;
import org.apache.uima.ducc.common.utils.SynchronizedSimpleDateFormat;
import org.apache.uima.ducc.common.utils.Utils;
import org.apache.uima.ducc.transport.event.IDuccContext.DuccContext;

import com.google.gson.Gson;

public abstract class DuccMonitor {

  protected static final int RC_SUCCESS = 0;
  protected static final int RC_FAILURE = 1;
  protected static final int RC_HELP = RC_FAILURE;

  protected static final String NotFound = "NotFound";
  protected static final String StateRunning = "Running";
  protected static final String StateCompleting = "Completing";
  protected static final String StateCompleted = "Completed";

  private Options options = new Options();

  private UiOption[] opts = new UiOption[0];

  private String id = null;

  private AtomicBoolean flag_cancel_on_interrupt = new AtomicBoolean(false);
  private AtomicBoolean flag_debug = new AtomicBoolean(false);
  private AtomicBoolean flag_error = new AtomicBoolean(true);
  private AtomicBoolean flag_info = new AtomicBoolean(true);
  private AtomicBoolean flag_trace = new AtomicBoolean(false);
  private AtomicBoolean flag_timestamp = new AtomicBoolean(false);

  private AtomicBoolean flag_observer = new AtomicBoolean(true);

  private int milliseconds = 1;
  private int seconds = 1000 * milliseconds;
  private int wakeupInterval = 15 * seconds;
  private int urlTimeout = 1 * 60 * seconds;

  private Thread main = null;
  private DuccPropertiesResolver duccPropertiesResolver = null;

  private DuccContext context = null;
  IDuccCallback messageProcessor = null;

  private SynchronizedSimpleDateFormat sdf = new SynchronizedSimpleDateFormat(
      "dd/MM/yyyy HH:mm:ss");

  private UiOption[] optsSubmitJob = new UiOption[] { UiOption.Help,
      UiOption.Debug, UiOption.Quiet, UiOption.Timestamp, UiOption.JobId,
      UiOption.CancelOnInterrupt, };

  private UiOption[] optsMonitorJob = new UiOption[] { UiOption.Help,
      UiOption.Debug, UiOption.Quiet, UiOption.Timestamp, UiOption.JobId, };

  private UiOption[] optsSubmitManagedReservation = new UiOption[] {
      UiOption.Help, UiOption.Debug, UiOption.Quiet, UiOption.Timestamp,
      UiOption.ManagedReservationId, UiOption.CancelOnInterrupt, };

  private UiOption[] optsMonitorManagedReservation = new UiOption[] {
      UiOption.Help, UiOption.Debug, UiOption.Quiet, UiOption.Timestamp,
      UiOption.ManagedReservationId, };

  protected DuccMonitor(DuccContext context, boolean submit) {
    initialize(context, submit, new DefaultCallback());
  }

  protected DuccMonitor(DuccContext context, boolean submit,
      IDuccCallback messageProcessor) {
    initialize(context, submit, messageProcessor);
  }

  public abstract void help(Options options);

  public abstract void cancel();

  public abstract String getUrl(String id);

  public String getHost() {
    String host = duccPropertiesResolver.getFileProperty("ducc.ws.node");
    if (host == null) {
      host = duccPropertiesResolver.getFileProperty("ducc.head");
    }
    return host;
  }

  public String getPort() {
    String port = duccPropertiesResolver.getFileProperty("ducc.ws.port");
    return port;
  }

  public String getId() {
    return id;
  }

  private void initialize(DuccContext context, boolean submit,
      IDuccCallback messageProcessor) {
    // context
    this.context = context;
    // submit
    if (context != null) {
      switch (context) {
      case Job:
        if (submit) {
          opts = optsSubmitJob;
        } else {
          opts = optsMonitorJob;
        }
        break;
      case ManagedReservation:
        if (submit) {
          opts = optsSubmitManagedReservation;
        } else {
          opts = optsMonitorManagedReservation;
        }
        break;
      default:
        break;
      }
    }
    options = CliBase.makeOptions(opts);
    // message processor
    if (messageProcessor != null) {
      this.messageProcessor = messageProcessor;
    }
  }

  protected void trace(String message) {
    if (flag_trace.get()) {
      messageProcessor.status(timestamp(message));
    }
  }

  protected void debug(String message) {
    if (flag_debug.get()) {
      messageProcessor.status(timestamp(message));
    }
  }

  protected void debug(Exception e) {
    if (flag_debug.get()) {
      messageProcessor.status(e.toString());
    }
  }

  private void info(String message) {
    if (flag_info.get()) {
      messageProcessor.status(timestamp(message));
    }
  }

  private void error(String message) {
    if (flag_error.get()) {
      messageProcessor.status(timestamp(message));
    }
  }

  protected String timestamp(String message) {
    String tMessage = message;
    if (flag_timestamp.get()) {
      String date = sdf.format(new java.util.Date());
      tMessage = date + " " + message;
    }
    return tMessage;
  }

  private String details(MonitorInfo monitorInfo) {
    StringBuffer sb = new StringBuffer();
    switch (context) {
    case Job:
      sb.append(" ");
      sb.append("total:");
      sb.append(monitorInfo.total);
      sb.append(" ");
      sb.append("done:");
      sb.append(monitorInfo.done);
      sb.append(" ");
      sb.append("error:");
      sb.append(monitorInfo.error);
      sb.append(" ");
      sb.append("retry:");
      sb.append(monitorInfo.retry);
      sb.append(" ");
      sb.append("procs:");
      sb.append(monitorInfo.procs);
      break;
    }
    return sb.toString();
  }

  private void adjustWakeupInterval() {
    String rate = duccPropertiesResolver
        .getFileProperty("ducc.orchestrator.state.publish.rate");
    try {
      wakeupInterval = Integer.parseInt(rate);
    } catch (Exception e) {
      debug(e);
    }
  }

  private int runInternal(String[] args) throws Exception {
    // DUCC_HOME
    String ducc_home = Utils.findDuccHome();
    if (ducc_home == null) {
      messageProcessor
          .status("Missing required environment variable: DUCC_HOME");
      return RC_FAILURE;
    }
    // Ingest ducc.properties
    duccPropertiesResolver = DuccPropertiesResolver.getInstance();
    // Parse
    synchronized (DuccMonitor.class) {
      CommandLineParser parser = new PosixParser();
      CommandLine commandLine = parser.parse(options, args);
      if (commandLine.hasOption(DuccUiConstants.name_help)) {
        help(options);
        return RC_HELP;
      }
      if (commandLine.getOptions().length == 0) {
        help(options);
        return RC_HELP;
      }
      if (commandLine.hasOption(UiOption.Timestamp.pname())) {
        flag_timestamp.set(true);
      }
      if (commandLine.hasOption(UiOption.Quiet.pname())) {
        flag_info.set(false);
        flag_error.set(false);
      }
      if (commandLine.hasOption(UiOption.Debug.pname())) {
        flag_debug.set(true);
      }
      if (commandLine.hasOption(UiOption.CancelOnInterrupt.pname())) {
        flag_cancel_on_interrupt.set(true);
      }
      if (commandLine.hasOption(UiOption.JobId.pname())) {
        id = commandLine.getOptionValue(UiOption.JobId.pname());
      } else if (commandLine.hasOption(UiOption.ManagedReservationId
          .pname())) {
        id = commandLine.getOptionValue(UiOption.ManagedReservationId
            .pname());
      } else {
        HelpFormatter formatter = new HelpFormatter();
        formatter.setWidth(110);
        formatter.printHelp(DuccJobMonitor.class.getName(), options);
        return RC_HELP;
      }
    }
    // Handle Ctl-C
    main = Thread.currentThread();
    Thread killer = new Killer(main);
    Runtime.getRuntime().addShutdownHook(killer);
    // Setup polling
    adjustWakeupInterval();
    String urlString = getUrl(id);
    String lastMessage = "";
    String thisMessage = "";
    StringBuffer message = new StringBuffer();
    message.append("id:" + id);
    message.append(" ");
    message.append("location:");
    message.append(ManagementFactory.getRuntimeMXBean().getName());
    info(message.toString());
    // Poll until finished
    while (flag_observer.get()) {
      String json = getSingleLineStatus(urlString);
      if (json != null) {
        debug(json);
        Gson gson = new Gson();
        MonitorInfo monitorInfo = gson
            .fromJson(json, MonitorInfo.class);
        int stateCount = monitorInfo.stateSequence.size();
        debug("states:" + stateCount);
        if (stateCount <= 0) {
          message = new StringBuffer();
          message.append("id:" + id);
          message.append(" ");
          message.append("state:" + NotFound);
          thisMessage = message.toString();
          info(thisMessage);
          message = new StringBuffer();
          message.append("id:" + id);
          message.append(" ");
          message.append("rc:" + RC_FAILURE);
          thisMessage = message.toString();
          info(thisMessage);
          return RC_FAILURE;
        }
        String state = "";
        Iterator<String> states = monitorInfo.stateSequence.iterator();
        while (states.hasNext()) {
          state = states.next();
          debug("list:" + state);
        }
        message = new StringBuffer();
        message.append("id:" + id);
        message.append(" ");
        message.append("state:" + state);
        if (state.equals(StateRunning)) {
          message.append(details(monitorInfo));
        } else if (state.equals(StateCompleting)) {
          flag_cancel_on_interrupt.set(false);
          message.append(details(monitorInfo));
        } else if (state.equals(StateCompleted)) {
          flag_cancel_on_interrupt.set(false);
          message.append(details(monitorInfo));
        }
        thisMessage = message.toString();
        if (!thisMessage.equals(lastMessage)) {
          info(thisMessage);
          lastMessage = thisMessage;
        }
        if (state.equals(StateCompleted)) {
          // See Jira 2911
          //if (monitorInfo.procs.equals("0")) {
            if (monitorInfo.total.equals(monitorInfo.done)) {
              if (!monitorInfo.rationale.equals("")) {
                message = new StringBuffer();
                message.append("id:" + id);
                message.append(" rationale:" + monitorInfo.rationale);
                thisMessage = message.toString();
                info(thisMessage);
              }
              int rc = RC_FAILURE;
              message = new StringBuffer();
              message.append("id:" + id);
              try {
                rc = Integer.parseInt(monitorInfo.code);
                message.append(" rc:" + rc);
              } catch (NumberFormatException e) {
                message.append(" code:" + monitorInfo.code);
              }
              thisMessage = message.toString();
              info(thisMessage);
              return rc;
            } else {
              if (!monitorInfo.errorLogs.isEmpty()) {
                message = new StringBuffer();
                message.append("id:" + id);
                ArrayList<String> errorLogs = monitorInfo.errorLogs;
                for (String errorLog : errorLogs) {
                  message.append(" file:" + errorLog);
                }
                thisMessage = message.toString();
                info(thisMessage);
              }
              if (!monitorInfo.rationale.equals("")) {
                message = new StringBuffer();
                message.append("id:" + id);
                message.append(" rationale:" + monitorInfo.rationale);
                thisMessage = message.toString();
                info(thisMessage);
              }
              message = new StringBuffer();
              message.append("id:" + id);
              message.append(" rc:" + RC_FAILURE);
              thisMessage = message.toString();
              info(thisMessage);
              return RC_FAILURE;
            }
          //}
        }
      } else {
        error("error: accessing " + urlString);
      }
      long start = System.currentTimeMillis();
      long end = start;
      while (!isTimeExpired(start, end, wakeupInterval)) {
        if (!flag_observer.get()) {
          break;
        }
        try {
          Thread.sleep(wakeupInterval);
        } catch (InterruptedException e) {
          debug(e);
        }
        end = System.currentTimeMillis();
      }

    }
    return RC_SUCCESS;
  }

  private boolean isTimeExpired(long start, long end, long interval) {
    boolean retVal = false;
    long diff = end - start;
    if (diff >= interval) {
      retVal = true;
    }
    trace("start:" + start + " " + "end:" + end + " " + "diff:" + diff
        + " " + "interval:" + interval + " " + "result:" + retVal);
    return retVal;
  }

  private String getSingleLineStatus(String urlString) {
    String line = null;
    URL url = null;
    try {
      url = new URL(urlString);
      URLConnection uc = url.openConnection();
      uc.setReadTimeout(urlTimeout);
      BufferedReader br = new BufferedReader(new InputStreamReader(
          uc.getInputStream()));
      line = br.readLine();
      br.close();
    } catch (MalformedURLException e) {
      e.printStackTrace();
    } catch (IOException e) {
      e.printStackTrace();
    }
    return line;
  }

  private class Killer extends Thread {

    public Killer(Thread thread) {
    }

    public void run() {
      StringBuffer message = new StringBuffer();
      if (flag_cancel_on_interrupt.get()) {
        message.append("killer: cancel");
        cancel();
      } else {
        message.append("killer: no cancel");
      }
      debug(message.toString());
      flag_observer.set(false);
    }
  }

  public int run(String[] args) {
    int code = RC_FAILURE;
    try {
      code = runInternal(args);
    } catch (Exception e) {
      messageProcessor.status("ERROR: " + e.toString());
    }
    debug("rc=" + code);
    return code;
  }

}
TOP

Related Classes of org.apache.uima.ducc.cli.DuccMonitor$Killer

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.