Package com.sos.scheduler.engine.kernel.main.event

Source Code of com.sos.scheduler.engine.kernel.main.event.EventThread

/********************************************************* begin of preamble
**
** Copyright (C) 2003-2010 Software- und Organisations-Service GmbH.
** All rights reserved.
**
** This file may be used under the terms of either the
**
**   GNU General Public License version 2.0 (GPL)
**
**   as published by the Free Software Foundation
**   http://www.gnu.org/licenses/gpl-2.0.txt and appearing in the file
**   LICENSE.GPL included in the packaging of this file.
**
** or the
** 
**   Agreement for Purchase and Licensing
**
**   as offered by Software- und Organisations-Service GmbH
**   in the respective terms of supply that ship with this file.
**
** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
** IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
** THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
** PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
** BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
** CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
** SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
** INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
** CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
** ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
** POSSIBILITY OF SUCH DAMAGE.
********************************************************** end of preamble*/
package com.sos.scheduler.engine.kernel.main.event;

import com.sos.scheduler.engine.kernel.SchedulerException;
import com.sos.scheduler.engine.kernel.event.Event;
import com.sos.scheduler.engine.kernel.event.EventPredicate;
import com.sos.scheduler.engine.kernel.event.EventSubscriber;
import com.sos.scheduler.engine.kernel.main.SchedulerController;
import com.sos.scheduler.engine.kernel.util.Time;
import com.sos.scheduler.engine.kernel.util.sync.ThrowableMailbox;
import java.util.*;
import static com.sos.scheduler.engine.kernel.util.Util.*;


/** Thread zu Verarbeitung von Scheduler-Ereignissen.
*
* @author Zschimmer.sos
*/
public abstract class EventThread extends Thread implements EventSubscriber {
    private static final String terminatedEventName = TerminatedEvent.class.getSimpleName();

    private final EventRendezvous rendezvous = new EventRendezvous();
    private final ThrowableMailbox<Throwable> throwableMailbox = new ThrowableMailbox<Throwable>();
    private Collection<EventPredicate> eventPredicates = Collections.singleton(EventPredicate.alwaysTrue);
    private boolean threadIsStarted = true;
    private SchedulerController schedulerController = null;
    private int expectEventCount = 0;


    public EventThread() {
        setName(getClass().getSimpleName());
    }


    public final void setEventFilter(EventPredicate... p) {
        setEventFilter(Arrays.asList(p));
    }

   
    public final void setEventFilter(Collection<EventPredicate> c) {
        assert c != null;
        eventPredicates = new ArrayList<EventPredicate>(c);
    }


    @Override public final void onEvent(Event e) {
        if (e instanceof SchedulerReadyEvent)
            onSchedulerThreadReady((SchedulerReadyEvent)e);
        else
        if (e instanceof TerminatedEvent)
            onSchedulerThreadTerminated((TerminatedEvent)e);
        else
        if (eventMatchesPredicate(e))
            rendezvous.unlockAndCall(e);
    }


    private void onSchedulerThreadReady(SchedulerReadyEvent e) {
        schedulerController = e.getSchedulerController();
        start()// Thread läuft in run()
        threadIsStarted = true;
    }


    private void onSchedulerThreadTerminated(TerminatedEvent e) {
        try {
            if (threadIsStarted) {
                rendezvous.call(e);     // Immer durchlassen, ohne eventPredicates zu fragen.
                join();
                throwableMailbox.throwUncheckedIfSet();
            }
        }
        catch (InterruptedException x) { throw new SchedulerException(x); }
    }


    private boolean eventMatchesPredicate(Event e) {
        for (EventPredicate p: eventPredicatesif (p.apply(e))  return true;
        return false;
    }


    @Override public final void run() {
        rendezvous.beginServing();
       
        try {
            runEventThreadAndTerminateScheduler();
        }
//        catch (UnexpectedTerminatedEventException x) {
//            throwableMailbox.setIfFirst(x, Level.DEBUG);
//        }
        catch (Exception x) {
            throwableMailbox.setIfFirst(x);
        }
        catch (Error x) {
            throwableMailbox.setIfFirst(x);
            throw x;
        }
        finally {
            rendezvous.closeServing();
        }
    }


    private void runEventThreadAndTerminateScheduler() throws Exception {
        try {
            runEventThread();
        }
        finally {
            if (!rendezvous.terminatedEventReceived()) {
                schedulerController.terminateScheduler();
                waitForTerminatedEvent(Time.eternal);
            }
        }
    }

   
    protected abstract void runEventThread() throws Exception;

   
    /**
     * @return Das nicht mehr aktuelle Event, weil der Scheduler schon weiterläuft. Die Objekte am Events sind vielleicht nicht mehr gültig.
     */
    public final void expectEvent(Time timeout, EventPredicate p) throws InterruptedException {
        expectEvent(new EventExpectation(timeout, p));
    }


    /**
     * @return Das nicht mehr aktuelle Event, weil der Scheduler schon weiterläuft. Die Objekte am Events sind vielleicht nicht mehr gültig.
     */
    public final void expectEvent(EventExpectation expect) {
        expectEventCount++;
        EventPredicate p = expect.getPredicate();
        Event e = enterEventHandling(expect.getTimeout());
        try {
            if (!p.apply(e)) throw UnexpectedEventException.of(e, p, expectEventCount);
            if (e instanceof ExceptionEvent) {
                ExceptionEvent xe = (ExceptionEvent)e;
                throw new SchedulerException("Error while polling event: " + xe.getException(), xe.getException() );
            }

            //logger.info("Expected event: " + e);
        }
        finally {
            leaveEventHandling();
        }
    }


//    /**
//     * @return Das nicht mehr aktuelle Event, weil der Scheduler schon weiterläuft. Die Objekte am Events sind vielleicht nicht mehr gültig.
//     */
//    public <T extends Event> T expectEvent(Time timeout, Class<T> clas) throws InterruptedException {
//        Event e = enterEventHandling(timeout);
//        try {
//            return castEvent(clas, e);
//        }
//        finally {
//            leaveEventHandling();
//        }
//    }

   
//    @SuppressWarnings("unchecked")
//    private <T> T castEvent(Class<T> clas, Event e) {
//        if (!clas.isAssignableFrom(e.getClass()))  throw new SchedulerException(clas.getSimpleName() + " expected instead of " + e);
//        return (T)e;
//    }


    public final void waitForTerminatedEvent(Time timeout) throws InterruptedException {
        while(!rendezvous.terminatedEventReceived()) {
            Event e = enterEventHandling(timeout);
            if (e == nullthrow new SchedulerException(terminatedEventName + " expected instead of timeout after " + timeout);
           
            try {
//                if (e instanceof SchedulerCloseEvent)  logger.info(e + " ignored");
//                else
                if (!rendezvous.terminatedEventReceived()) {
                    SchedulerException x = new SchedulerException(terminatedEventName + " expected instead of " + e);
                    throwableMailbox.setIfFirst(x);
                }
            }
            finally {
                leaveEventHandling();
            }
        }
    }


    public final Event enterEventHandling() {
        return enterEventHandling(Time.eternal);
    }


    public final Event enterEventHandling(Time timeout) {
        return rendezvous.enter(timeout);
    }


    public final void leaveEventHandling() {
        rendezvous.leave();
    }
}
TOP

Related Classes of com.sos.scheduler.engine.kernel.main.event.EventThread

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and owned by Oracle Inc. Contact coftware#gmail.com.