Package org.apache.sandesha2.msgprocessors

Source Code of org.apache.sandesha2.msgprocessors.MakeConnectionProcessor

package org.apache.sandesha2.msgprocessors;

import java.util.Collection;
import java.util.Iterator;
import java.util.Random;

import org.apache.axis2.AxisFault;
import org.apache.axis2.Constants;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.OperationContext;
import org.apache.axis2.description.AxisOperation;
import org.apache.axis2.description.TransportOutDescription;
import org.apache.axis2.transport.RequestResponseTransport;
import org.apache.axis2.transport.TransportUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.i18n.SandeshaMessageHelper;
import org.apache.sandesha2.i18n.SandeshaMessageKeys;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.Transaction;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.RMSBean;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.util.SpecSpecificConstants;
import org.apache.sandesha2.workers.SenderWorker;
import org.apache.sandesha2.wsrm.Address;
import org.apache.sandesha2.wsrm.Identifier;
import org.apache.sandesha2.wsrm.MakeConnection;
import org.apache.sandesha2.wsrm.MessagePending;

/**
* This class is responsible for processing MakeConnection request messages that come to the system.
* MakeConnection is only supported by WSRM 1.1
* Here a client can ask for reply messages using a polling mechanism, so even clients without real
* endpoints can ask for reliable response messages.
*/
public class MakeConnectionProcessor implements MsgProcessor {

  private static final Log log = LogFactory.getLog(MakeConnectionProcessor.class);

  /**
   * Prosesses incoming MakeConnection request messages.
   * A message is selected by the set of SenderBeans that are waiting to be sent.
   * This is processed using a SenderWorker.
   */
  public boolean processInMessage(RMMsgContext rmMsgCtx, Transaction transaction) throws AxisFault {
    if(log.isDebugEnabled()) log.debug("Enter: MakeConnectionProcessor::processInMessage " + rmMsgCtx.getSOAPEnvelope().getBody());

    MakeConnection makeConnection = (MakeConnection) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.MAKE_CONNECTION);
    Address address = makeConnection.getAddress();
    Identifier identifier = makeConnection.getIdentifier();
   
    ConfigurationContext configurationContext = rmMsgCtx.getConfigurationContext();
    StorageManager storageManager = SandeshaUtil.getSandeshaStorageManager(configurationContext,configurationContext.getAxisConfiguration());
   
    SenderBeanMgr senderBeanMgr = storageManager.getSenderBeanMgr();
   
    //selecting the set of SenderBeans that suit the given criteria.
    SenderBean findSenderBean = new SenderBean ();
    findSenderBean.setSend(true);
    findSenderBean.setTransportAvailable(false);
   
    if (address!=null)
      findSenderBean.setToAddress(address.getAddress());
   
    if (identifier!=null)
      findSenderBean.setSequenceID(identifier.getIdentifier());
   
    // Set the time to send field to be now
    findSenderBean.setTimeToSend(System.currentTimeMillis());
   
    //finding the beans that go with the criteria of the passed SenderBean
    //The reSend flag is ignored for this selection, so there is no need to
    //set it.
    Collection collection = senderBeanMgr.find(findSenderBean);
   
    //removing beans that does not pass the resend test
    for (Iterator it=collection.iterator();it.hasNext();) {
      SenderBean bean = (SenderBean) it.next();
      if (!bean.isReSend() && bean.getSentCount()>0)
        it.remove();
    }
   
    //selecting a bean to send RANDOMLY. TODO- Should use a better mechanism.
    int size = collection.size();
    int itemToPick=-1;
   
    boolean pending = false;
    if (size>0) {
      Random random = new Random ();
      itemToPick = random.nextInt(size);
    }

    if (size>1)
      pending = true//there are more than one message to be delivered using the makeConnection.
               //So the MessagePending header should have value true;
   
    Iterator it = collection.iterator();
   
    SenderBean senderBean = null;
    for (int item=0;item<size;item++) {
        senderBean = (SenderBean) it.next();
      if (item==itemToPick)
        break;
    }

    if (senderBean==null) {
      if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::processInMessage, no matching message found");
      return false;
    }
   
    replyToPoll(rmMsgCtx, senderBean, storageManager, pending, makeConnection.getNamespaceValue(), transaction);
   
    if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::processInMessage");
    return false;
  }
 
  public static void replyToPoll(RMMsgContext pollMessage,
      SenderBean matchingMessage,
      StorageManager storageManager,
      boolean pending,
      String makeConnectionNamespace,
      Transaction transaction)
  throws AxisFault
  {
    if(log.isDebugEnabled()) log.debug("Enter: MakeConnectionProcessor::replyToPoll");
    TransportOutDescription transportOut = pollMessage.getMessageContext().getTransportOut();
    if (transportOut==null) {
      String message = SandeshaMessageHelper.getMessage(
          SandeshaMessageKeys.cantSendMakeConnectionNoTransportOut);
      if(log.isDebugEnabled()) log.debug(message);
      throw new SandeshaException (message);
    }
     
    String messageStorageKey = matchingMessage.getMessageContextRefKey();
    MessageContext returnMessage = storageManager.retrieveMessageContext(messageStorageKey,pollMessage.getConfigurationContext());
    if (returnMessage==null) {
      String message = "Cannot find the message stored with the key:" + messageStorageKey;
      if(log.isDebugEnabled()) log.debug(message);
      // Someone else has either removed the sender & message, or another make connection got here first.
      return;
    }
   
    if(pending) addMessagePendingHeader(returnMessage, makeConnectionNamespace);
    boolean continueSending = true;
    RMMsgContext returnRMMsg = MsgInitializer.initializeMessage(returnMessage);
    if(returnRMMsg.getRMNamespaceValue()==null){
      //this is the case when a stored application response msg was not sucecsfully returned
      //on the sending transport's backchannel. Since the msg was stored without a sequence header
      //we need to lookup the namespace using the RMS bean
      if(log.isDebugEnabled()) log.debug("Looking up rmNamespace from RMS bean");
      String sequenceID = matchingMessage.getSequenceID();
      if(sequenceID!=null){
        RMSBean rmsBean = new RMSBean();
        rmsBean.setSequenceID(sequenceID);
        rmsBean = storageManager.getRMSBeanMgr().findUnique(rmsBean);
        if(rmsBean!=null){
          returnRMMsg.setRMNamespaceValue(SpecSpecificConstants.getRMNamespaceValue(rmsBean.getRMVersion()));
        }
        else{
          //we will never be able to reply to this msg - at the moment the best bet is
          //to not process the reply anymore
          if(log.isDebugEnabled()) log.debug("Could not find RMS bean for polled msg");
          continueSending = false;
          //also remove the sender bean so that we do not select this again
          storageManager.getSenderBeanMgr().delete(matchingMessage.getMessageID());
        }
      }
    }
    if(continueSending){
      setTransportProperties (returnMessage, pollMessage);
     
      // Link the response to the request

      AxisOperation operation = SpecSpecificConstants.getWSRMOperation(Sandesha2Constants.MessageTypes.POLL_RESPONSE_MESSAGE, pollMessage.getRMSpecVersion(), pollMessage.getMessageContext().getAxisService());
      OperationContext context = new OperationContext (operation, pollMessage.getMessageContext().getServiceContext());
     
      context.addMessageContext(returnMessage);
      returnMessage.setServiceContext(null);
      returnMessage.setOperationContext(context);
     
      returnMessage.setProperty(Sandesha2Constants.MAKE_CONNECTION_RESPONSE, Boolean.TRUE);
      returnMessage.setProperty(RequestResponseTransport.TRANSPORT_CONTROL, pollMessage.getProperty(RequestResponseTransport.TRANSPORT_CONTROL));
     
      // Commit the current transaction, so that the SenderWorker can do it's own locking
      if(transaction != null && transaction.isActive()) transaction.commit();
     
      //running the MakeConnection through a SenderWorker.
      //This will allow Sandesha2 to consider both of following senarios equally.
      //  1. A message being sent by the Sender thread.
      //  2. A message being sent as a reply to an MakeConnection.
      SenderWorker worker = new SenderWorker (pollMessage.getConfigurationContext(), matchingMessage, pollMessage.getRMSpecVersion());
      worker.setMessage(returnRMMsg);
      worker.run();     
     
      TransportUtils.setResponseWritten(pollMessage.getMessageContext(), true);
    }
   
    if(log.isDebugEnabled()) log.debug("Exit: MakeConnectionProcessor::replyToPoll");
  }
 
  private static void addMessagePendingHeader (MessageContext returnMessage, String namespace) throws SandeshaException {
    MessagePending messagePending = new MessagePending(namespace);
    messagePending.setPending(true);
    messagePending.toSOAPEnvelope(returnMessage.getEnvelope());
  }

  public boolean processOutMessage(RMMsgContext rmMsgCtx) {
    return false;
  }

  private static void setTransportProperties (MessageContext returnMessage, RMMsgContext makeConnectionMessage) {
        returnMessage.setProperty(MessageContext.TRANSPORT_OUT,makeConnectionMessage.getProperty(MessageContext.TRANSPORT_OUT));
        returnMessage.setProperty(Constants.OUT_TRANSPORT_INFO,makeConnectionMessage.getProperty(Constants.OUT_TRANSPORT_INFO));
       
        Object contentType = makeConnectionMessage.getProperty(Constants.Configuration.CONTENT_TYPE);
        returnMessage.setProperty(Constants.Configuration.CONTENT_TYPE, contentType);

        returnMessage.setTransportOut(makeConnectionMessage.getMessageContext().getTransportOut());
  }
}
TOP

Related Classes of org.apache.sandesha2.msgprocessors.MakeConnectionProcessor

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.