/*
* Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.wso2.carbon.bpel.b4p.extension;
import org.apache.axiom.om.OMAbstractFactory;
import org.apache.axiom.soap.SOAPFactory;
import org.apache.axiom.soap.SOAPBody;
import org.apache.axis2.AxisFault;
import org.apache.axis2.client.OperationClient;
import org.apache.axis2.client.Options;
import org.apache.axis2.client.ServiceClient;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.context.ServiceGroupContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.AxisService;
import org.apache.axis2.description.AxisServiceGroup;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.engine.AxisConfiguration;
import org.apache.axis2.transport.http.HTTPConstants;
import org.apache.axis2.wsdl.WSDLConstants;
import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.neethi.Policy;
import org.apache.neethi.PolicyEngine;
import org.apache.ode.axis2.ODEService;
import org.apache.ode.bpel.iapi.*;
import org.apache.ode.bpel.iapi.Message;
import org.apache.ode.bpel.common.FaultException;
import org.apache.ode.bpel.epr.EndpointFactory;
import org.apache.ode.bpel.epr.MutableEndpoint;
import org.apache.ode.bpel.epr.WSAEndpoint;
import org.apache.ode.utils.DOMUtils;
import org.apache.ode.utils.Namespaces;
import org.apache.ode.utils.stl.CollectionsX;
import org.apache.ode.utils.uuid.UUID;
import org.apache.ode.store.DeploymentUnitDir;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.wso2.carbon.bpel.b4p.extension.B4PExtensionOperation;
import org.wso2.carbon.bpel.b4p.utils.*;
import javax.wsdl.*;
import javax.wsdl.extensions.ExtensibilityElement;
import javax.wsdl.extensions.http.HTTPBinding;
import javax.wsdl.extensions.soap.SOAPBinding;
import javax.wsdl.extensions.soap.SOAPOperation;
import javax.wsdl.extensions.soap12.SOAP12Binding;
import javax.xml.namespace.QName;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.util.Collection;
import java.util.ArrayList;
/**
 * Partner-role channel used to deliver the input message of a {@link PeopleActivity} to the
 * service endpoint configured for that activity.
 * <p>
 * The service, port and binding are resolved from the WSDL exposed by the people activity, the
 * SOAP request is built through {@link SOAPHelper}, and the message is sent with an Axis2
 * {@link OperationClient} created on an anonymous service. For two-way interactions
 * (activity type {@code InteractionType.TASK}) the feedback parsed from the response body is
 * returned by {@link #invoke()}; one-way interactions return {@code null}.
 */
public class HumanService implements PartnerRoleChannel {

    private static final Log log = LogFactory.getLog(HumanService.class);

    // NOTE(review): not referenced anywhere in this class.
    private static final int EXPIRE_SERVICE_CLIENT = 3000;

    /** Timeout, in milliseconds, applied to the options of every operation client. */
    private static final long INVOCATION_TIMEOUT_MILLIS = 60000;

    private Definition wsdlDefinition;
    private QName serviceName;
    private String portName;
    private ConfigurationContext clientConfigCtx;
    private WSAEndpoint endpointReference;
    // Only assigned in the constructor; building it validates the URL of the WSDL endpoint.
    private URL endpointUrl;
    // Address the request is actually sent to (taken from the people activity).
    private String mexEndPointUrl;
    private String operationName;
    private Operation operation;
    private Element inputMessage;
    private AxisConfiguration axisConfig;
    private SOAPHelper soapHelper;
    // Assigned as a side effect of createSOAPHelper().
    private Binding binding;
    // NOTE(review): stored but not read anywhere in this class.
    private DeploymentUnitDir du;
    private boolean isTwoWay;

    /**
     * Creates the channel for the given people activity.
     *
     * @param peopleActivity  activity providing the WSDL, service/port/operation names, the
     *                        target endpoint URL and the input message
     * @param clientConfigCtx Axis2 client configuration context; the connection manager and the
     *                        HTTP-client reuse flag are set on it as properties
     * @param connManager     HTTP connection manager registered on {@code clientConfigCtx}
     * @throws AxisFault                if the input message cannot be obtained, the service,
     *                                  port or binding cannot be resolved or is not supported,
     *                                  or the endpoint URL from the WSDL is malformed
     * @throws IllegalArgumentException if no endpoint reference can be generated from the WSDL
     *                                  for the configured service and port
     */
    public HumanService(PeopleActivity peopleActivity, ConfigurationContext clientConfigCtx,
                        MultiThreadedHttpConnectionManager connManager) throws AxisFault {
        this.wsdlDefinition = peopleActivity.getHiWSDL();
        this.serviceName = peopleActivity.getServiceName();
        this.portName = peopleActivity.getServicePort();
        this.operationName = peopleActivity.getOperationName();
        this.operation = peopleActivity.getOperation();
        this.isTwoWay = peopleActivity.getActivityType().equals(InteractionType.TASK);
        this.mexEndPointUrl = peopleActivity.getEPRURL();
        try {
            this.inputMessage = peopleActivity.getInputMessage();
        } catch (FaultException e) {
            throw new AxisFault(e.getMessage(), e);
        }

        this.clientConfigCtx = clientConfigCtx;
        this.du = peopleActivity.getDu();
        this.axisConfig = clientConfigCtx.getAxisConfiguration();
        // Must run after the WSDL, service name and port name have been assigned.
        this.soapHelper = createSOAPHelper();

        this.clientConfigCtx.setProperty(HTTPConstants.MULTITHREAD_HTTP_CONNECTION_MANAGER, connManager);
        this.clientConfigCtx.setProperty(HTTPConstants.REUSE_HTTP_CLIENT, "false");

        Element eprEle = HumanServiceUtil.genEPRfromWSDL(this.wsdlDefinition, this.serviceName, this.portName);
        if (eprEle == null) {
            throw new IllegalArgumentException("Service Port definition not found for service:" + this.serviceName +
                    " and port:" + this.portName);
        }
        this.endpointReference = EndpointFactory.convertToWSA(HumanServiceUtil.createServiceRef(eprEle));

        try {
            this.endpointUrl = new URL(endpointReference.getUrl());
        } catch (MalformedURLException e) {
            // Keep the cause and the offending value so the failure can be diagnosed.
            throw new AxisFault("Malformed URL: " + endpointReference.getUrl(), e);
        }
    }

    /**
     * Sends the input message of the people activity to the configured endpoint.
     * <p>
     * Failures are logged and not propagated; in that case {@code null} is returned.
     *
     * @return the feedback parsed from the response body for a two-way interaction, or
     *         {@code null} for a one-way interaction, when a fault or no response is received,
     *         or when the invocation fails
     */
    public String invoke() {
        try {
            // Override options are passed to the axis MessageContext so we can
            // retrieve them in our session out changeHandler.
            final MessageContext mctx = new MessageContext();
            OperationClient opClient = getOperationClient(isTwoWay, mctx);
            try {
                /* Make the client options the parent so they become the defaults of the
                 * MessageContext. That allows the user to override specific options on a
                 * given message context without affecting the overall options.
                 */
                mctx.getOptions().setParent(opClient.getOptions());
                mctx.setSoapAction(getAction(operationName));

                B4PMessage msg = new B4PMessage();
                msg.setMessage(inputMessage);
                soapHelper.createSoapRequest(mctx, msg, operation);

                org.apache.axis2.addressing.EndpointReference axisEPR =
                        new org.apache.axis2.addressing.EndpointReference(this.mexEndPointUrl);

                opClient.addMessageContext(mctx);
                // These Options can be altered without impacting the ServiceClient options
                // (which is a requirement).
                Options operationOptions = opClient.getOptions();
                operationOptions.setAction(mctx.getSoapAction());
                operationOptions.setTo(axisEPR);

                opClient.execute(true);

                if (isTwoWay) {
                    MessageContext response = opClient.getMessageContext(WSDLConstants.MESSAGE_LABEL_IN_VALUE);
                    MessageContext flt = opClient.getMessageContext(WSDLConstants.MESSAGE_LABEL_FAULT_VALUE);
                    if (response != null && log.isDebugEnabled()) {
                        log.debug("Service feedback response:\n" + response.getEnvelope().toString());
                    }
                    if (flt != null) {
                        // A fault must never be dropped silently; the caller still receives null.
                        log.error("Fault received from service at " + this.mexEndPointUrl + ":\n"
                                + flt.getEnvelope());
                    } else if (response == null) {
                        // Neither a response nor a fault: report it instead of failing with an NPE.
                        log.error("No response received from service at " + this.mexEndPointUrl);
                    } else {
                        return SOAPHelper.parseResponseFeedback(response.getEnvelope().getBody());
                    }
                } else { /* one-way case */
                    if (log.isDebugEnabled()) {
                        log.debug("Notification Sent to: " + this.mexEndPointUrl);
                    }
                }
            } finally {
                // make sure the HTTP connection is released to the pool!
                TransportOutDescription out = mctx.getTransportOut();
                if (out != null && out.getSender() != null) {
                    out.getSender().cleanup(mctx);
                }
            }
        } catch (Throwable t) {
            // Deliberately broad: the invocation is best-effort and callers only see null.
            log.error(t.getMessage(), t);
        }
        return null;
    }

    /**
     * @return the WSDL port name this channel was created for
     */
    public String getPortName() {
        return portName;
    }

    /**
     * @return the qualified WSDL service name this channel was created for
     */
    public QName getServiceName() {
        return serviceName;
    }

    /**
     * @return the WS-Addressing endpoint reference generated from the WSDL service port
     */
    public EndpointReference getInitialEndpointReference() {
        return endpointReference;
    }

    /**
     * No-op; this channel holds nothing that is released here.
     */
    public void close() {
    }

    /**
     * Resolves the service, port and binding from the WSDL and builds the matching
     * {@link SOAPHelper}. The resolved binding is stored in {@link #binding}.
     * <p>
     * A SOAP 1.1 binding selects the SOAP 1.1 factory; SOAP 1.2 and HTTP bindings select the
     * SOAP 1.2 factory. RPC style is detected from the {@code style} of SOAP 1.1/1.2 bindings.
     *
     * @return helper configured for the resolved binding
     * @throws AxisFault if the service, port or binding is missing, or the binding type is
     *                   neither SOAP 1.1, SOAP 1.2 nor HTTP
     */
    private SOAPHelper createSOAPHelper() throws AxisFault {
        Service serviceDef = wsdlDefinition.getService(serviceName);
        if (serviceDef == null) {
            throw new AxisFault(Messages.msgServiceDefinitionNotFound(serviceName.getLocalPart()));
        }

        Port port = serviceDef.getPort(portName);
        if (port == null) {
            throw new AxisFault(Messages.msgServicePortNotFound(serviceName.getLocalPart(), portName));
        }

        binding = port.getBinding();
        if (binding == null) {
            throw new AxisFault(Messages.msgBindingNotFound(serviceName.getLocalPart(), portName));
        }

        ExtensibilityElement bindingType = HumanServiceUtil.getBindingExtension(binding);
        SOAPFactory soapFactory;
        String style;
        if (bindingType instanceof SOAPBinding) {
            soapFactory = OMAbstractFactory.getSOAP11Factory();
            style = ((SOAPBinding) bindingType).getStyle();
        } else if (bindingType instanceof SOAP12Binding) {
            soapFactory = OMAbstractFactory.getSOAP12Factory();
            style = ((SOAP12Binding) bindingType).getStyle();
        } else if (bindingType instanceof HTTPBinding) {
            // HTTP bindings carry no style and use the SOAP 1.2 factory.
            soapFactory = OMAbstractFactory.getSOAP12Factory();
            style = null;
        } else {
            throw new AxisFault(Messages.msgBindingNotSupported(serviceName.getLocalPart(), portName));
        }
        boolean isRPC = "rpc".equals(style);

        return new SOAPHelper(wsdlDefinition, binding, serviceName.getLocalPart(), portName, soapFactory, isRPC);
    }

    /**
     * Reports a failure on the given ODE message exchange; an error while doing so is logged
     * and otherwise ignored. Not called from within this class.
     *
     * @param odeMex exchange to reply on
     * @param error  failure type to report
     * @param errmsg description of the failure
     */
    private void replyWithFailure(final PartnerRoleMessageExchange odeMex, final MessageExchange.FailureType error, final String errmsg) {
        try {
            odeMex.replyWithFailure(error, errmsg, null);
        } catch (Exception e) {
            String emsg = "Error executing replyWithFailure; reply will be lost.";
            log.error(emsg, e);
        }
    }

    /**
     * Logs a received reply. For a fault reply the SOAP fault is parsed and logged at warn
     * level; a non-fault reply is only acknowledged at debug level. Any exception is logged and
     * not propagated. Not called from within this class.
     *
     * @param operation WSDL operation the reply belongs to
     * @param reply     message context holding the reply envelope
     * @param isFault   whether the reply is to be treated as a fault
     */
    private void reply(final Operation operation, final MessageContext reply, final boolean isFault) {
        try {
            if (log.isDebugEnabled()) {
                log.debug("Received response");
            }
            if (!isFault) {
                return;
            }

            Document odeMsg = DOMUtils.newDocument();
            Element odeMsgEl = odeMsg.createElementNS(null, "message");
            odeMsg.appendChild(odeMsgEl);
            Fault fault = soapHelper.parseSoapFault(odeMsgEl, reply.getEnvelope(), operation);

            if (fault != null) {
                if (log.isWarnEnabled()) {
                    log.warn("Fault response: faultName=" + fault.getName() + " faultType=" + fault.getMessage().getQName() + "\n" + DOMUtils.domToString(odeMsgEl));
                }
            } else {
                if (log.isWarnEnabled()) {
                    log.warn("Fault response: faultType=(unkown)\n" + reply.getEnvelope().toString());
                }
            }
        } catch (Exception ex) {
            String errmsg = "Unable to process response: " + ex.getMessage();
            log.error(errmsg, ex);
        }
    }

    /**
     * Extracts endpoint information from ODE message exchange to stuff them into Axis MessageContext.
     * <p>
     * Sets the target (and, when available, callback) session endpoints as options, sets the
     * SOAP action for the exchange's operation and, for request-response exchanges, the
     * anonymous reply-to address and a generated message id. Not called from within this class.
     *
     * @param ctxt   Axis2 message context to populate
     * @param odeMex ODE message exchange providing endpoints, session ids and operation name
     */
    private void populateWSAddressingOptions(MessageContext ctxt, PartnerRoleMessageExchange odeMex) {
        Options options = ctxt.getOptions();
        WSAEndpoint targetWSAEPR = EndpointFactory.convertToWSA((MutableEndpoint) odeMex.getEndpointReference());
        WSAEndpoint myRoleWSAEPR = EndpointFactory.convertToWSA((MutableEndpoint) odeMex.getMyRoleEndpointReference());
        WSAEndpoint targetEPR = new WSAEndpoint(targetWSAEPR);

        String partnerSessionId = odeMex.getProperty(MessageExchange.PROPERTY_SEP_PARTNERROLE_SESSIONID);
        String myRoleSessionId = odeMex.getProperty(MessageExchange.PROPERTY_SEP_MYROLE_SESSIONID);

        if (partnerSessionId != null) {
            if (log.isDebugEnabled()) {
                log.debug("Partner session identifier found for WSA endpoint: " + partnerSessionId);
            }
            targetEPR.setSessionId(partnerSessionId);
        }
        options.setProperty(HumanServiceUtil.TARGET_SESSION_ENDPOINT, targetEPR);

        if (myRoleWSAEPR != null) {
            WSAEndpoint myRoleEPR = new WSAEndpoint(myRoleWSAEPR);
            if (myRoleSessionId != null) {
                if (log.isDebugEnabled()) {
                    log.debug("MyRole session identifier found for myrole (callback) WSA endpoint: " + myRoleSessionId);
                }
                myRoleEPR.setSessionId(myRoleSessionId);
            }
            options.setProperty(HumanServiceUtil.CALLBACK_SESSION_ENDPOINT, myRoleEPR);
        } else {
            if (log.isDebugEnabled()) {
                log.debug("My-Role EPR not specified, SEP will not be used.");
            }
        }

        String action = getAction(odeMex.getOperationName());
        ctxt.setSoapAction(action);

        if (MessageExchange.MessageExchangePattern.REQUEST_RESPONSE == odeMex.getMessageExchangePattern()) {
            org.apache.axis2.addressing.EndpointReference annonEpr =
                    new org.apache.axis2.addressing.EndpointReference(Namespaces.WS_ADDRESSING_ANON_URI);
            ctxt.setReplyTo(annonEpr);
            ctxt.setMessageID("uuid:" + new UUID().toString());
        }
    }

    /**
     * Creates an operation client on the anonymous Axis2 service for this service and port.
     * <p>
     * The client options are cloned from the options of {@code msgCtx}, SOAP faults are
     * configured not to raise exceptions, and a fixed timeout is applied.
     *
     * @param isTwoWay {@code true} to use the anonymous out-in operation, {@code false} for the
     *                 anonymous out-only operation
     * @param msgCtx   message context whose options are cloned for the client
     * @return the operation client to send the message with
     * @throws AxisFault if the anonymous service, its contexts or the client cannot be created
     */
    private OperationClient getOperationClient(boolean isTwoWay, MessageContext msgCtx) throws AxisFault {
        AxisService anonymousService = AnonymousServiceFactory.getAnonymousService(serviceName, portName, axisConfig);
        anonymousService.getParent().addParameter(org.wso2.carbon.bpel.b4p.utils.Constants.HIDDEN_SERVICE_PARAM, "true");

        ServiceGroupContext sgc = new ServiceGroupContext(
                clientConfigCtx, (AxisServiceGroup) anonymousService.getParent());
        ServiceContext serviceCtx = sgc.getServiceContext(anonymousService);

        // get a reference to the DYNAMIC operation of the Anonymous Axis2 service
        AxisOperation axisAnonymousOperation = anonymousService.getOperation(
                isTwoWay ? ServiceClient.ANON_OUT_IN_OP : ServiceClient.ANON_OUT_ONLY_OP);

        Options clientOptions = HumanServiceUtil.cloneOptions(msgCtx.getOptions());
        clientOptions.setExceptionToBeThrownOnSOAPFault(false);
        /* This value doesn't override end point config. */
        clientOptions.setTimeOutInMilliSeconds(INVOCATION_TIMEOUT_MILLIS);

        return axisAnonymousOperation.createClient(serviceCtx, clientOptions);
    }

    /**
     * Loads and parses a policy document. Not called from within this class.
     *
     * @param fileName name of the policy file to load
     * @param basePath base path handed to the file loader
     * @return the parsed policy
     * @throws IllegalArgumentException if the file cannot be loaded or read
     */
    private Policy loadPolicy(String fileName, String basePath) {
        Policy policyDoc = null;
        FileLoadingUtil fileLoader = new FileLoadingUtil(basePath);
        if (log.isDebugEnabled()) {
            log.debug("Applying security policy: " + fileName);
        }
        try {
            InputStream policyStream = fileLoader.load(fileName);
            try {
                policyDoc = PolicyEngine.getPolicy(policyStream);
            } finally {
                // Guard against a null stream so a close() failure cannot mask the real error.
                if (policyStream != null) {
                    policyStream.close();
                }
            }
        } catch (IOException e) {
            String errMsg = "Exception while parsing policy: " + fileName;
            log.error(errMsg, e);
            throw new IllegalArgumentException(errMsg, e);
        } catch (FileLoadingUtilException e) {
            String errMsg = "File Loading Exception";
            log.error(errMsg, e);
            throw new IllegalArgumentException(errMsg, e);
        }
        return policyDoc;
    }

    /**
     * Extracts the action to be used for the given operation. It first checks to see
     * if a value is specified using WS-Addressing in the portType, it then falls back onto
     * getting it from the SOAP Binding.
     *
     * @param operation the name of the operation to get the Action for
     * @return The action value for the specified operation
     */
    private String getAction(String operation) {
        String action = getWSAInputAction(operation);
        if (action == null || "".equals(action)) {
            action = getSoapAction(operation);
        }
        return action;
    }

    /**
     * Attempts to extract the WS-Addressing "Action" attribute value from the operation definition.
     * When WS-Addressing is being used by a service provider, the "Action" is specified in the
     * portType->operation instead of the SOAP binding->operation.
     *
     * @param operation The name of the operation to extract the SOAP Action from
     * @return the local part of the "Action" extension attribute of the operation input when it
     *         is present as a QName, otherwise empty string
     */
    public String getWSAInputAction(String operation) {
        BindingOperation bop = binding.getBindingOperation(operation, null, null);
        if (bop == null) {
            return "";
        }
        Input input = bop.getOperation().getInput();
        if (input != null) {
            Object actionQName = input.getExtensionAttribute(new QName(Namespaces.WS_ADDRESSING_NS, "Action"));
            // instanceof is already false for null, so no separate null check is needed.
            if (actionQName instanceof QName) {
                return ((QName) actionQName).getLocalPart();
            }
        }
        return "";
    }

    /**
     * Attempts to extract the SOAP Action as defined in the WSDL document.
     *
     * @param operation The name of the operation to extract the SOAP Action from
     * @return the soapAction URI of the first SOAP operation element of the binding operation
     *         (returned as-is from the binding), or empty string when the binding operation or
     *         a SOAP operation element is not found
     */
    public String getSoapAction(String operation) {
        BindingOperation bop = binding.getBindingOperation(operation, null, null);
        if (bop == null) {
            return "";
        }
        for (SOAPOperation soapOp : CollectionsX.filter(bop.getExtensibilityElements(), SOAPOperation.class)) {
            // Only the first SOAP operation element is considered.
            return soapOp.getSoapActionURI();
        }
        return "";
    }
}