Package org.apache.sandesha2.workers

Source Code of org.apache.sandesha2.workers.Sender

/*
* Copyright 1999-2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.sandesha2.workers;

import java.util.Collection;
import java.util.Iterator;
import org.apache.axis2.AxisFault;
import org.apache.axis2.context.ConfigurationContext;
import org.apache.axis2.context.MessageContext;
import org.apache.axis2.context.ServiceContext;
import org.apache.axis2.engine.AxisEngine;
import org.apache.axis2.soap.SOAPEnvelope;
import org.apache.sandesha2.AcknowledgementManager;
import org.apache.sandesha2.Sandesha2ClientAPI;
import org.apache.sandesha2.Sandesha2Constants;
import org.apache.sandesha2.RMMsgContext;
import org.apache.sandesha2.SandeshaException;
import org.apache.sandesha2.TerminateManager;
import org.apache.sandesha2.storage.StorageManager;
import org.apache.sandesha2.storage.beanmanagers.SenderBeanMgr;
import org.apache.sandesha2.storage.beans.SenderBean;
import org.apache.sandesha2.util.MessageRetransmissionAdjuster;
import org.apache.sandesha2.util.MsgInitializer;
import org.apache.sandesha2.util.SandeshaUtil;
import org.apache.sandesha2.wsrm.TerminateSequence;

/**
* This is responsible for sending and re-sending messages of Sandesha2. This represent a thread that keep running all
* the time. This keep looking at the Sender table to find out any entries that should be sent.
*
* @author Chamikara Jayalath <chamikaramj@gmail.com>
*/

public class Sender extends Thread {

  private boolean senderStarted = false;

  private ConfigurationContext context = null;

  public synchronized void stopSender() {
    senderStarted = false;
  }

  public synchronized boolean isSenderStarted() {
    return senderStarted;
  }

  public void run() {

    while (senderStarted) {
      try {
        if (context == null)
          throw new SandeshaException(
              "Can't continue the Sender. Context is null");

        StorageManager storageManager = SandeshaUtil
            .getSandeshaStorageManager(context);

        SenderBeanMgr mgr = storageManager
            .getRetransmitterBeanMgr();
        Collection coll = mgr.findMsgsToSend();
        Iterator iter = coll.iterator();

        while (iter.hasNext()) {

          SenderBean bean = (SenderBean) iter.next();
          String key = (String) bean.getKey();
          MessageContext msgCtx = SandeshaUtil
              .getStoredMessageContext(key);

          try {
            RMMsgContext rmMsgCtx = MsgInitializer
                .initializeMessage(msgCtx);

            updateMessage(msgCtx);

            ServiceContext serviceContext = msgCtx
                .getServiceContext();
            Object debug = null;
            if (serviceContext != null) {
              debug = msgCtx
                  .getProperty(Sandesha2ClientAPI.SANDESHA_DEBUG_MODE);
              if (debug != null && "on".equals(debug)) {
                System.out
                    .println("DEBUG: Sender is sending a '"
                        + SandeshaUtil
                            .getMessageTypeString(rmMsgCtx
                                .getMessageType())
                        + "' message.");
              }
            }

            if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
              //piggybacking if an ack if available for the same
              // sequence.
              AcknowledgementManager
                  .piggybackAckIfPresent(rmMsgCtx);
            }

            try {
              new AxisEngine(context).send(msgCtx);
            } catch (Exception e) {
              //Exception is sending. retry later
              System.out
                  .println("Exception thrown in sending...");
              e.printStackTrace();
            }

            MessageRetransmissionAdjuster retransmitterAdjuster = new MessageRetransmissionAdjuster();
            retransmitterAdjuster.adjustRetransmittion(bean);

            mgr.update(bean);

            if (!msgCtx.isServerSide())
              checkForSyncResponses(msgCtx);
           
           
            if (rmMsgCtx.getMessageType()==Sandesha2Constants.MessageTypes.TERMINATE_SEQ) {
              //terminate sending side.
              TerminateSequence terminateSequence = (TerminateSequence) rmMsgCtx.getMessagePart(Sandesha2Constants.MessageParts.TERMINATE_SEQ);
              String sequenceID = terminateSequence.getIdentifier().getIdentifier();
              ConfigurationContext configContext = msgCtx.getConfigurationContext();
             
              TerminateManager.terminateSendingSide(configContext,sequenceID);
            }

          } catch (AxisFault e1) {
            e1.printStackTrace();
          } catch (Exception e3) {
            e3.printStackTrace();
          }

          //changing the values of the sent bean.
          //bean.setLastSentTime(System.currentTimeMillis());
          //bean.setSentCount(bean.getSentCount() + 1);

          //update if resend=true otherwise delete. (reSend=false
          // means
          // send only once).
          if (bean.isReSend())
            mgr.update(bean);
          else
            mgr.delete(bean.getMessageId());

        }
      } catch (SandeshaException e) {
        e.printStackTrace();
        return;
      }

      try {
        Thread.sleep(2000);
      } catch (InterruptedException e1) {
        //e1.printStackTrace();
        System.out.println("Sender was interupted...");
        e1.printStackTrace();
        System.out.println("End printing Interrupt...");
      }
    }

  }

  private boolean isResponseExpected(RMMsgContext rmMsgCtx) {
    boolean responseExpected = false;

    if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.CREATE_SEQ) {
      responseExpected = true;
    }
    if (rmMsgCtx.getMessageType() == Sandesha2Constants.MessageTypes.APPLICATION) {
      //a ack may arrive. (not a application response)
      if (rmMsgCtx.getMessageContext().getAxisOperation()
          .getMessageExchangePattern().equals(
              org.apache.wsdl.WSDLConstants.MEP_URI_IN_OUT)) {
        responseExpected = true;
      }
    }

    return true;
  }

  public void start(ConfigurationContext context) {
    senderStarted = true;
    this.context = context;
    super.start();
  }

  private void updateMessage(MessageContext msgCtx1) throws SandeshaException {
    try {
      RMMsgContext rmMsgCtx1 = MsgInitializer.initializeMessage(msgCtx1);
      rmMsgCtx1.addSOAPEnvelope();

    } catch (AxisFault e) {
      throw new SandeshaException("Exception in updating contexts");
    }

  }

  private void checkForSyncResponses(MessageContext msgCtx) throws AxisFault {

    boolean responsePresent = (msgCtx
        .getProperty(MessageContext.TRANSPORT_IN) != null);

    if (responsePresent) {
      //create the response
      MessageContext response = new MessageContext(msgCtx
          .getConfigurationContext(), msgCtx.getSessionContext(), msgCtx
          .getTransportIn(), msgCtx.getTransportOut());
      response.setProperty(MessageContext.TRANSPORT_IN, msgCtx
          .getProperty(MessageContext.TRANSPORT_IN));

      response.setServerSide(false);

      //If request is REST we assume the response is REST, so set the
      // variable
      response.setDoingREST(msgCtx.isDoingREST());
      response
          .setServiceGroupContextId(msgCtx.getServiceGroupContextId());
      response.setServiceGroupContext(msgCtx.getServiceGroupContext());
      response.setServiceContext(msgCtx.getServiceContext());
      response.setAxisService(msgCtx.getAxisService());
      response.setAxisServiceGroup(msgCtx.getAxisServiceGroup());

      //setting the in-flow.
      //ArrayList inPhaseHandlers =
      // response.getAxisOperation().getRemainingPhasesInFlow();
      /*
       * if (inPhaseHandlers==null || inPhaseHandlers.isEmpty()) {
       * ArrayList phases =
       * msgCtx.getSystemContext().getAxisConfiguration().getInPhasesUptoAndIncludingPostDispatch();
       * response.getAxisOperation().setRemainingPhasesInFlow(phases); }
       */

      //Changed following from TransportUtils to SandeshaUtil since op.
      // context is anavailable.
      SOAPEnvelope resenvelope = null;
      try {
        resenvelope = SandeshaUtil.createSOAPMessage(response, msgCtx
            .getEnvelope().getNamespace().getName());
      } catch (AxisFault e) {
        //TODO: change to log.debug
        e.printStackTrace();
      }

      if (resenvelope != null) {
        AxisEngine engine = new AxisEngine(msgCtx.getConfigurationContext());
        response.setEnvelope(resenvelope);
        engine.receive(response);
      }
    }
  }

}
TOP

Related Classes of org.apache.sandesha2.workers.Sender

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.