Package org.apache.oozie.command

Source Code of org.apache.oozie.command.Command

/**
* Copyright (c) 2010 Yahoo! Inc. All rights reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*   http://www.apache.org/licenses/LICENSE-2.0
*
*  Unless required by applicable law or agreed to in writing, software
*  distributed under the License is distributed on an "AS IS" BASIS,
*  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*  See the License for the specific language governing permissions and
*  limitations under the License. See accompanying LICENSE file.
*/
package org.apache.oozie.command;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

import org.apache.oozie.CoordinatorActionBean;
import org.apache.oozie.CoordinatorJobBean;
import org.apache.oozie.ErrorCode;
import org.apache.oozie.FaultInjection;
import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.XException;
import org.apache.oozie.service.CallableQueueService;
import org.apache.oozie.service.DagXLogInfoService;
import org.apache.oozie.service.InstrumentationService;
import org.apache.oozie.service.MemoryLocksService;
import org.apache.oozie.service.Services;
import org.apache.oozie.service.StoreService;
import org.apache.oozie.service.XLogService;
import org.apache.oozie.store.Store;
import org.apache.oozie.store.StoreException;
import org.apache.oozie.store.WorkflowStore;
import org.apache.oozie.util.Instrumentation;
import org.apache.oozie.util.ParamChecker;
import org.apache.oozie.util.XCallable;
import org.apache.oozie.util.XLog;
import org.apache.oozie.util.MemoryLocks.LockToken;

/**
* Base class for all synchronous and asynchronous DagEngine commands.
*/
public abstract class Command<T, S extends Store> implements XCallable<T> {
    /**
     * The instrumentation group used for Commands.
     */
    private static final String INSTRUMENTATION_GROUP = "commands";

    private final long createdTime;

    /**
     * The instrumentation group used for Jobs.
     */
    private static final String INSTRUMENTATION_JOB_GROUP = "jobs";

    private static final long LOCK_TIMEOUT = 1000;
    protected static final long LOCK_FAILURE_REQUEUE_INTERVAL = 30000;

    protected Instrumentation instrumentation;
    private List<XCallable<Void>> callables;
    private List<XCallable<Void>> delayedCallables;
    private long delay = 0;
    private List<XCallable<Void>> exceptionCallables;
    private String name;
    private String type;
    private String key;
    private int priority;
    private int logMask;
    private boolean withStore;
    protected boolean dryrun = false;
    private ArrayList<LockToken> locks = null;

    /**
     * This variable is package private for testing purposes only.
     */
    XLog.Info logInfo;

    /**
     * Create a command that uses a {@link WorkflowStore} instance. <p/> The current {@link XLog.Info} values are
     * captured for execution.
     *
     * @param name command name.
     * @param type command type.
     * @param priority priority of the command, used when queuing for asynchronous execution.
     * @param logMask log mask for the command logging calls.
     */
    public Command(String name, String type, int priority, int logMask) {
        this(name, type, priority, logMask, true);
    }

    /**
     * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
     *
     * @param name command name.
     * @param type command type.
     * @param priority priority of the command, used when queuing for asynchronous execution.
     * @param logMask log mask for the command logging calls.
     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
     */
    public Command(String name, String type, int priority, int logMask, boolean withStore) {
        this.name = ParamChecker.notEmpty(name, "name");
        this.type = ParamChecker.notEmpty(type, "type");
        this.key = name + "_" + UUID.randomUUID();
        this.priority = priority;
        this.withStore = withStore;
        this.logMask = logMask;
        instrumentation = Services.get().get(InstrumentationService.class).get();
        logInfo = new XLog.Info(XLog.Info.get());
        createdTime = System.currentTimeMillis();
        locks = new ArrayList<LockToken>();
    }

    /**
     * Create a command. <p/> The current {@link XLog.Info} values are captured for execution.
     *
     * @param name command name.
     * @param type command type.
     * @param priority priority of the command, used when queuing for asynchronous execution.
     * @param logMask log mask for the command logging calls.
     * @param withStore indicates if the command needs a {@link org.apache.oozie.store.WorkflowStore} instance or not.
     * @param dryrun indicates if dryrun option is enabled. if enabled coordinator will show a diagnostic output without
     * really submitting the job
     */
    public Command(String name, String type, int priority, int logMask, boolean withStore, boolean dryrun) {
        this(name, type, priority, logMask, withStore);
        this.dryrun = dryrun;
    }

    /**
     * Return the name of the command.
     *
     * @return the name of the command.
     */
    @Override
    public String getName() {
        return name;
    }

    /**
     * Return the callable type. <p/> The callable type is used for concurrency throttling in the {@link
     * org.apache.oozie.service.CallableQueueService}.
     *
     * @return the callable type.
     */
    @Override
    public String getType() {
        return type;
    }

    /**
     * Return the priority of the command.
     *
     * @return the priority of the command.
     */
    @Override
    public int getPriority() {
        return priority;
    }

    /**
     * Returns the createdTime of the callable in milliseconds
     *
     * @return the callable createdTime
     */
    @Override
    public long getCreatedTime() {
        return createdTime;
    }

    /**
     * Execute the command {@link #call(WorkflowStore)} setting all the necessary context. <p/> The {@link XLog.Info} is
     * set to the values at instance creation time. <p/> The command execution is logged and instrumented. <p/> If a
     * {@link WorkflowStore} is used, a fresh instance will be passed and it will be commited after the {@link
     * #call(WorkflowStore)} execution. It will be closed without committing if an exception is thrown. <p/> Commands
     * queued via the DagCommand queue methods are queued for execution after the workflow store has been committed.
     * <p/> If an exception happends the queued commands will not be effectively queued for execution. Instead, the the
     * commands queued for exception will be effectively queued fro execution..
     *
     * @throws CommandException thrown if the command could not be executed successfully, the workflow store is closed
     * without committing, thus doing a rollback.
     */
    @SuppressWarnings({"ThrowFromFinallyBlock", "unchecked"})
    public final T call() throws CommandException {
        XLog.Info.get().setParameters(logInfo);
        XLog log = XLog.getLog(getClass());
        log.trace(logMask, "Start");
        Instrumentation.Cron cron = new Instrumentation.Cron();
        cron.start();
        callables = new ArrayList<XCallable<Void>>();
        delayedCallables = new ArrayList<XCallable<Void>>();
        exceptionCallables = new ArrayList<XCallable<Void>>();
        delay = 0;
        S store = null;
        boolean exception = false;

        try {
            if (withStore) {
                store = (S) Services.get().get(StoreService.class).getStore(getStoreClass());
                store.beginTrx();
            }
            T result = execute(store);
            /*
             *
             * if (store != null && log != null) { log.info(XLog.STD,
             * "connection log from store Flush Mode {0} ",
             * store.getFlushMode()); }
             */
            if (withStore) {
                if (store == null) {
                    throw new IllegalStateException("WorkflowStore should not be null");
                }
                if (FaultInjection.isActive("org.apache.oozie.command.SkipCommitFaultInjection")) {
                    throw new RuntimeException("Skipping Commit for Failover Testing");
                }
                store.commitTrx();
            }

            // TODO figure out the reject due to concurrency problems and remove
            // the delayed queuing for callables.
            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(callables, 10);
            if (ret == false) {
                logQueueCallableFalse(callables);
            }

            ret = Services.get().get(CallableQueueService.class).queueSerial(delayedCallables, delay);
            if (ret == false) {
                logQueueCallableFalse(delayedCallables);
            }

            return result;
        }
        catch (XException ex) {
            log.error(logMask | XLog.OPS, "XException, {0}", ex);
            if (store != null) {
                log.info(XLog.STD, "XException - connection logs from store {0}, {1}", store.getConnection(), store
                        .isClosed());
            }
            exception = true;
            if (store != null && store.isActive()) {
                try {
                    store.rollbackTrx();
                }
                catch (RuntimeException rex) {
                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
                }
            }

            // TODO figure out the reject due to concurrency problems and remove
            // the delayed queuing for callables.
            boolean ret = Services.get().get(CallableQueueService.class).queueSerial(exceptionCallables, 10);
            if (ret == false) {
                logQueueCallableFalse(exceptionCallables);
            }
            if (ex instanceof CommandException) {
                throw (CommandException) ex;
            }
            else {
                throw new CommandException(ex);
            }
        }
        catch (Exception ex) {
            log.error(logMask | XLog.OPS, "Exception, {0}", ex);
            exception = true;
            if (store != null && store.isActive()) {
                try {
                    store.rollbackTrx();
                }
                catch (RuntimeException rex) {
                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
                }
            }
            throw new CommandException(ErrorCode.E0607, ex);
        }
        catch (Error er) {
            log.error(logMask | XLog.OPS, "Error, {0}", er);
            exception = true;
            if (store != null && store.isActive()) {
                try {
                    store.rollbackTrx();
                }
                catch (RuntimeException rex) {
                    log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
                }
            }
            throw er;
        }
        finally {
            FaultInjection.deactivate("org.apache.oozie.command.SkipCommitFaultInjection");
            cron.stop();
            instrumentation.addCron(INSTRUMENTATION_GROUP, name, cron);
            incrCommandCounter(1);
            log.trace(logMask, "End");
            if (locks != null) {
                for (LockToken lock : locks) {
                    lock.release();
                }
                locks.clear();
            }
            if (store != null) {
                if (!store.isActive()) {
                    try {
                        store.closeTrx();
                    }
                    catch (RuntimeException rex) {
                        if (exception) {
                            log.warn(logMask | XLog.OPS, "openjpa error, {0}, {1}", name, rex.getMessage(), rex);
                        }
                        else {
                            throw rex;
                        }
                    }
                }
                else {
                    log.warn(logMask | XLog.OPS, "transaction is not committed or rolled back before closing entitymanager.");
                }
            }
        }
    }

    /**
     * Queue a callable for execution after the current callable call invocation completes and the {@link WorkflowStore}
     * transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are queued for a
     * single serial execution. <p/> If the call invocation throws an exception all queued callables are discarded, they
     * are not queued for execution.
     *
     * @param callable callable to queue for execution.
     */
    protected void queueCallable(XCallable<Void> callable) {
        callables.add(callable);
    }

    /**
     * Queue a list of callables for execution after the current callable call invocation completes and the {@link
     * WorkflowStore} transaction commits. <p/> All queued callables, regardless of the number of queue invocations, are
     * queued for a single serial execution. <p/> If the call invocation throws an exception all queued callables are
     * discarded, they are not queued for execution.
     *
     * @param callables list of callables to queue for execution.
     */
    protected void queueCallable(List<? extends XCallable<Void>> callables) {
        this.callables.addAll(callables);
    }

    /**
     * Queue a callable for delayed execution after the current callable call invocation completes and the {@link
     * WorkflowStore} transaction commits. <p/> All queued delayed callables, regardless of the number of delay queue
     * invocations, are queued for a single serial delayed execution with the highest delay of all queued callables.
     * <p/> If the call invocation throws an exception all queued callables are discarded, they are not queued for
     * execution.
     *
     * @param callable callable to queue for delayed execution.
     * @param delay the queue delay in milliseconds
     */
    protected void queueCallable(XCallable<Void> callable, long delay) {
        this.delayedCallables.add(callable);
        this.delay = Math.max(this.delay, delay);
    }

    /**
     * Queue a callable for execution only in the event of an exception being thrown during the call invocation. <p/> If
     * an exception does not happen, all the callables queued by this method are discarded, they are not queued for
     * execution. <p/> All queued callables, regardless of the number of queue invocations, are queued for a single
     * serial execution.
     *
     * @param callable callable to queue for execution in the case of an exception.
     */
    protected void queueCallableForException(XCallable<Void> callable) {
        exceptionCallables.add(callable);
    }

    /**
     * Logging the info if failed to queue the callables.
     *
     * @param callables
     */
    protected void logQueueCallableFalse(List<? extends XCallable<Void>> callables) {
        StringBuilder sb = new StringBuilder(
                "Unable to queue the callables, delayedQueue is full or system is in SAFEMODE - failed to queue:[");
        int size = callables.size();
        for (int i = 0; i < size; i++) {
            XCallable<Void> callable = callables.get(i);
            sb.append(callable.getName());
            if (i < size - 1) {
                sb.append(", ");
            }
            else {
                sb.append("]");
            }
        }
        XLog.getLog(getClass()).warn(sb.toString());
    }

    /**
     * DagCallable subclasses must implement this method to perform their task. <p/> The workflow store works in
     * transactional mode. The transaction is committed only if this method ends successfully. Otherwise the transaction
     * is rolledback.
     *
     * @param store the workflow store instance for the callable, <code>null</code> if the callable does not use a
     * store.
     * @return the return value of the callable.
     * @throws StoreException thrown if the workflow store could not perform an operation.
     * @throws CommandException thrown if the command could not perform its operation.
     */
    protected abstract T call(S store) throws StoreException, CommandException;

    // to do
    // need to implement on all sub commands and break down the transactions

    // protected abstract T execute(String id) throws CommandException;

    /**
     * Command subclasses must implement this method correct Store can be passed to call(store);
     *
     * @return the Store class for use by Callable
     * @throws CommandException thrown if the command could not perform its operation.
     */
    protected abstract Class<? extends Store> getStoreClass();

    /**
     * Set the log info with the context of the given coordinator bean.
     *
     * @param cBean coordinator bean.
     */
    protected void setLogInfo(CoordinatorJobBean cBean) {
        if (logInfo.getParameter(XLogService.GROUP) == null) {
            logInfo.setParameter(XLogService.GROUP, cBean.getGroup());
        }
        if (logInfo.getParameter(XLogService.USER) == null) {
            logInfo.setParameter(XLogService.USER, cBean.getUser());
        }
        logInfo.setParameter(DagXLogInfoService.JOB, cBean.getId());
        logInfo.setParameter(DagXLogInfoService.TOKEN, "");
        logInfo.setParameter(DagXLogInfoService.APP, cBean.getAppName());
        XLog.Info.get().setParameters(logInfo);
    }

    /**
     * Set the log info with the context of the given coordinator action bean.
     *
     * @param action action bean.
     */
    protected void setLogInfo(CoordinatorActionBean action) {
        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
        // logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
        XLog.Info.get().setParameters(logInfo);
    }

    /**
     * Set the log info with the context of the given workflow bean.
     *
     * @param workflow workflow bean.
     */
    protected void setLogInfo(WorkflowJobBean workflow) {
        if (logInfo.getParameter(XLogService.GROUP) == null) {
            logInfo.setParameter(XLogService.GROUP, workflow.getGroup());
        }
        if (logInfo.getParameter(XLogService.USER) == null) {
            logInfo.setParameter(XLogService.USER, workflow.getUser());
        }
        logInfo.setParameter(DagXLogInfoService.JOB, workflow.getId());
        logInfo.setParameter(DagXLogInfoService.TOKEN, workflow.getLogToken());
        logInfo.setParameter(DagXLogInfoService.APP, workflow.getAppName());
        XLog.Info.get().setParameters(logInfo);
    }

    /**
     * Set the log info with the context of the given action bean.
     *
     * @param action action bean.
     */
    protected void setLogInfo(WorkflowActionBean action) {
        logInfo.setParameter(DagXLogInfoService.JOB, action.getJobId());
        logInfo.setParameter(DagXLogInfoService.TOKEN, action.getLogToken());
        logInfo.setParameter(DagXLogInfoService.ACTION, action.getId());
        XLog.Info.get().setParameters(logInfo);
    }

    /**
     * Reset the action bean information from the log info.
     */
    // TODO check if they are used, else delete
    protected void resetLogInfoAction() {
        logInfo.clearParameter(DagXLogInfoService.ACTION);
        XLog.Info.get().clearParameter(DagXLogInfoService.ACTION);
    }

    /**
     * Reset the workflow bean information from the log info.
     */
    // TODO check if they are used, else delete
    protected void resetLogInfoWorkflow() {
        logInfo.clearParameter(DagXLogInfoService.JOB);
        logInfo.clearParameter(DagXLogInfoService.APP);
        logInfo.clearParameter(DagXLogInfoService.TOKEN);
        XLog.Info.get().clearParameter(DagXLogInfoService.JOB);
        XLog.Info.get().clearParameter(DagXLogInfoService.APP);
        XLog.Info.get().clearParameter(DagXLogInfoService.TOKEN);
    }

    /**
     * Convenience method to increment counters.
     *
     * @param group the group name.
     * @param name the counter name.
     * @param count increment count.
     */
    private void incrCounter(String group, String name, int count) {
        if (instrumentation != null) {
            instrumentation.incr(group, name, count);
        }
    }

    /**
     * Used to increment command counters.
     *
     * @param count the increment count.
     */
    protected void incrCommandCounter(int count) {
        incrCounter(INSTRUMENTATION_GROUP, name, count);
    }

    /**
     * Used to increment job counters. The counter name s the same as the command name.
     *
     * @param count the increment count.
     */
    protected void incrJobCounter(int count) {
        incrJobCounter(name, count);
    }

    /**
     * Used to increment job counters.
     *
     * @param name the job name.
     * @param count the increment count.
     */
    protected void incrJobCounter(String name, int count) {
        incrCounter(INSTRUMENTATION_JOB_GROUP, name, count);
    }

    /**
     * Return the {@link Instrumentation} instance in use.
     *
     * @return the {@link Instrumentation} instance in use.
     */
    protected Instrumentation getInstrumentation() {
        return instrumentation;
    }

    /**
     * Return the identity.
     *
     * @return the identity.
     */
    @Override
    public String toString() {
        StringBuilder sb = new StringBuilder();
        sb.append(getType());
        sb.append(",").append(getPriority());
        return sb.toString();
    }

    protected boolean lock(String id) throws InterruptedException {
        if (id == null || id.length() == 0) {
            XLog.getLog(getClass()).warn("lock(): Id is null or empty :" + id + ":");
            return false;
        }
        LockToken token = Services.get().get(MemoryLocksService.class).getWriteLock(id, LOCK_TIMEOUT);
        if (token != null) {
            locks.add(token);
            return true;
        }
        else {
            return false;
        }
    }

    /*
     * TODO - remove store coupling to EM. Store will only contain queries
     * protected EntityManager getEntityManager() { return
     * store.getEntityManager(); }
     */
    protected T execute(S store) throws CommandException, StoreException {
        T result = call(store);
        return result;
    }

    /**
     * Get command key
     *
     * @return command key
     */
    @Override
    public String getKey(){
        return this.key;
    }

}
TOP

Related Classes of org.apache.oozie.command.Command

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.