Package org.jenkinsci.plugins.workflow.cps

Source Code of org.jenkinsci.plugins.workflow.cps.CpsThread

/*
* The MIT License
*
* Copyright (c) 2013-2014, CloudBees, Inc.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/

package org.jenkinsci.plugins.workflow.cps;

import com.cloudbees.groovy.cps.Continuable;
import com.cloudbees.groovy.cps.Outcome;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.SettableFuture;
import org.jenkinsci.plugins.workflow.cps.persistence.PersistIn;
import org.jenkinsci.plugins.workflow.steps.StepExecution;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.concurrent.Future;
import java.util.logging.Logger;

import static java.util.logging.Level.*;
import static org.jenkinsci.plugins.workflow.cps.persistence.PersistenceContext.*;

/**
* Represents a {@link Continuable} that is either runnable or suspended (that waits for an
* external event.)
*
* @author Kohsuke Kawaguchi
*/
@PersistIn(PROGRAM)
public final class CpsThread implements Serializable {
    /**
     * Owner object. A thread always belong to a {@link CpsThreadGroup}
     */
    @Nonnull
    final CpsThreadGroup group;

    /**
     * Unique ID of this thread among all the threads in the past or future under the same {@link CpsThreadGroup}.
     * This acts as a persistable handle for {@link CpsStepContext} to
     * {@linkplain CpsStepContext#getThread(CpsThreadGroup) refer back to the thread},
     * because they are persisted separately.
     */
    public final int id;

    /**
     * Represents the remaining computation.
     */
    private volatile Continuable program;

    /**
     * The value that feeds into the next execution of {@link #program}. Even though this is an input
     * from this class' point of view, it's typed as {@link Outcome} because from the CPS-transformed
     * program's point of view, this value acts as a return value (or an exception thrown)
     * from {@link Continuable#suspend(Object)}
     */
    Outcome resumeValue;

    /**
     * Promise that {@link Continuable#run0(Outcome)} gets eventually invoked with {@link #resumeValue}.
     */
    private transient SettableFuture<Object> promise;

    /**
     * The head of the flow node graph that this thread is growing.
     *
     * <p>
     * We create {@link CpsThread}s liberally in {@link CpsStepContext#invokeBodyLater(FutureCallback, Object...)},
     * and so multiple {@link CpsThread}s often share the same flow head.
     */
    final FlowHead head;

    @Nullable
    private final ContextVariableSet contextVariables;

    /**
     * If this thread is waiting for a {@link StepExecution} to complete (by invoking our callback),
     * this field is set to that execution.
     */
    private StepExecution step;

    CpsThread(CpsThreadGroup group, int id, Continuable program, FlowHead head, ContextVariableSet contextVariables) {
        this.group = group;
        this.id = id;
        this.program = group.getExecution().isSandbox() ? new SandboxContinuable(program,this) : program;
        this.head = head;
        this.contextVariables = contextVariables;
    }

    public CpsThreadGroup getGroup() {
        return group;
    }

    public CpsFlowExecution getExecution() {
        return group.getExecution();
    }

    <T> T getContextVariable(Class<T> type) {
        if (contextVariables==null)     return null;
        return contextVariables.get(type);
    }

    public ContextVariableSet getContextVariables() {
        return contextVariables;
    }

    boolean isRunnable() {
        return resumeValue!=null;
    }

    public StepExecution getStep() {
        return step;
    }

    /*package*/ void setStep(StepExecution step) {
        this.step = step;
    }

    /**
     * Executes CPS code synchronously a little bit more, until it hits
     * the point the workflow needs to be dehydrated.
     */
    @Nonnull Outcome runNextChunk() throws IOException {
        assert program!=null;

        while (true) {
            Outcome outcome;

            final CpsThread old = CURRENT.get();
            CURRENT.set(this);

            try {
                LOGGER.log(FINE, "runNextChunk on {0}", resumeValue);
                Outcome o = resumeValue;
                resumeValue = null;
                outcome = program.run0(o);
                if (outcome.getAbnormal() != null) {
                    LOGGER.log(FINE, "ran and produced error", outcome.getAbnormal());
                } else {
                    LOGGER.log(FINE, "ran and produced {0}", outcome);
                }
            } finally {
                CURRENT.set(old);
            }

            if (outcome.getNormal() instanceof ThreadTask) {
                // if an execution in the thread safepoint is requested, deliver that
                ThreadTask sc = (ThreadTask) outcome.getNormal();
                ThreadTaskResult r = sc.eval(this);
                if (r.resume!=null) {
                    // keep evaluating the CPS code
                    resumeValue = r.resume;
                    continue;
                } else {
                    // break but with a different value
                    outcome = r.suspend;
                }
            }


            if (promise!=null) {
                if (outcome.isSuccess())        promise.set(outcome.getNormal());
                else {
                    try {
                        promise.setException(outcome.getAbnormal());
                    } catch (Error e) {
                        if (e==outcome.getAbnormal()) {
                            // SettableFuture tries to rethrow an Error, which we don't want.
                            // so prevent that from happening. I need to see if this behaviour
                            // affects other places that use SettableFuture
                            ;
                        } else {
                            throw e;
                        }
                    }
                }
                promise = null;
            }

            return outcome;
        }
    }

    /**
     * Does this thread still have something to execute?
     * (as opposed to have finished running, either normally or abnormally?)
     */
    boolean isAlive() {
        return program.isResumable();
    }

    /**
     * Schedules the execution of this thread from the last {@linkplain Continuable#suspend(Object)} point.
     *
     * @return
     *      Future that promises the completion of the next {@link #runNextChunk()}.
     */
    public Future<Object> resume(Outcome v) {
        assert resumeValue==null;
        resumeValue = v;
        promise = SettableFuture.create();
        group.scheduleRun();
        return promise;
    }

    private static final Logger LOGGER = Logger.getLogger(CpsThread.class.getName());

    private static final long serialVersionUID = 1L;

    private static final ThreadLocal<CpsThread> CURRENT = new ThreadLocal<CpsThread>();

    /**
     * While {@link CpsThreadGroup} executes, this method returns {@link CpsThread}
     * that's running.
     */
    @CpsVmThreadOnly
    public static CpsThread current() {
        return CURRENT.get();
    }

    @Override public String toString() {
        // getExecution().getOwner() would be useful but seems problematic.
        return "Thread #" + id + String.format(" @%h", this);
    }

}
TOP

Related Classes of org.jenkinsci.plugins.workflow.cps.CpsThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.