Package eu.planets_project.tb.impl.system.batch.backends.ifwee

Source Code of eu.planets_project.tb.impl.system.batch.backends.ifwee.TestbedWEEBatchProcessor

/*******************************************************************************
* Copyright (c) 2007, 2010 The Planets Project Partners.
*
* All rights reserved. This program and the accompanying
* materials are made available under the terms of the
* Apache License, Version 2.0 which accompanies
* this distribution, and is available at
* http://www.apache.org/licenses/LICENSE-2.0
*
*******************************************************************************/
package eu.planets_project.tb.impl.system.batch.backends.ifwee;

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSender;
import javax.jms.QueueSession;
import javax.jms.TextMessage;
import javax.naming.Context;
import javax.naming.InitialContext;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.planets_project.ifr.core.storage.api.DataRegistryFactory;
import eu.planets_project.ifr.core.storage.api.DigitalObjectManager;
import eu.planets_project.ifr.core.wee.api.WorkflowExecutionStatus;
import eu.planets_project.ifr.core.wee.api.utils.WFResultUtil;
import eu.planets_project.ifr.core.wee.api.utils.WorkflowConfigUtil;
import eu.planets_project.ifr.core.wee.api.workflow.WorkflowResult;
import eu.planets_project.ifr.core.wee.api.workflow.generated.WorkflowConf;
import eu.planets_project.ifr.core.wee.api.wsinterface.WeeService;
import eu.planets_project.services.datatypes.DigitalObject;
import eu.planets_project.tb.api.TestbedManager;
import eu.planets_project.tb.api.model.Experiment;
import eu.planets_project.tb.api.system.batch.BatchProcessor;
import eu.planets_project.tb.gui.backing.ExperimentBean;
import eu.planets_project.tb.gui.util.JSFUtil;
import eu.planets_project.tb.impl.AdminManagerImpl;
import eu.planets_project.tb.impl.TestbedManagerImpl;
import eu.planets_project.tb.impl.services.mockups.workflow.ExperimentWorkflow;
import eu.planets_project.tb.impl.services.util.wee.WeeRemoteUtil;
import eu.planets_project.tb.impl.system.batch.TestbedBatchJob;

/**
* The WEE implementation of a Testbed BatchProcessor. Provides checking procedures for events used
* by the BatchExperimentListener (MDB) as well as notify operations for reporting back. All information
* exchanged between this class and the MDB are passed through the TestbedBatchJOb object (e.g. WorkflowResult, etc.)
* @author <a href="mailto:andrew.lindley@ait.ac.at">Andrew Lindley</a>
* @since 23.10.2009
*
*/
public class TestbedWEEBatchProcessor implements BatchProcessor{
 
  private static final Log log = LogFactory.getLog(TestbedWEEBatchProcessor.class);
 
  private static TestbedWEEBatchProcessor instance;
  private static TestbedManager tbManager;
  private static WeeService weeService;
  private static WEEBatchExperimentTestbedUpdater weeTBUpdater;
  //make sure it's a synchronized Map
  private Map<String,TestbedBatchJob> jobs = Collections.synchronizedMap(new HashMap<String,TestbedBatchJob>());
 
    //JMS configuration
    private static final String QueueConnectionFactoryName = "ConnectionFactory";
    private static final String QueueName_shortTimeout = "queue/tbBatchExecQueue_shortTimeout";
    private static final String QueueName_longTimeout = "queue/tbBatchExecQueue_longTimeout";
 
  public static synchronized TestbedWEEBatchProcessor getInstance(){
    if (instance == null){
      instance = new TestbedWEEBatchProcessor();
      tbManager = TestbedManagerImpl.getInstance();
      weeService = WeeRemoteUtil.getInstance().getWeeService();
      weeTBUpdater = new WEEBatchExperimentTestbedUpdater();
    }
    return instance;
  }
 

  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#getBatchProcessorSystemIdentifier()
   */
  public String getBatchProcessorSystemIdentifier() {
    return BatchProcessor.BATCH_QUEUE_TESTBED_WEE_LOCAL;
  }

  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#getJob(java.lang.String)
   */
  public synchronized TestbedBatchJob getJob(String job_key) {
     return jobs.get(job_key);
  }
 
  public synchronized void setJob(String job_key, TestbedBatchJob job){
    this.jobs.put(job_key, job);
  }

  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#getJobPercentComplete(java.lang.String)
   */
  public synchronized int getJobPercentComplete(String job_key) {
        if(this.isCompleted(job_key)){
          return 100;
        }
        else{
          try {
            return weeService.getProgress(UUID.fromString(job_key));
        } catch (Exception e) {
          log.debug("WEE getProgress error for job: "+job_key);
          return -1;
       
        }
  }

  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#getJobStatus(java.lang.String)
   */
  public synchronized String getJobStatus(String job_key) {
        if( this.getJob(job_key) == null ) return TestbedBatchJob.NO_SUCH_JOB;
        return this.getJob(job_key).getStatus();
  }

  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#getPositionInQueue(java.lang.String)
   */
  public synchronized String getPositionInQueue(String job_key) {
    if( this.getJob(job_key) == null ) return TestbedBatchJob.POSITION_NOT_SUPPORTED;
    if( this.getJobStatus(job_key).equals(TestbedBatchJob.RUNNING)){
      return TestbedBatchJob.POSITION_IN_PROGRESS;
    }
    if( this.getJobStatus(job_key).equals(TestbedBatchJob.DONE)){
      return TestbedBatchJob.POSITION_COMPLETED;
    }
    try {
      return weeService.getPositionInQueue(UUID.fromString(job_key))+"";
    } catch (Exception e) {
      log.debug("WEE getPositionInQueue error for job: "+job_key);
      return TestbedBatchJob.POSITION_NOT_SUPPORTED;
    }
  }

  @Deprecated
  public String submitBatch(long expID, ExperimentWorkflow workflow,
      Collection<String> digitalObjects) {
    // TODO Auto-generated method stub
    return null;
  }

  /*
   * since TB version 1.0
   * (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#sumitBatch(long, java.util.List, java.lang.String, eu.planets_project.ifr.core.wee.api.workflow.generated.WorkflowConf)
   */
  public String sumitBatch(long expID, List<DigitalObject> digObjs, WorkflowConf workflowConfig) {
   
    String retTicket ="";
    //0. create a TestbedBatchJob object that's used to temporarily park the results
     TestbedBatchJob testbedBatchJob = new TestbedBatchJob( expID);

    //1. submit the Workflow to the WEE System and receive a ticket
    try {
      retTicket = "" + TestbedWEEBatchProcessor.weeService.submitWorkflow((ArrayList<DigitalObject>) digObjs, workflowConfig.getTemplate().getClazz(), new WorkflowConfigUtil().marshalWorkflowConfigToXMLTemplate(workflowConfig));
      submitTicketToExecutionQueue(retTicket, testbedBatchJob, expID);
          return retTicket;
    } catch (Exception e) {
      log.error("Error when remotely submitting workflow to WEE or ticket to pollingQueue was called",e);
      return "";
    }
  }
 
  /** {@inheritDoc} */
  public String sumitBatchByReference(long expID, List<URI> digObjRef,
      WorkflowConf workflowConfig) {
   
    String retTicket ="";
    //0. create a TestbedBatchJob object that's used to temporarily park the results
    TestbedBatchJob testbedBatchJob = new TestbedBatchJob( expID);
   
    //1. submit the Workflow to the WEE System and receive a ticket
    try {
      retTicket = "" + TestbedWEEBatchProcessor.weeService.submitWorkflowByReference((ArrayList<URI>) digObjRef, workflowConfig.getTemplate().getClazz(), new WorkflowConfigUtil().marshalWorkflowConfigToXMLTemplate(workflowConfig));
      submitTicketToExecutionQueue(retTicket, testbedBatchJob, expID);
          return retTicket;
    } catch (Exception e) {
      log.error("Error when remotely submitting workflow by reference to WEE or ticket to pollingQueue was called",e);
      return "";
    }
  }
 
 
  /**
   * Decide how long we want to poll for an object before from the engine before we throw a time-out
   * automatically approved experiments - use shortTimeout (5 Minutes)
   * manually approved experiments - use longTimeout (3 Days)
   * @param ticket
   * @param testbedBatchJob
   * @param expID
   * @throws Exception
   */
  private void submitTicketToExecutionQueue(String ticket, TestbedBatchJob testbedBatchJob, long expID) throws Exception{
    jobs.put(ticket, testbedBatchJob);
   
        Experiment exp = tbManager.getExperiment(expID);  
        //2. decide how long we want to poll for an object before from the engine before we throw a time-out
        //2a. automatically approved experiments - use shortTimeout (5 Minutes)
        if(exp.getExperimentApproval().getApprovalUsersIDs().contains(AdminManagerImpl.APPROVAL_AUTOMATIC_USER)){
          submitTicketForPollingToQueue(ticket,TestbedWEEBatchProcessor.QueueName_shortTimeout,this.getBatchProcessorSystemIdentifier());
        }
        //2b manually approved experiments - use longTimeout (3 Days)
        if(!exp.getExperimentApproval().getApprovalUsersIDs().contains(AdminManagerImpl.APPROVAL_AUTOMATIC_USER)){
          submitTicketForPollingToQueue(ticket,TestbedWEEBatchProcessor.QueueName_longTimeout,this.getBatchProcessorSystemIdentifier());
        }
  }
 
 

  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#submitTicketForPollingToQueue(java.lang.String, java.lang.String, java.lang.String)
   */
  public void submitTicketForPollingToQueue(String ticket, String queueName, String batchProcessorSystemID)throws Exception{
    Context         ctx      = null;
      QueueConnection cnn  = null;
      QueueSession    sess  = null;
      Queue           queue    = null;
      QueueSender     sender   = null;
    try{
      ctx = new InitialContext();
        QueueConnectionFactory factory =
            (QueueConnectionFactory) ctx.lookup(QueueConnectionFactoryName);
        queue = (Queue) ctx.lookup(queueName);
        cnn = factory.createQueueConnection();
        sess = cnn.createQueueSession(false,QueueSession.AUTO_ACKNOWLEDGE);
       
        //create the message to send to the MDB e.g. a TextMessage
        TextMessage message = sess.createTextMessage(ticket);
        message.setStringProperty(BatchProcessor.QUEUE_PROPERTY_NAME_FOR_SENDING, batchProcessorSystemID);

          //and finally send the message to the queue.
          sender = sess.createSender(queue);
      sender.send(message);
      log.debug("TestbedWEEBatchProcessor: sent message to queue, ID:"+message.getJMSMessageID());
    } finally {
          try { if( null != sender)   sender.close()} catch( Exception ex ) {}
          try { if( null != sess)     sess.close(); }    catch( Exception ex ) {}
          try { if( null != cnn)      cnn.close(); }     catch( Exception ex ) {}
          try { if( null != ctx)     ctx.close();     } catch( Exception ex ) {}
        }
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#notifyComplete(java.lang.String, eu.planets_project.tb.impl.system.TestbedBatchJob)
   */
  public synchronized void notifyComplete(String job_key, TestbedBatchJob job) {
    //this.setJob(job_key, job);
    WorkflowResult wfLog=null;
   
    //check if job object still exists
    if(job==null){
       //when null (e.g. not in memory any more) try to pull in experiment ID from current session
       ExperimentBean expBean = (ExperimentBean)JSFUtil.getManagedObject("ExperimentBean");
           Experiment exp = expBean.getExperiment();
           //in the case the server has already been shut down and in memory allocation is missing
           wfLog = retrieveWorkflowResult(exp.getExperimentExecutable().getBatchExecutionIdentifier());
           weeTBUpdater.processNotify_WorkflowCompleted(exp.getEntityID(),wfLog);
           log.debug("notifyComplete: complete for "+job_key+" experimentID: "+exp.getEntityID());
    }else{
      wfLog = retrieveWorkflowResult(job_key);
      weeTBUpdater.processNotify_WorkflowCompleted(job.getExpID(),wfLog);
      log.debug("notifyComplete: complete for "+job_key+" experimentID: "+job.getExpID());
    }
  }
 
  private WorkflowResult retrieveWorkflowResult(String job_key){
    WorkflowResult wfLog = null;
    try {
      //fist try to retrieve the wfResult from the DR
      wfLog = fetchWFResultFromDR(job_key);
      log.debug("notifyComplete: fetched wfResult.xml from DR for "+job_key);
    }catch(Exception e){
      try {
        wfLog = weeService.getResult(UUID.fromString(job_key));
        log.debug("notifyComplete: fetched wfResult.xml from web-service WEE-Manager for "+job_key);
      } catch (Exception e2) {
        log.debug("error building UUID from String "+job_key+ "processNotify_WorkflowCompleted without a WorkflowResult"+e2);
      }
    }
    return wfLog;
  }
 
  private WorkflowResult fetchWFResultFromDR(String job_key) throws Exception{
    //2. get the data registry manager and fetch the digital object containing the wfResult
    URI drManagerID = DataRegistryFactory.createDataRegistryIdFromName("/experiment-files/executions/").normalize();
    DigitalObjectManager drExpResults = DataRegistryFactory.getDataRegistry().getDigitalObjectManager(drManagerID);
    URI wfResultstorageURI =new URI(drManagerID.getScheme(),drManagerID.getAuthority(),drManagerID.getPath()+"/"+job_key+"/wfResult-id-"+job_key+".xml",null,null).normalize();
    DigitalObject doWFResult = drExpResults.retrieve(wfResultstorageURI);
    if(doWFResult.getContent()==null){
      throw new Exception("No workflow xml content available.");
    }
    //3. now read the stream into a String and unmarshall the WorkflwoResult object         
        StringBuilder sb = new StringBuilder();
        String line;

        InputStream fis = doWFResult.getContent().getInputStream();
        try {
            BufferedReader reader = new BufferedReader(new InputStreamReader(
                fis , "UTF-8"));
           
            while ((line = reader.readLine()) != null) {
              sb.append(line).append("\n");
            }
        } finally {
          fis.close();
        }
       
        //4. call unmarshall xml->WorkflowResult object
        return WFResultUtil.unmarshalWorkflowResult(sb.toString());
  }
 
  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#notifyFailed(java.lang.String, eu.planets_project.tb.impl.system.TestbedBatchJob)
   */
  public synchronized void notifyFailed(String job_key, TestbedBatchJob job) {
    //this.setJob(job_key, job);
    weeTBUpdater.processNotify_WorkflowFailed(job.getExpID(),(String)job.getWorkflowFailureReport());
    log.debug("callback notify wee for "+job_key);
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#notifyRunning(java.lang.String, eu.planets_project.tb.impl.system.TestbedBatchJob)
   */
  public synchronized void notifyRunning(String job_key, TestbedBatchJob job) {
    // TODO AL: this is currently not supported by the WEE implementation
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#notifyUpdate(java.lang.String, eu.planets_project.tb.impl.system.TestbedBatchJob)
   */
  public synchronized void notifyUpdate(String job_key, TestbedBatchJob job) {
    //TODO AL: currently not used - as not supported by WEE...
    //This could be used for pulling incremental updates in
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#notifyStart(java.lang.String, eu.planets_project.tb.impl.system.TestbedBatchJob)
   */
  public synchronized void notifyStart(String job_key, TestbedBatchJob job) {
    //this.setJob(job_key, job);
    weeTBUpdater.processNotify_WorkflowStarted(job.getExpID());
    log.debug("callback notify wee for "+job_key);
  }
 
  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#isRunning(java.lang.String)
   */
  public synchronized boolean isStarted(String job_key){
    try {
      String status = weeService.getStatus(UUID.fromString(job_key));
      if(
      status.equals(WorkflowExecutionStatus.RUNNING.toString())||
      status.equals(WorkflowExecutionStatus.COMPLETED.toString())||
      status.equals(WorkflowExecutionStatus.FAILED.toString())){
        return true;
      }
      else{
        return false;
      }
    } catch (Exception e) {
      log.error("unable to build UUID for "+job_key+" "+e);
      return false;
    }
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#isCompleted(java.lang.String)
   */
  public synchronized boolean isCompleted(String job_key) {
    try {
      String status = weeService.getStatus(UUID.fromString(job_key));
      if(status.equals(WorkflowExecutionStatus.COMPLETED.toString())||
         status.equals(WorkflowExecutionStatus.FAILED.toString())){
        return true;
      }
      else{
        return false;
      }
    } catch (Exception e) {
      log.error("unable to build UUID for "+job_key+" "+e);
      return false;
    }
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#isUpdated(java.lang.String)
   */
  public synchronized boolean isUpdated(String job_key) {
    // TODO AL. This operation is currently not supported by the WEE. Would allow  checking for incremental updates
    return false;
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#getWorkflowEngineResult(java.lang.String)
   */
  public Object getWorkflowEngineResult(String job_key) {
    try {
      return weeService.getResult(UUID.fromString(job_key));
    } catch (Exception e) {
      log.debug("getWorkflowEngineResult failed "+e);
      return new WorkflowResult();
    }
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#isFailed(java.lang.String)
   */
  public synchronized boolean isFailed(String job_key) {
    try {
      String status = weeService.getStatus(UUID.fromString(job_key));
      if(status.equals(WorkflowExecutionStatus.FAILED.toString())){
        return true;
      }
      else{
        return false;
      }
    } catch (Exception e) {
      log.error("unable to build UUID for "+job_key+" "+e);
      return false;
    }
  }


  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#isRunning(java.lang.String)
   */
  public synchronized boolean isRunning(String job_key) {
    try {
      String status = weeService.getStatus(UUID.fromString(job_key));
      if(status.equals(WorkflowExecutionStatus.RUNNING.toString())){
        return true;
      }
      else{
        return false;
      }
    } catch (Exception e) {
      log.error("unable to build UUID for "+job_key+" "+e);
      return false;
    }
  }
 
  /* (non-Javadoc)
   * @see eu.planets_project.tb.api.system.batch.BatchProcessor#isQueued(java.lang.String)
   */
  public synchronized boolean isQueued(String job_key) {
    try {
      String status = weeService.getStatus(UUID.fromString(job_key));
      if((!status.equals(WorkflowExecutionStatus.RUNNING.toString()))&&
         (!status.equals(WorkflowExecutionStatus.COMPLETED.toString()))){
        return true;
      }
      else if(status.equals(WorkflowExecutionStatus.SUBMITTED.toString())){
        return true;
      }
      else{
        return false;
      }
    } catch (Exception e) {
      log.error("unable to build UUID for "+job_key+" "+e);
      return false;
    }
  }

}
TOP

Related Classes of eu.planets_project.tb.impl.system.batch.backends.ifwee.TestbedWEEBatchProcessor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.