/********************************************************* begin of preamble
**
** Copyright (C) 2003-2010 Software- und Organisations-Service GmbH.
** All rights reserved.
**
** This file may be used under the terms of either the
**
** GNU General Public License version 2.0 (GPL)
**
** as published by the Free Software Foundation
** http://www.gnu.org/licenses/gpl-2.0.txt and appearing in the file
** LICENSE.GPL included in the packaging of this file.
**
** or the
**
** Agreement for Purchase and Licensing
**
** as offered by Software- und Organisations-Service GmbH
** in the respective terms of supply that ship with this file.
**
** THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS
** IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
** THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
** PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS
** BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
** CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
** SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
** INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
** CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
** ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
** POSSIBILITY OF SUCH DAMAGE.
********************************************************** end of preamble*/
package com.sos.scheduler.engine.kernel.main.event;
import com.sos.scheduler.engine.kernel.SchedulerException;
import com.sos.scheduler.engine.kernel.event.Event;
import com.sos.scheduler.engine.kernel.event.EventPredicate;
import com.sos.scheduler.engine.kernel.event.EventSubscriber;
import com.sos.scheduler.engine.kernel.main.SchedulerController;
import com.sos.scheduler.engine.kernel.util.Time;
import com.sos.scheduler.engine.kernel.util.sync.ThrowableMailbox;
import java.util.*;
import static com.sos.scheduler.engine.kernel.util.Util.*;
/**
 * Thread for processing scheduler events.
 *
 * <p>An instance is registered as an {@link EventSubscriber}. Events arrive through
 * {@link #onEvent(Event)} and are handed to this thread via an {@link EventRendezvous}.
 * The thread itself is started when a {@link SchedulerReadyEvent} arrives and is joined
 * when a {@link TerminatedEvent} arrives. Subclasses implement {@link #runEventThread()}.
 *
 * @author Zschimmer.sos
 */
public abstract class EventThread extends Thread implements EventSubscriber {
    private static final String terminatedEventName = TerminatedEvent.class.getSimpleName();

    private final EventRendezvous rendezvous = new EventRendezvous();
    private final ThrowableMailbox<Throwable> throwableMailbox = new ThrowableMailbox<Throwable>();
    private Collection<EventPredicate> eventPredicates = Collections.singleton(EventPredicate.alwaysTrue);
    // Becomes true only after start() has been called in onSchedulerThreadReady().
    // It must start out false: otherwise a TerminatedEvent arriving before any
    // SchedulerReadyEvent would be handed to, and joined on, a thread that never ran.
    private boolean threadIsStarted = false;
    private SchedulerController schedulerController = null;
    // Number of expectEvent() calls so far; passed to UnexpectedEventException.of().
    private int expectEventCount = 0;

    /** Names the thread after the simple name of the concrete subclass. */
    public EventThread() {
        setName(getClass().getSimpleName());
    }

    /**
     * Replaces the event filter. An event is forwarded to this thread when at least one
     * of the predicates accepts it.
     *
     * @param p the predicates to use
     */
    public final void setEventFilter(EventPredicate... p) {
        setEventFilter(Arrays.asList(p));
    }

    /**
     * Replaces the event filter with a copy of the given collection. An event is forwarded
     * to this thread when at least one of the predicates accepts it.
     *
     * @param c the predicates to use; must not be null
     */
    public final void setEventFilter(Collection<EventPredicate> c) {
        assert c != null;
        eventPredicates = new ArrayList<EventPredicate>(c);
    }

    /**
     * Dispatches an incoming event: a {@link SchedulerReadyEvent} starts this thread,
     * a {@link TerminatedEvent} is passed on and the thread is joined, and any other
     * event is forwarded through the rendezvous if it matches the event filter.
     *
     * @param e the event to dispatch
     */
    @Override public final void onEvent(Event e) {
        if (e instanceof SchedulerReadyEvent) {
            onSchedulerThreadReady((SchedulerReadyEvent)e);
        } else if (e instanceof TerminatedEvent) {
            onSchedulerThreadTerminated((TerminatedEvent)e);
        } else if (eventMatchesPredicate(e)) {
            rendezvous.unlockAndCall(e);
        }
    }

    /** Remembers the scheduler controller carried by the event and starts this thread. */
    private void onSchedulerThreadReady(SchedulerReadyEvent e) {
        schedulerController = e.getSchedulerController();
        start();  // The thread now executes run().
        threadIsStarted = true;
    }

    /**
     * Hands the terminated event to this thread, waits for the thread to end and rethrows
     * a throwable recorded in the mailbox, if any. Does nothing if the thread was never started.
     *
     * @throws SchedulerException if the calling thread is interrupted while joining
     */
    private void onSchedulerThreadTerminated(TerminatedEvent e) {
        try {
            if (threadIsStarted) {
                rendezvous.call(e);  // Always let through, without asking eventPredicates.
                join();
                throwableMailbox.throwUncheckedIfSet();
            }
        }
        catch (InterruptedException x) {
            // Restore the interrupt status so that callers further up can still observe it.
            Thread.currentThread().interrupt();
            throw new SchedulerException(x);
        }
    }

    /** Returns true if at least one of the configured predicates accepts the event. */
    private boolean eventMatchesPredicate(Event e) {
        for (EventPredicate p: eventPredicates) {
            if (p.apply(e)) return true;
        }
        return false;
    }

    /**
     * Thread body: opens the rendezvous for serving, runs {@link #runEventThread()} and
     * closes the rendezvous again. An exception or error is recorded in the throwable
     * mailbox, which is checked in the terminated-event handling after the join;
     * an {@link Error} is additionally rethrown.
     */
    @Override public final void run() {
        rendezvous.beginServing();
        try {
            runEventThreadAndTerminateScheduler();
        }
        catch (Exception x) {
            throwableMailbox.setIfFirst(x);
        }
        catch (Error x) {
            throwableMailbox.setIfFirst(x);
            throw x;
        }
        finally {
            rendezvous.closeServing();
        }
    }

    /**
     * Runs {@link #runEventThread()}. Afterwards, whether it returned normally or threw,
     * and unless the terminated event has already been received, asks the scheduler to
     * terminate and waits without time limit for the terminated event.
     */
    private void runEventThreadAndTerminateScheduler() throws Exception {
        try {
            runEventThread();
        }
        finally {
            if (!rendezvous.terminatedEventReceived()) {
                schedulerController.terminateScheduler();
                waitForTerminatedEvent(Time.eternal);
            }
        }
    }

    /**
     * The actual work of the event thread, supplied by the subclass.
     *
     * @throws Exception any failure; it is recorded and rethrown to the subscriber side
     *         when the terminated event is handled
     */
    protected abstract void runEventThread() throws Exception;

    /**
     * Waits for the next event and checks it against the predicate.
     * Convenience form of {@link #expectEvent(EventExpectation)}.
     *
     * @param timeout how long to wait for the event
     * @param p the predicate the event must satisfy
     * @throws InterruptedException declared for callers; not thrown by the code visible here
     */
    public final void expectEvent(Time timeout, EventPredicate p) throws InterruptedException {
        expectEvent(new EventExpectation(timeout, p));
    }

    /**
     * Waits for the next event and checks it against the expectation. The event handling
     * is always left again before this method returns or throws.
     *
     * <p>NOTE(review): unlike {@link #waitForTerminatedEvent(Time)}, a null event (which
     * that method treats as a timeout) is not checked here before the predicate is
     * applied - confirm against EventRendezvous whether this needs separate handling.
     *
     * @param expect timeout and predicate for the expected event
     * @throws SchedulerException if the received event is an {@link ExceptionEvent}
     */
    public final void expectEvent(EventExpectation expect) {
        expectEventCount++;
        EventPredicate p = expect.getPredicate();
        Event e = enterEventHandling(expect.getTimeout());
        try {
            if (!p.apply(e)) throw UnexpectedEventException.of(e, p, expectEventCount);
            if (e instanceof ExceptionEvent) {
                ExceptionEvent xe = (ExceptionEvent)e;
                throw new SchedulerException("Error while polling event: " + xe.getException(), xe.getException() );
            }
        }
        finally {
            leaveEventHandling();
        }
    }

    /**
     * Consumes events until the terminated event has been received. Every other event
     * received in the meantime is recorded as an error in the throwable mailbox.
     *
     * @param timeout how long to wait for each single event
     * @throws SchedulerException if no event arrives within the timeout
     * @throws InterruptedException declared for callers; not thrown by the code visible here
     */
    public final void waitForTerminatedEvent(Time timeout) throws InterruptedException {
        while (!rendezvous.terminatedEventReceived()) {
            Event e = enterEventHandling(timeout);
            if (e == null) throw new SchedulerException(terminatedEventName + " expected instead of timeout after " + timeout);
            try {
                if (!rendezvous.terminatedEventReceived()) {
                    SchedulerException x = new SchedulerException(terminatedEventName + " expected instead of " + e);
                    throwableMailbox.setIfFirst(x);
                }
            }
            finally {
                leaveEventHandling();
            }
        }
    }

    /**
     * Enters the event handling without time limit.
     *
     * @return the event delivered by the rendezvous
     */
    public final Event enterEventHandling() {
        return enterEventHandling(Time.eternal);
    }

    /**
     * Enters the event handling, waiting at most {@code timeout} for an event.
     *
     * @param timeout how long to wait
     * @return the event delivered by the rendezvous; {@link #waitForTerminatedEvent(Time)}
     *         treats a null result as a timeout
     */
    public final Event enterEventHandling(Time timeout) {
        return rendezvous.enter(timeout);
    }

    /** Leaves the event handling entered with {@link #enterEventHandling(Time)}. */
    public final void leaveEventHandling() {
        rendezvous.leave();
    }
}