Package org.apache.axis2.clientapi

Source Code of org.apache.axis2.clientapi.InOutMEPClient$NonBlockingInvocationWorker

/*
* Copyright 2004,2005 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
*      http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*  Runtime state of the engine
*/
package org.apache.axis2.clientapi;

import org.apache.axis2.Constants;
import org.apache.axis2.addressing.EndpointReference;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContextFactory;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.description.OperationDescription;
import org.apache.axis2.description.TransportInDescription;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.engine.AxisFault;
import org.apache.axis2.om.OMException;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.axis2.transport.TransportListener;
import org.apache.axis2.util.threadpool.AxisWorker;
import org.apache.wsdl.WSDLConstants;

import javax.xml.namespace.QName;
import java.io.IOException;

/**
* This Class capture handling the In-Out type Method invocations. this provides the
* methods to do blocking and non blocking invocation. The basic API is based on the
* MessageContext and the more convients API is provided by the Call
*/
public class InOutMEPClient extends MEPClient {
    protected TransportListener listener;
    /**
     * transport that should be used for sending and reciving the message
     */
    protected TransportOutDescription senderTransport;
    protected TransportInDescription listenerTransport;

    /**
     * Should the two SOAPMessage are sent over same channel over seperate channels.
     * The value of this variable depends on the transport specified.
     * e.g. If the transports are different this is true by default.
     *      HTTP transport support both cases
     *      SMTP transport support only two channel case
     */
    protected boolean useSeparateListener = false;


    /**
     * The address the message should be send
     */
    protected EndpointReference to;

    //variables use for internal implementations

    /**
     * This is used for the Receiving the Async Messages
     */
    protected CallbackReceiver callbackReceiver;
    /**
     * This accepts a ServiceContext, and the ServiceContext should have all the parents set in to it right
     * Ideall this should be generated from a WSDL, we do not have it yet.
     *
     * Follwoing code works for the time been
     * <code>
     *  ConfigurationContextFactory efac = new ConfigurationContextFactory();
        ConfigurationContext sysContext = efac.buildClientConfigurationContext(null);
        // above line "null" may be a file name if you know the client repssitory
   
        //create new service
        QName assumedServiceName = new QName("Your Service");
        ServiceDescription axisService = new ServiceDescription(assumedServiceName);
        sysContext.getEngineConfig().addService(axisService);
        ServiceContext service = sysContext.createServiceContext(assumedServiceName);
        return service;
     *
     * </code>
     *
     * @param serviceContext
     */

    public InOutMEPClient(ServiceContext serviceContext) {
        super(serviceContext, WSDLConstants.MEP_URI_OUT_IN);
        //service context has the engine context set in to it !
        callbackReceiver = new CallbackReceiver();
    }

    //    this method is commented out, till we implemented it    
    //    public InOutMEPClient(String wsdlfile) {
    //        super(null, WSDLConstants.MEP_URI_OUT_IN);
    //        throw new UnsupportedOperationException();
    //    }

    public MessageContext invokeBlocking(OperationDescription axisop, final MessageContext msgctx)
        throws AxisFault {
        verifyInvocation(axisop,msgctx);
        if (useSeparateListener) {
            SyncCallBack callback = new SyncCallBack();
            invokeNonBlocking(axisop, msgctx, callback);
            int index = 0;
            while (!callback.isComplete()) {
                if (index < 20) {
                    try {
                        Thread.sleep(100);
                    } catch (InterruptedException e) {
                        throw new AxisFault(e);
                    }
                } else {
                    throw new AxisFault("Time out waiting for the response");
                }
            }
            if (callback.envelope != null) {
                MessageContext resMsgctx = new MessageContext(serviceContext.getEngineContext());
                resMsgctx.setEnvelope(callback.envelope);
                return resMsgctx;
            } else {
                if (callback.error instanceof AxisFault) {
                    throw (AxisFault) callback.error;
                } else {
                    throw new AxisFault(callback.error);
                }
            }
        } else {
            msgctx.setTo(to);
            msgctx.setSoapAction(soapAction);
            msgctx.setServiceContext(serviceContext);
            ConfigurationContext syscontext = serviceContext.getEngineContext();

            checkTransport(msgctx);

            ConfigurationContext sysContext = serviceContext.getEngineContext();
            AxisConfiguration registry = sysContext.getAxisConfiguration();

            msgctx.setOperationContext(
                OperationContextFactory.createMEPContext(
                    WSDLConstants.MEP_CONSTANT_IN_OUT,
                    axisop,
                    serviceContext));
            MessageContext response = TwoChannelBasedSender.send(msgctx, listenerTransport);

            SOAPEnvelope resenvelope = response.getEnvelope();

            if (resenvelope.getBody().hasFault()) {
                throw new AxisFault(resenvelope.getBody().getFault().getException());
            }
            return response;
        }
    }

    public void invokeNonBlocking(
        final OperationDescription axisop,
        final MessageContext msgctx,
        final Callback callback)
        throws AxisFault {
        verifyInvocation(axisop,msgctx);
        msgctx.setTo(to);
        try {
            final ConfigurationContext syscontext = serviceContext.getEngineContext();

            AxisEngine engine = new AxisEngine(syscontext);
            //TODO
            checkTransport(msgctx);
            msgctx.setSoapAction(soapAction);

            if (useSeparateListener) {
                String messageID = String.valueOf(System.currentTimeMillis());
                msgctx.setMessageID(messageID);
                axisop.setMessageReciever(callbackReceiver);
                callbackReceiver.addCallback(messageID, callback);
                msgctx.setReplyTo(
                    ListenerManager.replyToEPR(
                        serviceContext.getServiceConfig().getName().getLocalPart()
                            + "/"
                            + axisop.getName().getLocalPart(),
                        listenerTransport.getName().getLocalPart()));
                msgctx.setOperationContext(axisop.findOperationContext(msgctx, serviceContext));
                msgctx.setServiceContext(serviceContext);
                engine.send(msgctx);
            } else {
                serviceContext.getEngineContext().getThreadPool().addWorker(
                    new NonBlockingInvocationWorker(callback, axisop, msgctx));
            }

        } catch (OMException e) {
            throw AxisFault.makeFault(e);
        } catch (IOException e) {
            throw AxisFault.makeFault(e);
        }

    }

    /**
      * @param to
      */
    public void setTo(EndpointReference to) {
        this.to = to;
    }

    /**
     * Set transport information to the the Call, for find how the each parameter acts see the commant at the instance
     * variables. The senarios supoorted are as follows.
     * [senderTransport, listenerTransport, useSeparateListener]
     * http, http, true
     * http, http, false
     * http,smtp,true
     * smtp,http,true
     * smtp,smtp,true
     * 
     * @param senderTransport
     * @param listenerTransport
     * @param useSeparateListener
     * @throws AxisFault
     */

    public void setTransportInfo(
        String senderTransport,
        String listenerTransport,
        boolean useSeparateListener)
        throws AxisFault {

        if (!useSeparateListener) {
            boolean isTransportsEqual = senderTransport.equals(listenerTransport);
            boolean isATwoWaytransport = Constants.TRANSPORT_HTTP.equals(senderTransport)
                            || Constants.TRANSPORT_TCP.equals(senderTransport)
                            || Constants.TRANSPORT_COMMONS_HTTP.equals(senderTransport);
            boolean isCommonsAndHTTP =  Constants.TRANSPORT_COMMONS_HTTP.equals(senderTransport)
                    && Constants.TRANSPORT_HTTP.equals(listenerTransport);                        
            if(!isCommonsAndHTTP && (!isTransportsEqual || !isATwoWaytransport)){
                throw new AxisFault("useSeparateListener = false is only supports by the htpp/tcp and tcp commons transport set as the sender and receiver");
            }
        }else{
            this.useSeparateListener = useSeparateListener;

        }

        AxisConfiguration axisConfig = serviceContext.getEngineContext().getAxisConfiguration();
        this.listenerTransport = axisConfig.getTransportIn(new QName(listenerTransport));
        this.senderTransport = axisConfig.getTransportOut(new QName(senderTransport));
        if (this.senderTransport == null) {
            throw new AxisFault("Unknown transport " + senderTransport);
        }

        if (this.listenerTransport == null) {
            throw new AxisFault("Unknown transport " + listenerTransport);
        }

        if (useSeparateListener == true) {
            if (!serviceContext
                .getEngineContext()
                .getAxisConfiguration()
                .isEngaged(new QName(Constants.MODULE_ADDRESSING))) {
                throw new AxisFault("to do two Transport Channels the Addressing Modules must be engeged");
            }
            ListenerManager.makeSureStarted(listenerTransport, serviceContext.getEngineContext());
        }
    }

    private void checkTransport(MessageContext msgctx) throws AxisFault {
        if (senderTransport == null) {
            senderTransport = inferTransport(to);
        }
        if (listenerTransport == null) {
            listenerTransport =
                serviceContext.getEngineContext().getAxisConfiguration().getTransportIn(
                    senderTransport.getName());
        }

        if (msgctx.getTransportIn() == null) {
            msgctx.setTransportIn(listenerTransport);
        }
        if (msgctx.getTransportOut() == null) {
            msgctx.setTransportOut(senderTransport);
        }

    }

    public class SyncCallBack extends Callback {
        private SOAPEnvelope envelope;
        private Exception error;
        public void onComplete(AsyncResult result) {
            this.envelope = result.getResponseEnvelope();
        }
        public void reportError(Exception e) {
            error = e;
        }
        //        public boolean hasResult() {
        //            return envelope != null || error != null;
        //        }
    }

    public void engageModule(QName moduleName) throws AxisFault {
        serviceContext.getEngineContext().getAxisConfiguration().engageModule(moduleName);
    }

    public void close() throws AxisFault {
        //senderTransport.getSender().cleanUp();
        ListenerManager.stop(listenerTransport.getName().getLocalPart());
    }

    private class NonBlockingInvocationWorker implements AxisWorker {

        private Callback callback;
        private OperationDescription axisop;
        private MessageContext msgctx;

        public NonBlockingInvocationWorker(
            Callback callback,
            OperationDescription axisop,
            MessageContext msgctx) {
            this.callback = callback;
            this.axisop = axisop;
            this.msgctx = msgctx;
        }

        public void doWork() {
            try {
                msgctx.setOperationContext(
                    OperationContextFactory.createMEPContext(
                        WSDLConstants.MEP_CONSTANT_IN_OUT,
                        axisop,
                        serviceContext));
                msgctx.setServiceContext(serviceContext);
                MessageContext response = TwoChannelBasedSender.send(msgctx, listenerTransport);
                SOAPEnvelope resenvelope = response.getEnvelope();
                AsyncResult asyncResult = new AsyncResult(response);
                callback.onComplete(asyncResult);
                callback.setComplete(true);
            } catch (Exception e) {
                callback.reportError(e);
            }
        }
    }
  
}
TOP

Related Classes of org.apache.axis2.clientapi.InOutMEPClient$NonBlockingInvocationWorker

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.