Package org.apache.axis2.client

Source Code of org.apache.axis2.client.InOutMEPClient$SyncCallBack

/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.axis2.client;

import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.client.async.AsyncResult;
import org.apache.axis2.client.async.Callback;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.i18n.Messages;
import org.apache.axis2.om.OMException;
import org.apache.axis2.soap.SOAPBody;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.axis2.soap.SOAPFault;
import org.apache.axis2.transport.TransportUtils;
import org.apache.axis2.util.CallbackReceiver;
import org.apache.axis2.util.UUIDGenerator;
import org.apache.wsdl.WSDLConstants;

import javax.xml.namespace.QName;

/**
* This class captures the handling of In-Out type method invocations for both blocking
* and non-blocking calls. The basic API is based on MessageContext and
* provides more convenient APIs.
*/
public class InOutMEPClient extends MEPClient {


    /**
     * This is used for the receiving the asynchronous messages.
     */
    protected CallbackReceiver callbackReceiver;

    /**
     * Constructs a InOutMEPClient from a ServiceContext.
     * Ideally this should be generated from a WSDL, we do not have it yet.
     * <p/>
     * Following code works for the time being. <p/>
     * <blockquote><pre>
     * ConfigurationContextFactory efac = new ConfigurationContextFactory();
     * // Replace the null with your client repository if any
     * ConfigurationContext sysContext = efac.buildClientConfigurationContext(null);
     * // above line "null" may be a file name if you know the client repssitory
     * <p/>
     * //create new service
     * QName assumedServiceName = new QName("Your Service");
     * AxisService axisService = new AxisService(assumedServiceName);
     * sysContext.getEngineConfig().addService(axisService);
     * ServiceContext service = sysContext.createServiceContext(assumedServiceName);
     * return service;
     * </pre></blockquote>
     * </code>
     *
     * @param serviceContext
     */

    public InOutMEPClient(ServiceContext serviceContext) {
        super(serviceContext, WSDLConstants.MEP_URI_OUT_IN);
        //service context has the engine context set in to it !
        callbackReceiver = new CallbackReceiver();
    }


    /**
     * This method is used to make blocking calls. This is independent of the transport.
     * For e.g. invocation done with this method might
     * <ol>
     * <li>send request via http and receive the response at the same http connection.</li>
     * <li>send request via http and receive the response at a different http connection.</li>
     * <li>send request via an email smtp and receive the response via an email.</li>
     * </ol>
     */

    public MessageContext invokeBlocking(AxisOperation axisop,
                                         final MessageContext msgctx)
            throws AxisFault {

        // The message ID is sent all the time
        String messageID = String.valueOf("uuid:" + UUIDGenerator.getUUID());
        msgctx.setMessageID(messageID);
        //
        if (clientOptions.isUseSeparateListener()) {

            //This mean doing a Request-Response invocation using two channel. If the
            //transport is two way transport (e.g. http) Only one channel is used (e.g. in http cases
            //202 OK is sent to say no repsone avalible). Axis2 get blocked return when the response is avalible.

            SyncCallBack callback = new SyncCallBack();
            //this method call two channel non blocking method to do the work and wait on the callbck
            invokeNonBlocking(axisop, msgctx, callback);
            long index = clientOptions.getTimeOutInMilliSeconds() / 100;
            while (!callback.isComplete()) {
                //wait till the reponse arrives
                if (index-- >= 0) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        throw new AxisFault(e);
                    }
                } else {
                    throw new AxisFault(Messages.getMessage("responseTimeOut"));
                }
            }
            //process the resule of the invocation
            if (callback.envelope != null) {
                MessageContext resMsgctx =
                        new MessageContext(serviceContext.getConfigurationContext());
                resMsgctx.setEnvelope(callback.envelope);
                return resMsgctx;
            } else {
                if (callback.error instanceof AxisFault) {
                    throw (AxisFault) callback.error;
                } else {
                    throw new AxisFault(callback.error);
                }
            }
        } else {
            msgctx.setServiceContext(serviceContext);
            prepareInvocation(axisop, msgctx);
            //This is the Usual Request-Response Sync implemetation
            ConfigurationContext syscontext = serviceContext.getConfigurationContext();
            msgctx.setConfigurationContext(syscontext);

            checkTransport(msgctx);

            OperationContext operationContext = new OperationContext(axisop, serviceContext);
            axisop.registerOperationContext(msgctx, operationContext);
            operationContext.setProperties(clientOptions.getProperties());

            //Send the SOAP Message and receive a response
            MessageContext response =
                    send(msgctx, clientOptions.getListenerTransport());

            //check for a fault and return the result
            SOAPEnvelope resenvelope = response.getEnvelope();
            if (resenvelope.getBody().hasFault()) {
                SOAPFault soapFault = resenvelope.getBody().getFault();
                Exception ex = soapFault.getException();

                if (clientOptions.isExceptionToBeThrownOnSOAPFault()) {
                    //does the SOAPFault has a detail element for Excpetion
                    if (ex != null) {
                        throw new AxisFault(ex);
                    } else {
                        //if detail element not present create a new Exception from the detail
                        String message = "";
                        message = message + "Code =" + soapFault.getCode() == null ? "" :
                                soapFault.getCode().getValue() == null ? "" : soapFault.getCode().getValue().getText();
                        message = message + "Reason =" + soapFault.getReason() == null ? "" :
                                soapFault.getReason().getSOAPText() == null ? "" : soapFault.getReason().getSOAPText().getText();
                        throw new AxisFault(message);
                    }
                }
            }
            return response;
        }
    }

    /**
     * This method is used to make non-blocking calls and is independent of the transport.
     * For e.g. invocation done with this method might
     * <ol>
     * <li>send request via http and receive the response at the same http connection.</li>
     * <li>send request via http and receive the response at a different http connection.</li>
     * <li>send request via an email smtp and receive the response via an email.</li>
     * </ol>
     */
    public void invokeNonBlocking(final AxisOperation axisop,
                                  final MessageContext msgctx,
                                  final Callback callback)
            throws AxisFault {
        prepareInvocation(axisop, msgctx);

        try {
            final ConfigurationContext syscontext =
                    serviceContext.getConfigurationContext();

            AxisEngine engine = new AxisEngine(syscontext);
            checkTransport(msgctx);
            //Use message id all the time!
            String messageID = String.valueOf("uuid:" + UUIDGenerator.getUUID());
            msgctx.setMessageID(messageID);
            ////
            if (clientOptions.isUseSeparateListener()) {
                //the invocation happen via a separate Channel, so we should set up the
                //information need to correlated the response message and invoke the call back

                axisop.setMessageReceiver(callbackReceiver);
                callbackReceiver.addCallback(messageID, callback);

                //set the replyto such that the response will arrive at the transport listener started
                // Note that this will only change the replyTo Address property in the replyTo EPR
                EndpointReference replyToFromTransport = ListenerManager.replyToEPR(
                        serviceContext.getConfigurationContext(),
                        serviceContext.getAxisService().getName().getLocalPart()
                                + "/"
                                + axisop.getName().getLocalPart(),
                        clientOptions.getListenerTransport().getName().getLocalPart());

                if (msgctx.getReplyTo() == null) {
                    msgctx.setReplyTo(replyToFromTransport);
                } else {
                    msgctx.getReplyTo().setAddress(replyToFromTransport.getAddress());
                }

                //create and set the Operation context
                msgctx.setOperationContext(axisop.findOperationContext(msgctx, serviceContext));
                msgctx.setServiceContext(serviceContext);
                msgctx.getOperationContext().setProperties(clientOptions.getProperties());

                //send the message
                engine.send(msgctx);
            } else {
                // here a bloking invocation happens in a new thread, so the
                // progamming model is non blocking
                OperationContext opcontxt = new OperationContext(axisop, serviceContext);
                msgctx.setOperationContext(opcontxt);
                msgctx.setServiceContext(serviceContext);
                opcontxt.setProperties(clientOptions.getProperties());
                serviceContext.getConfigurationContext().getThreadPool().execute(new NonBlockingInvocationWorker(callback, axisop, msgctx));
            }

        } catch (OMException e) {
            throw new AxisFault(e.getMessage(), e);
        } catch (Exception e) {
            throw new AxisFault(e.getMessage(), e);
        }

    }



    protected void configureTransportInformation(MessageContext msgCtx) throws AxisFault {
        AxisConfiguration axisConfig = this.serviceContext.getConfigurationContext().getAxisConfiguration();
        String listenerTransportProtocol = clientOptions.getListenerTransportProtocol();
        if (axisConfig != null) {
            if (listenerTransportProtocol != null && !"".equals(listenerTransportProtocol)) {
                TransportInDescription transportIn = axisConfig.getTransportIn(new QName(listenerTransportProtocol));
                if (transportIn == null) {
                    throw new AxisFault(Messages.getMessage("unknownTransport", listenerTransportProtocol));
                }
                clientOptions.setListenerTransport(transportIn);
            }

            inferTransportOutDescription(msgCtx);
        }

        if (clientOptions.isUseSeparateListener()) {
            //if separate transport is used, start the required listeners
            if (!serviceContext
                    .getConfigurationContext()
                    .getAxisConfiguration()
                    .isEngaged(new QName(Constants.MODULE_ADDRESSING))) {
                throw new AxisFault(Messages.getMessage("2channelNeedAddressing"));
            }
            ListenerManager.makeSureStarted(clientOptions.getListenerTransportProtocol(),
                    serviceContext.getConfigurationContext());
        }
    }


    /**
     * Checks if the transports are identified correctly.
     *
     * @param msgctx
     * @throws AxisFault
     */
    private void checkTransport(MessageContext msgctx) throws AxisFault {
        if (clientOptions.getSenderTransport() == null) {
            clientOptions.setSenderTransport(inferTransport(msgctx.getTo()));
        }
        if (clientOptions.getListenerTransport() == null) {
            clientOptions.setListenerTransport(serviceContext.getConfigurationContext()
                    .getAxisConfiguration()
                    .getTransportIn(clientOptions.getSenderTransport().getName()));
        }

        if (msgctx.getTransportIn() == null) {
            msgctx.setTransportIn(clientOptions.getListenerTransport());
        }
        if (msgctx.getTransportOut() == null) {
            msgctx.setTransportOut(clientOptions.getSenderTransport());
        }

    }

    /**
     * This class acts as a callback that allows users to wait on the result.
     */
    public class SyncCallBack extends Callback {
        private SOAPEnvelope envelope;
        private Exception error;

        public void onComplete(AsyncResult result) {
            this.envelope = result.getResponseEnvelope();
        }

        public void reportError(Exception e) {
            error = e;
        }
    }

    /**
     * Closes the call initiated to the Transport Listeners. If there are multiple
     * requests sent, the call should be closed only when all are are done.
     */
    public void close() throws AxisFault {
        ListenerManager.stop(serviceContext.getConfigurationContext(), clientOptions.getListenerTransport().getName().getLocalPart());
    }

    /**
     * This class is the workhorse for a non-blocking invocation that uses a
     * two way transport.
     */
    private class NonBlockingInvocationWorker implements Runnable {

        private Callback callback;
        private AxisOperation axisop;
        private MessageContext msgctx;

        public NonBlockingInvocationWorker(Callback callback,
                                           AxisOperation axisop,
                                           MessageContext msgctx) {
            this.callback = callback;
            this.axisop = axisop;
            this.msgctx = msgctx;
        }

        public void run() {
            try {
                //send the request and wait for reponse
                MessageContext response =
                        send(msgctx, clientOptions.getListenerTransport());
                //call the callback                       
                SOAPEnvelope resenvelope = response.getEnvelope();
                SOAPBody body = resenvelope.getBody();
                if (body.hasFault()) {
                    Exception ex = body.getFault().getException();
                    if (ex != null) {
                        callback.reportError(ex);
                    } else {
                        //todo this needs to be fixed
                        callback.reportError(new Exception(body.getFault().getReason().getText()));
                    }
                } else {
                    AsyncResult asyncResult = new AsyncResult(response);
                    callback.onComplete(asyncResult);
                }

                callback.setComplete(true);
            } catch (Exception e) {
                callback.reportError(e);
            }
        }
    }

    /**
     * Sends the message using a two way transport and waits for a response
     *
     * @param msgctx
     * @param transportIn
     * @return
     * @throws AxisFault
     */
    public MessageContext send(MessageContext msgctx,
                               TransportInDescription transportIn) throws AxisFault {

        AxisEngine engine = new AxisEngine(msgctx.getConfigurationContext());
        engine.send(msgctx);

        //create the response
        MessageContext response =
                new MessageContext(msgctx.getConfigurationContext(),
                        msgctx.getSessionContext(),
                        msgctx.getTransportIn(),
                        msgctx.getTransportOut());
        response.setProperty(MessageContext.TRANSPORT_IN,
                msgctx.getProperty(MessageContext.TRANSPORT_IN));
        msgctx.getAxisOperation().registerOperationContext(response, msgctx.getOperationContext());
        response.setServerSide(false);
        response.setServiceContext(msgctx.getServiceContext());
        response.setServiceGroupContext(msgctx.getServiceGroupContext());

        //If request is REST we assume the response is REST, so set the variable
        response.setDoingREST(msgctx.isDoingREST());

        SOAPEnvelope resenvelope = TransportUtils.createSOAPMessage(response, msgctx.getEnvelope().getNamespace().getName());

        if (resenvelope != null) {
            response.setEnvelope(resenvelope);
            engine = new AxisEngine(msgctx.getConfigurationContext());
            engine.receive(response);
        } else {
            throw new AxisFault(Messages.getMessage("blockingInvocationExpectsResponse"));
        }
        return response;
    }
}
TOP

Related Classes of org.apache.axis2.client.InOutMEPClient$SyncCallBack

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.