Package org.agilewiki.jactor.lpc

Source Code of org.agilewiki.jactor.lpc.JAEventRequest

/*
* Copyright 2011 Bill La Forge
*
* This file is part of AgileWiki and is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License (LGPL) as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This code is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
* or navigate to the following url http://www.gnu.org/licenses/lgpl-2.1.txt
*
* Note however that only Scala, Java and JavaScript files are being covered by LGPL.
* All other files are covered by the Common Public License (CPL).
* A copy of this license is also included and can be
* found as well at http://www.opensource.org/licenses/cpl1.0.txt
*/
package org.agilewiki.jactor.lpc;

import java.util.List;

import org.agilewiki.jactor.Actor;
import org.agilewiki.jactor.ExceptionHandler;
import org.agilewiki.jactor.JANoResponse;
import org.agilewiki.jactor.Mailbox;
import org.agilewiki.jactor.MailboxFactory;
import org.agilewiki.jactor.RP;
import org.agilewiki.jactor.apc.APCRequestSource;
import org.agilewiki.jactor.apc.JAMessage;
import org.agilewiki.jactor.apc.JARequest;
import org.agilewiki.jactor.apc.JAResponse;
import org.agilewiki.jactor.apc.RequestProcessor;
import org.agilewiki.jactor.bufferedEvents.BufferedEventsDestination;
import org.agilewiki.jactor.bufferedEvents.BufferedEventsQueue;
import org.agilewiki.jactor.events.EventQueue;
import org.agilewiki.jactor.simpleMachine._SMBuilder;

/**
* <p>
* An actor which implements Local Procedure Calls (LPC)
* and mostly works synchronously.
* Actors need to implement the processRequest method.
* </p>
* <pre>
* public class Multiply {
*     public int a;
*     public int b;
* }
*
* public class Multiplier extends JLPCActor {
*
*     public Multiplier(Mailbox mailbox) {
*         super(mailbox);
*     }
*
*     protected void processRequest(Object req, RP rp)
*             throws Exception {
*         Multiply m = (Multiply) req;
*         rp.process(new Integer(m.a * m.b));
*     }
* }
* </pre>
*/
abstract public class JLPCActor implements TargetActor, RequestProcessor,
        RequestSource {
    /**
     * The inbox and outbox of the actor.
     */
    private Mailbox mailbox;

    /**
     * The parent actor, for dependency injection.
     */
    private JLPCActor parent;

    /**
     * Initialize a degraded LiteActor
     */
    public void initialize() throws Exception {
        initialize(null, null);
    }

    /**
     * Initialize a degraded LiteActor
     *
     * @param parent The parent actor.
     */
    public void initialize(final Actor parent) throws Exception {
        initialize(null, parent);
    }

    /**
     * Initialize a LiteActor
     *
     * @param mailbox A mailbox which may be shared with other actors.
     */
    public void initialize(final Mailbox mailbox) throws Exception {
        initialize(mailbox, null);
    }

    /**
     * Initialize a LiteActor
     *
     * @param mailbox A mailbox which may be shared with other actors.
     * @param parent  The parent actor.
     */
    public void initialize(final Mailbox mailbox, final Actor parent)
            throws Exception {
        if (this.mailbox != null || this.parent != null)
            throw new IllegalStateException("already initialized");
        this.mailbox = mailbox;
        this.parent = (JLPCActor) parent;
    }

    /**
     * Returns the actor's parent.
     *
     * @return The actor's parent, or null.
     */
    @Override
    final public JLPCActor getParent() {
        return parent;
    }

    @Override
    public JLPCActor getMatch(final Class targetClass) {
        if (targetClass.isInstance(this))
            return this;
        return getAncestor(targetClass);
    }

    /**
     * Returns A matching ancestor from the parent chain.
     *
     * @param targetClass A class which the ancestor is an instanceof.
     * @return The matching ancestor, or null.
     */
    @Override
    final public JLPCActor getAncestor(final Class targetClass) {
        JLPCActor p = parent;
        while (p != null) {
            if (targetClass.isInstance(p))
                return p;
            p =  p.parent;
        }
        return null;
    }

    /**
     * Returns the exception handler.
     *
     * @return The exception handler.
     */
    @Override
    final public ExceptionHandler getExceptionHandler() {
        return mailbox.getExceptionHandler();
    }

    /**
     * Assign an exception handler.
     *
     * @param exceptionHandler The exception handler.
     */
    @Override
    final public void setExceptionHandler(
            final ExceptionHandler exceptionHandler) {
        mailbox.setExceptionHandler(exceptionHandler);
    }

    /**
     * A notification that there are incoming requests and responses that are ready for processing.
     */
    @Override
    final public void haveEvents() {
        mailbox.dispatchEvents();
    }

    /**
     * Returns the actor's mailbox.
     *
     * @return The actor's mailbox.
     */
    @Override
    final public Mailbox getMailbox() {
        return mailbox;
    }

    /**
     * Enqueues the response in the responder's outbox.
     *
     * @param eventQueue   The responder's outbox.
     * @param japcResponse The wrapped response to be enqueued.
     */
    @Override
    final public void responseFrom(
            final BufferedEventsQueue<JAMessage> eventQueue,
            final JAResponse japcResponse) {
        eventQueue.send(mailbox, japcResponse);
    }

    /**
     * Sends a request to a mailbox.
     *
     * @param destination The mailbox which is to receive the request.
     * @param japcRequest The wrapped request to be sent.
     */
    @Override
    final public void send(
            final BufferedEventsDestination<JAMessage> destination,
            final JARequest japcRequest) {
        mailbox.send(destination, japcRequest);
    }

    /**
     * Set the initial capacity for buffered outgoing messages.
     *
     * @param initialBufferCapacity The initial capacity for buffered outgoing messages.
     */
    @Override
    final public void setInitialBufferCapacity(final int initialBufferCapacity) {
        mailbox.setInitialBufferCapacity(initialBufferCapacity);
    }

    /**
     * Wraps and enqueues an unwrapped request in the requester's inbox.
     *
     * @param apcRequestSource The originator of the request.
     * @param request          The request to be sent.
     * @param rp               The request processor.
     * @throws Exception Any uncaught exceptions raised while processing the request.
     */
    @Override
    final public void acceptRequest(final APCRequestSource apcRequestSource,
            final Request request, final RP rp) throws Exception {
        final RequestSource rs = (RequestSource) apcRequestSource;
        final Mailbox sourceMailbox = rs.getMailbox();
        if (sourceMailbox == mailbox) {
            syncSend(rs, request, rp);
            return;
        }
        if (sourceMailbox == null) {
            asyncSend(rs, request, rp);
            return;
        }
        acceptOtherRequest(sourceMailbox, rs, request, rp);
    }

    private void acceptOtherRequest(final Mailbox sourceMailbox,
            final RequestSource rs, final Request request, final RP rp)
            throws Exception {
        final EventQueue<List<JAMessage>> eventQueue = mailbox
                .getEventQueue();
        final EventQueue<List<JAMessage>> srcController = sourceMailbox
                .getEventQueue().getController();
        if (eventQueue.getController() == srcController) {
            syncSend(rs, request, rp);
            return;
        }
        if (!eventQueue.acquireControl(srcController)) {
            asyncSend(rs, request, rp);
            return;
        }
        try {
            syncSend(rs, request, rp);
        } finally {
            mailbox.dispatchEvents();
            mailbox.sendPendingMessages();
            eventQueue.relinquishControl();
        }
    }

    private void asyncSend(final RequestSource rs, final Request request,
            final RP rp) throws Exception {
        final AsyncRequest asyncRequest = new AsyncRequest(rs, this, request,
                rp, mailbox);
        rs.send(mailbox, asyncRequest);
    }

    private void syncSend(final RequestSource rs, final Request request,
            final RP rp) throws Exception {
        final SyncRequest syncRequest = new SyncRequest(rs, JLPCActor.this,
                request, rp, mailbox);
        mailbox.setCurrentRequest(syncRequest);
        try {
            setExceptionHandler(null);
            request.processRequest(this, syncRequest);
            if (!syncRequest.sync) {
                syncRequest.async = true;
                syncRequest.restoreSourceMailbox();
            }
        } catch (final TransparentException tx) {
            final Throwable cause = tx.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw (Error) cause;
        } catch (final Throwable ex) {
            if (!syncRequest.sync)
                syncRequest.restoreSourceMailbox();
            if (ex instanceof Exception) {
                throw (Exception) ex;
            }
            throw (Error) ex;
        }
    }

    /**
     * Wraps and enqueues an unwrapped request in the requester's inbox.
     *
     * @param apcRequestSource The originator of the request.
     * @param request          The request to be sent.
     * @throws Exception Any uncaught exceptions raised while processing the request.
     */
    @Override
    final public void acceptEvent(final APCRequestSource apcRequestSource,
            final Request request) throws Exception {
        final RequestSource rs = (RequestSource) apcRequestSource;
        final ExceptionHandler sourceExceptionHandler = rs
                .getExceptionHandler();
        final Mailbox sourceMailbox = rs.getMailbox();
        if (sourceMailbox == mailbox) {
            syncSendEvent(rs, request, sourceExceptionHandler);
            return;
        }
        if (sourceMailbox == null) {
            asyncSendEvent(rs, request);
            return;
        }
        final EventQueue<List<JAMessage>> eventQueue = mailbox
                .getEventQueue();
        final EventQueue<List<JAMessage>> srcController = sourceMailbox
                .getEventQueue().getController();
        if (eventQueue.getController() == srcController) {
            syncSendEvent(rs, request, sourceExceptionHandler);
            return;
        }
        if (!eventQueue.acquireControl(srcController)) {
            asyncSendEvent(rs, request);
            return;
        }
        try {
            syncSendEvent(rs, request, sourceExceptionHandler);
        } finally {
            mailbox.dispatchEvents();
            mailbox.sendPendingMessages();
            eventQueue.relinquishControl();
        }
    }

    /**
     * Process a request asynchronously.
     *
     * @param rs      The source of the request.
     * @param request The request.
     */
    private void asyncSendEvent(final RequestSource rs, final Request request) {
        final JAEventRequest jaRequest = new JAEventRequest(rs, this, request,
                mailbox);
        rs.send(mailbox, jaRequest);
    }

    /**
     * Process a request from another mailbox synchronously.
     *
     * @param rs                     The source of the request.
     * @param request                The request.
     * @param sourceExceptionHandler Exception handler of the source actor.
     */
    private void syncSendEvent(final RequestSource rs, final Request request,
            final ExceptionHandler sourceExceptionHandler) {
        final Mailbox oldSourceMailbox = rs.getMailbox();
        final JARequest oldSourceRequest = oldSourceMailbox.getCurrentRequest();
        final JAEventRequest jaRequest = new JAEventRequest(rs, this, request,
                mailbox);
        mailbox.setCurrentRequest(jaRequest);
        try {
            setExceptionHandler(null);
            request.processRequest(this, JANoResponse.nrp);
        } catch (final Throwable ex) {
            final ExceptionHandler eh = getExceptionHandler();
            if (eh != null)
                try {
                    eh.process(ex);
                    return;
                } catch (final Throwable x) {
                    getMailboxFactory().eventException(request, x);
                }
            else {
                getMailboxFactory().eventException(request, ex);
            }
        }
        oldSourceMailbox.setCurrentRequest(oldSourceRequest);
        oldSourceMailbox.setExceptionHandler(sourceExceptionHandler);
    }

    /**
     * Send a request to another actor.
     *
     * @param actor   The target actor.
     * @param request The request.
     * @param rp      The response processor.
     * @throws Exception Any uncaught exceptions raised while processing the request.
     */
    final protected void send(final Actor actor, final Request request,
            final RP rp) throws Exception {
        actor.acceptRequest(this, request, rp);
    }

    /**
     * Send a request to another actor and discard any response.
     *
     * @param actor   The target actor.
     * @param request The request.
     */
    final protected void sendEvent(final Actor actor, final Request request) {
        try {
            send(actor, request, JANoResponse.nrp);
        } catch (final Throwable ex) {
            throw new UnsupportedOperationException("Unexpected exception", ex);
        }
    }

    /**
     * Creates a _SMBuilder.
     */
    final public class SMBuilder extends _SMBuilder {
        @Override
        final public void send(final Actor actor, final Request request,
                final RP rp) throws Exception {
            JLPCActor.this.send(actor, request, rp);
        }
    }

    /**
     * Returns the mailbox factory.
     *
     * @return The mailbox factory.
     */
    final public MailboxFactory getMailboxFactory() {
        return mailbox.getMailboxFactory();
    }
}

final class SyncRequest extends JARequest {
    /**
     * Set true when a response is received synchronously.
     */
    public boolean sync;

    /**
     * Set true when a response is received asynchronously.
     */
    public boolean async;

    public SyncRequest(final RequestSource requestSource,
            final JLPCActor destinationActor, final Request unwrappedRequest,
            final RP rp, final Mailbox mailbox) {
        super(requestSource, destinationActor, unwrappedRequest, rp, mailbox);
    }

    @Override
    public void processResponse(final Object response) throws Exception {
        if (!async) {
            sync = true;
            if (!isActive()) {
                return;
            }
            inactive();
            final JARequest oldCurrent = mailbox.getCurrentRequest();
            final ExceptionHandler oldExceptionHandler = mailbox
                    .getExceptionHandler();
            restoreSourceMailbox();
            if (response instanceof Throwable) {
                mailbox.setCurrentRequest(oldCurrent);
                mailbox.setExceptionHandler(oldExceptionHandler);
                if (response instanceof Exception) {
                    throw (Exception) response;
                }
                throw (Error) response;
            }
            try {
                rp.processResponse(response);
            } catch (final Throwable e) {
                throw new TransparentException(e);
            }
            mailbox.setCurrentRequest(oldCurrent);
            mailbox.setExceptionHandler(oldExceptionHandler);
        } else {
            if (response instanceof Throwable) {
                Throwable ex = (Throwable) response;
                final ExceptionHandler exceptionHandler = mailbox
                        .getExceptionHandler();
                if (exceptionHandler != null) {
                    try {
                        exceptionHandler.process(ex);
                        return;
                    } catch (final Throwable x) {
                        ex = x;
                    }
                    mailbox.response(this, ex);
                }
            }
            mailbox.response(this, response);
            return;
        }
        reset();
    }
}

final class AsyncRequest extends JARequest {
    public AsyncRequest(final RequestSource requestSource,
            final JLPCActor destinationActor, final Request unwrappedRequest,
            final RP rp, final Mailbox mailbox) {
        super(requestSource, destinationActor, unwrappedRequest, rp, mailbox);
    }

    @Override
    public void processResponse(final Object response) throws Exception {
        if (!isActive()) {
            return;
        }
        final JARequest old = mailbox.getCurrentRequest();
        mailbox.setCurrentRequest(this);
        mailbox.response(this, response);
        mailbox.setCurrentRequest(old);
    }
}

final class JAEventRequest extends JARequest {
    public JAEventRequest(final RequestSource requestSource,
            final JLPCActor destinationActor, final Request unwrappedRequest,
            final Mailbox mailbox) {
        super(requestSource, destinationActor, unwrappedRequest, null, mailbox);
    }

    @Override
    public void processResponse(final Object response) throws Exception {
        reset();
    }

    /**
     * Returns true when no response is expected.
     *
     * @return True.
     */
    @Override
    public boolean isEvent() {
        return true;
    }
}
TOP

Related Classes of org.agilewiki.jactor.lpc.JAEventRequest

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.