Package org.apache.airavata.workflow.tracking.impl

Source Code of org.apache.airavata.workflow.tracking.impl.NotifierImpl

/*
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.  See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.  The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.  You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied.  See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.airavata.workflow.tracking.impl;

import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;

import org.apache.airavata.commons.LeadCrosscutParametersUtil;
import org.apache.airavata.workflow.tracking.Notifier;
import org.apache.airavata.workflow.tracking.common.DataDurationObj;
import org.apache.airavata.workflow.tracking.common.DataObj;
import org.apache.airavata.workflow.tracking.common.DurationObj;
import org.apache.airavata.workflow.tracking.common.WorkflowTrackingContext;
import org.apache.airavata.workflow.tracking.impl.state.DataDurationImpl;
import org.apache.airavata.workflow.tracking.impl.state.DataObjImpl;
import org.apache.airavata.workflow.tracking.impl.state.DurationImpl;
import org.apache.airavata.workflow.tracking.types.*;

/**
* DOES NOT SUPPORT MULTI_THREADING -- PUBLISHER QUEUE, DATA CONSUMED/PRODUCED BATCHING * Utility to create and send
* Lead notification messages for an application (script/web service). Since it extends WorkflowNotifierImpl, it can
* also send workflow related notifications.
*
* The constructor of this class uses the following properties from CONSTS: BROKER_URL, TOPIC, WORKFLOW_ID, NODE_ID,
* TIMESTEP, SERVICE_ID, SERVICE_WSDL, IN_XML_MESSAGE, NAME_RESOLVER_URL, FILE_ACCESS_PROTOCOL, DISABLE_NAME_RESOLVER,
* BATCH_PROVENANCE_MSGS, ASYNC_PUB_MODE
*
*/
public class NotifierImpl extends ProvenanceNotifierImpl implements Notifier {

    private static final String WFT_NS = "http://lead.extreme.indiana.edu/namespaces/2006/06/workflow_tracking";

    private static final HashMap<String, String> NS_MAP = new HashMap<String, String>();
    static {
        NS_MAP.put("", WFT_NS);
    }

    // public NotifierImpl(ConstructorProps props) throws XMLStreamException, IOException {
    // super( props);
    // }

    /**
     * @param batchProvMessages
     *            whether provenance messages should be batched and sent as one message
     * @param publisher
     *            a NotificationPublisher used to send the notifications
     *
     */
    public NotifierImpl() {
    }

    // //////////////////////////////////////////////////////////////////////////////////////////////
    //
    // AUDIT NOTIFIER
    //
    // //////////////////////////////////////////////////////////////////////////////////////////////

    /**
     * {@inheritDoc}
     *
     */
    public void resourceMapping(WorkflowTrackingContext context, String mappedResource, int retryStatusCount,
            String... descriptionAndAnnotation) {

        ResourceMappingDocument mapMsg = ResourceMappingDocument.Factory.newInstance();
        ResourceMappingType map = mapMsg.addNewResourceMapping();
        map.setMappedResource(mappedResource);
        map.setRetryStatusCount(retryStatusCount);

        sendNotification(context, mapMsg, descriptionAndAnnotation, "[Resource mapping done for" + mappedResource + "]");
    }

    /**
     * {@inheritDoc}
     *
     */
    public void jobStatus(WorkflowTrackingContext context, String status, int retryCount,
            String... descriptionAndAnnotation) {

        JobStatusDocument jobMsg = JobStatusDocument.Factory.newInstance();
        JobStatusType job = jobMsg.addNewJobStatus();
        job.setJobStatus(status);
        job.setRetryCount(retryCount);

        sendNotification(context, jobMsg, descriptionAndAnnotation, "[Job status is " + status + "]");
    }

    // //////////////////////////////////////////////////////////////////////////////////////////////
    //
    // AUDIT NOTIFIER
    //
    // //////////////////////////////////////////////////////////////////////////////////////////////

    /**
     * {@inheritDoc}
     *
     */
    public void appAudit(WorkflowTrackingContext context, String name, URI jobHandle, String host, String queueName,
            String jobId, String dName, String projectId, String rsl, String... descriptionAndAnnotation) {
        final ApplicationAuditDocument appAuditMsg = ApplicationAuditDocument.Factory.newInstance();
        final ApplicationAuditType appAudit = appAuditMsg.addNewApplicationAudit();
        appAudit.setJobHandle(jobHandle.toString());
        appAudit.setName(name);
        appAudit.setHost(host);
        appAudit.setQueueName(queueName); // queueName is an optional element
        appAudit.setJobId(jobId); // jobId is an optional element
        appAudit.setDistinguishedName(dName);
        appAudit.setProjectId(projectId); // projectId is an optional element
        appAudit.setRsl(rsl);

        sendNotification(context, appAuditMsg, descriptionAndAnnotation, "[Audit msg for '" + name + "' at host "
                + host + " for DN " + dName + "]" // default
        );
    }

    // //////////////////////////////////////////////////////////////////////////////////////////////
    //
    // PERFORMANCE NOTIFIER
    //
    // //////////////////////////////////////////////////////////////////////////////////////////////
    /**
     * {@inheritDoc}
     *
     */
    public DurationObj computationStarted() {

        return new DurationImpl();
    }

    /**
     * {@inheritDoc}
     *
     */
    public DurationObj computationFinished(WorkflowTrackingContext context, DurationObj compObj,
            String... descriptionAndAnnotation) {

        if (context == null)
            throw new RuntimeException("Local entity passed was NULL.");
        if (compObj == null)
            throw new RuntimeException("Comp duration object passed was NULL.");

        // mark computation end
        compObj.markEndTimeMillis();

        // create activity
        ComputationDurationDocument activity = ComputationDurationDocument.Factory.newInstance();
        ComputationDurationDocument.ComputationDuration activityType = activity.addNewComputationDuration();

        activityType.setDurationInMillis(compObj.getDurationMillis());

        sendNotification(context, activity, descriptionAndAnnotation,
                "[Computation Time taken = " + compObj.getDurationMillis() + " ms]");

        return compObj;
    }

    /**
     * {@inheritDoc}
     *
     */
    public DurationObj computationDuration(WorkflowTrackingContext context, long durationMillis,
            String... descriptionAndAnnotation) {

        DurationObj compObj = new DurationImpl(durationMillis);
        return computationFinished(context, compObj, descriptionAndAnnotation);
    }

    /**
     * {@inheritDoc}
     *
     */
    public DataDurationObj dataSendStarted(DataObj dataObj, URI remoteLocation) {

        return new DataDurationImpl(dataObj, remoteLocation);
    }

    /**
     * {@inheritDoc}
     *
     */
    public DataDurationObj dataSendFinished(WorkflowTrackingContext context, DataDurationObj dataDurationObj,
            String... descriptionAndAnnotation) {

        if (context == null)
            throw new RuntimeException("Local entity passed was NULL.");
        if (dataDurationObj == null)
            throw new RuntimeException("Data duration object passed was NULL.");

        DataObj dataObj = null;
        if ((dataObj = dataDurationObj.getDataObj()) == null)
            throw new RuntimeException("Data duration object's DataObje was NULL.");
        if (dataObj.getId() == null)
            throw new RuntimeException("Data object's ID was NULL.");
        if (dataObj.getLocalLocation() == null)
            throw new RuntimeException("Local file URL passed in DataDurationObj.getDataObj was NULL.");
        if (dataDurationObj.getRemoteLocation() == null)
            throw new RuntimeException("Remote file URL passed in DataDurationObj was NULL.");

        // mark computation end
        dataDurationObj.markEndTimeMillis();

        // create activity
        DataSendDurationDocument activity = DataSendDurationDocument.Factory.newInstance();
        DataTransferDurationType activityType = activity.addNewDataSendDuration();

        activityType.setId(dataObj.getId().toString());
        activityType.setDurationInMillis(dataDurationObj.getDurationMillis());
        activityType.setSizeInBytes(dataObj.getSizeInBytes());

        activityType.setSource(dataObj.getLocalLocation().toString());

        activityType.setTarget(dataDurationObj.getRemoteLocation().toString());

        sendNotification(context, activity, descriptionAndAnnotation, "[Data at " + dataObj.getLocalLocation()
                + " was sent to " + dataDurationObj.getRemoteLocation() + "]");

        return dataDurationObj;
    }

    /**
     * {@inheritDoc}
     *
     */
    public DataDurationObj dataSendDuration(WorkflowTrackingContext context, URI dataID, URI localLocation,
            URI remoteLocation, int sizeInBytes, long durationMillis, String... descriptionAndAnnotation) {

        List<URI> locations = new ArrayList<URI>(2);
        locations.add(localLocation);
        locations.add(remoteLocation);

        DataObj dataObj = new DataObjImpl(dataID, locations, sizeInBytes);
        DataDurationObj dataDurationObj = new DataDurationImpl(dataObj, remoteLocation, durationMillis);

        return dataSendFinished(context, dataDurationObj, descriptionAndAnnotation);
    }

    /**
     * {@inheritDoc}
     *
     */
    public DataDurationObj dataReceiveStarted(URI dataID, URI remoteLocation, URI localLocation) {

        List<URI> locations = new ArrayList<URI>(2);
        locations.add(localLocation);
        locations.add(remoteLocation);

        DataObj dataObj = new DataObjImpl(dataID, locations);
        DataDurationObj dataDurationObj = new DataDurationImpl(dataObj, remoteLocation);

        return dataDurationObj;
    }

    /**
     * {@inheritDoc}
     *
     */
    public DataDurationObj dataReceiveFinished(WorkflowTrackingContext context, DataDurationObj dataDurationObj,
            String... descriptionAndAnnotation) {

        if (context == null)
            throw new RuntimeException("Local entity passed was NULL.");
        if (dataDurationObj == null)
            throw new RuntimeException("Data duration object passed was NULL.");

        DataObj dataObj = null;
        if ((dataObj = dataDurationObj.getDataObj()) == null)
            throw new RuntimeException("Data duration object's DataObj was NULL.");
        if (dataObj.getId() == null)
            throw new RuntimeException("Data object's ID was NULL.");
        if (dataObj.getLocalLocation() == null)
            throw new RuntimeException("Local file URL passed in DataDurationObj.getDataObj was NULL.");
        if (dataDurationObj.getRemoteLocation() == null)
            throw new RuntimeException("Remote file URL passed in DataDurationObj was NULL.");

        // mark computation end
        dataDurationObj.markEndTimeMillis();

        // create activity
        DataReceiveDurationDocument activity = DataReceiveDurationDocument.Factory.newInstance();
        DataTransferDurationType activityType = activity.addNewDataReceiveDuration();

        activityType.setId(dataObj.getId().toString());
        activityType.setDurationInMillis(dataDurationObj.getDurationMillis());
        activityType.setSizeInBytes(dataObj.getSizeInBytes());

        activityType.setSource(dataObj.getLocalLocation().toString());

        activityType.setTarget(dataDurationObj.getRemoteLocation().toString());

        sendNotification(context, activity, descriptionAndAnnotation,
                "[Data from " + dataDurationObj.getRemoteLocation() + " was received at " + dataObj.getLocalLocation()
                        + "]");

        return dataDurationObj;
    }

    /**
     * {@inheritDoc}
     *
     */
    public DataDurationObj dataReceiveDuration(WorkflowTrackingContext context, URI dataID, URI remoteLocation,
            URI localLocation, int sizeInBytes, long durationMillis, String... descriptionAndAnnotation) {

        List<URI> locations = new ArrayList<URI>(2);
        locations.add(localLocation);
        locations.add(remoteLocation);

        DataObj dataObj = new DataObjImpl(dataID, locations, sizeInBytes);
        DataDurationObj dataDurationObj = new DataDurationImpl(dataObj, remoteLocation, durationMillis);

        return dataReceiveFinished(context, dataDurationObj, descriptionAndAnnotation);
    }

}
TOP

Related Classes of org.apache.airavata.workflow.tracking.impl.NotifierImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle Inc. Contact coftware#gmail.com.