Package com.sun.enterprise.ee.web.sessmgmt

Source Code of com.sun.enterprise.ee.web.sessmgmt.MessageProcessor$ReceivedMessageWrapper

/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License").  You
* may not use this file except in compliance with the License. You can obtain
* a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
* or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
* Sun designates this particular file as subject to the "Classpath" exception
* as provided by Sun in the GPL Version 2 section of the License file that
* accompanied this code.  If applicable, add the following below the License
* Header, with the fields enclosed by brackets [] replaced by your own
* identifying information: "Portions Copyrighted [year]
* [name of copyright owner]"
*
* Contributor(s):
*
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license."  If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above.  However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/

package com.sun.enterprise.ee.web.sessmgmt;

import net.jxta.endpoint.Message;
import net.jxta.endpoint.MessageElement;
import net.jxta.endpoint.StringMessageElement;
import net.jxta.util.JxtaBiDiPipe;
import net.jxta.pipe.PipeMsgEvent;
import net.jxta.protocol.RouteAdvertisement;
import net.jxta.document.XMLDocument;
import net.jxta.document.StructuredDocumentFactory;
import net.jxta.document.AdvertisementFactory;

import java.util.logging.Level;
import java.util.logging.Logger;
import java.util.List;
import java.security.AccessController;
import java.io.IOException;

import com.sun.logging.LogDomains;
import com.sun.enterprise.web.ServerConfigLookup;
import org.apache.catalina.Globals;

/**
* @author Mahesh Kannan
*/
public class MessageProcessor {
    private final static Logger _logger
            = LogDomains.getLogger(LogDomains.WEB_LOGGER);

    private static final Logger _pipelogger = ReplicationUtil.getPipeLogger();

    private final static String SenderMessage = "pipe_tutorial";


    private final static Level TRACE_LEVEL = Level.FINE;


    private static Boolean _replicationMeasurementEnabled = null;


    private static int _replicationMeasurementInterval = -1;

    private static Boolean _waitForFastAckConfigured;

    private static MessageProcessor _me = new MessageProcessor();

    public static MessageProcessor getInstance() {
        return _me;
    }

    public void pipeMsgEvent(PipeMsgEvent event, JxtaBiDiPipe thePipe) {
        if (_logger.isLoggable(Level.FINE)) {
            _logger.fine("PipeWrapper>>pipeMsgEvent");
        }
        Message msg = null;
        try {
            // grab the message from the event
            msg = event.getMessage();
            if (_logger.isLoggable(Level.FINE)) {
                _logger.fine("PipeWrapper>>pipeMsgEvent:msg=" + msg);
            }
            if (msg == null) {
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.fine("Received an empty message, returning");
                }
                return;
            }

            // get the message element named SenderMessage

            MessageElement instanceNameMsgElement =
                    msg.getMessageElement(ReplicationState.InstanceNameMessage, ReplicationState.InstanceNameMessage);
            //ignore broadcasts from yourself
            String returnInstance = null;
            if (instanceNameMsgElement != null) {
                returnInstance = instanceNameMsgElement.toString();
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.fine("InputPipeWrapper:incoming propagated msg from: " + returnInstance);
                }
                if (ReplicationUtil.getInstanceName().equalsIgnoreCase(returnInstance)) {
                    return;
                }
            }

            MessageElement readyMsgElement = msg.getMessageElement(ReplicationState.ReadyMessage, ReplicationState.ReadyMessage);
            if (readyMsgElement != null) {
                try {
                    Thread.currentThread().sleep(2000L);
                } catch (InterruptedException ex) {
                }
                if (_pipelogger.isLoggable(Level.FINE)) {
                    _pipelogger.fine("readyMsgElement=" + readyMsgElement.toString() + " from: " + returnInstance);
                }
                JoinNotificationEventHandler.checkAndDoJoinFor(returnInstance);
                //TODO Can we return??
            }


            MessageElement bulkMsgElement = msg.getMessageElement(ReplicationState.BULK_MESSAGE_MODE, ReplicationState.BULK_MESSAGE_MODE);
            if (bulkMsgElement != null) {
                processBulkMessage(msg, bulkMsgElement, thePipe);
                return;
            }


            MessageElement idMsgElement = msg.getMessageElement(ReplicationState.MESSAGE_ID, ReplicationState.MESSAGE_ID);
            if (idMsgElement != null) {
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.fine("idMsgElement=" + idMsgElement.toString());
                }
            }
            MessageElement commandMsgElement =
                    msg.getMessageElement(ReplicationState.MESSAGE_COMMAND, ReplicationState.MESSAGE_COMMAND);
            if (commandMsgElement != null) {
                String theCommand = commandMsgElement.toString();
                if (isBroadcastMethod(theCommand)) {
                    //processQueryMessage(msg, idMsgElement, returnInstance);
                    ReplicationMessageRouter receiver = getRouter();
                    ReplicationState state = createReplicationState(msg);
                    processQueryState(state, receiver);
                }
            }


        } catch (Exception e) {
            // FIXME evaluate log level
            if (_logger.isLoggable(Level.FINE)) {
                _logger.log(Level.FINE,
                        "Exception occurred processing pipeMsgEvent msg=" + msg,
                        e);
            }
            return;
        }
    }

    //This method is assumed to be called only for broadcast method
    private ReplicationState createReplicationState(Message msg) {
        MessageElement bidiMsgElement =
                msg.getMessageElement(ReplicationState.MESSAGE_BIDI_STYLE,
                        ReplicationState.MESSAGE_BIDI_STYLE);
        boolean isBiDiStyle = (bidiMsgElement == null)
            ? false : Boolean.valueOf(bidiMsgElement.toString());
       
        ReplicationState result
                = isBiDiStyle ? ReplicationState.createReplicationState(msg)
                : ReplicationState.createBroadcastReplicationState(msg);
        result.setBiDiStyle(isBiDiStyle);
        //adding route advertisment to this input query state
        RouteAdvertisement routeAdv = getRouteAdvertisement(msg);
        result.setRouteAdvertisement(routeAdv);
        return result;
    }

    private void processQueryState(ReplicationState state, ReplicationMessageRouter receiver) {
        if (state.isResponseState()) {
            if (_logger.isLoggable(Level.FINE)) {
                _logger.fine("InputPipeWrapper:response command = " + state.getCommand());
            }
            processQueryResponse(state, receiver);
        } else {
            processQueryNonResponse(state, receiver);
        }
    }

    private void processQueryResponse(ReplicationState state, ReplicationMessageRouter receiver) {
        if (state.getCommand() != null && state.getCommand().equalsIgnoreCase("unicastResponse")) {
            receiver.processUnicastQueryResponse(state);
        } else {
            //this is a load response which gets processed quickly
            receiver.processQueryResponse(state);
        }
    }

    private void processQueryNonResponse(ReplicationState state, ReplicationMessageRouter receiver) {
        //not a response - only use separate thread for expensive commands
        if (isExpensiveMethod(state.getCommand())) {
            Runnable r = new ReceivedMessageWrapper(receiver, state);
            JxtaReplicationSender.executeTask(r);
        } else {
            receiver.processQueryMessage(state, state.getInstanceName());
        }
    }

    private boolean isBroadcastMethod(String theCommand) {
        ReplicationMessageRouter receiver = getRouter();
        return receiver.isBroadcastMethod(theCommand);
    }

    private boolean isExpensiveMethod(String theCommand) {
        ReplicationMessageRouter receiver = getRouter();
        return receiver.isExpensiveMethod(theCommand);
    }

    private RouteAdvertisement getRouteAdvertisement(Message msg) {
        RouteAdvertisement routeAdv = null;
        MessageElement routeElement = msg.getMessageElement(ReplicationState.NAMESPACE, ReplicationState.ROUTEADV);
        if (routeElement != null) {
            try {
                XMLDocument asDoc = (XMLDocument) StructuredDocumentFactory.newStructuredDocument(
                        routeElement.getMimeType(), routeElement.getStream());
                routeAdv = (RouteAdvertisement)
                        AdvertisementFactory.newAdvertisement(asDoc);
            } catch (IOException io) {
                io.printStackTrace();
            }
        }
        return routeAdv;
    }

    protected void processStartupMessage(Message msg, MessageElement msgElement) {
        // Get message
        if (msgElement.toString() == null) {
            if (_logger.isLoggable(Level.FINE)) {
                _logger.fine("null msg received");
            }
        } else {
            if (_logger.isLoggable(Level.FINE)) {
                _logger.fine("Message  :" + msgElement.toString());
            }
            //put comment start back here
            //send back response if it isn't already a response
            if (!(msgElement.toString()).startsWith("RETURN_MSG_COMMAND")) {
                Message returnMsg = new Message();
                String returnData = "ReturnMessage  :" + msgElement.toString();
                returnMsg.addMessageElement(SenderMessage,
                        new StringMessageElement(SenderMessage,
                                returnData,
                                null));
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.fine("SendingResponse :" + returnData);
                }
            }
        }
    }

    protected void processIdMessage(Message msg, MessageElement msgElement) {

    }

    private void processBulkMessage(Message msg, MessageElement idMsgElement,
                                           JxtaBiDiPipe ackPipe) {
        if (_logger.isLoggable(Level.FINE)) {
            _logger.log(Level.FINE, "[MessageProcessor]:  processBulkMessage...");
        }
        //_logger.log(Level.INFO, ">>PipeWrapper:  processBulkMessage...");
        printMessageReceiptStats(msg);
        ReplicationMessageRouter receiver = getRouter();
        //check if this is a return bulk message and act accordingly
        //System.out.println("<<processBulkMessage:isVoidReturnMessage=" + ReplicationState.isVoidMethodReturnMessage(msg));
        //System.out.println("<<processBulkMessage:isAckRequiredForMessage=" + ReplicationState.isAckRequiredForMessage(msg));
        //System.out.println("<<processBulkMessage:isResponseMessage=" + ReplicationState.isResponseMessage(msg));

        if (!ReplicationState.isAckRequiredForMessage(msg)
                && !ReplicationState.isVoidMethodReturnMessage(msg)
                && ReplicationState.isResponseMessage(msg)) {
            List<String> ackIds = ReplicationState.extractAckIdsListFromMessage(msg);
            //iterate and deliver acks to each id in ackIds list and return
            processAcks(ackIds, receiver);
            return;
        }

        //send ack if required
        List<ReplicationState> states = ReplicationState.extractBulkReplicationStatesFromMessage(msg);
        if (!isWaitForFastAckConfigured()) {
            this.checkSendImmediateBulkAck(msg, states, ackPipe);
        }
        //_logger.log(Level.INFO, ">>PipeWrapper:  states size = " + states.size());
        for (ReplicationState state : states) {
            if (_logger.isLoggable(TRACE_LEVEL)) {
                _logger.log(TRACE_LEVEL, "<<PipeWrapper:  receiving id: " + state.getId() + "[ver:" + state.getVersion() + "]");
            }

            if (state.isBiDiStyle()) {
                receiver.processMessage(state);
            } else {
                processQueryState(state, receiver);
            }
        }
        if (_logger.isLoggable(Level.FINE)) {
            _logger.log(Level.FINE, "<<PipeWrapper:  processBulkMessage complete...");
        }
    }

    public boolean isWaitForFastAckConfigured() {
        if (_waitForFastAckConfigured == null) {
            ServerConfigLookup lookup = new ServerConfigLookup();
            boolean waitForFastAckProp = lookup.getWaitForFastAckPropertyFromConfig();
            _waitForFastAckConfigured = new Boolean(waitForFastAckProp);
            //System.out.println("isWaitForFastAckConfigured = " + _waitForFastAckConfigured.booleanValue());
        }
        return _waitForFastAckConfigured.booleanValue();
    }

    private void checkSendImmediateBulkAck(Message msg, List<ReplicationState> states,
                                           JxtaBiDiPipe ackPipe) {
        if (_logger.isLoggable(Level.FINE)) {
            _logger.fine("<<checkSendImmediateBulkAck:isVoidReturnMessage=" + ReplicationState.isVoidMethodReturnMessage(msg));
            _logger.fine("<<checkSendImmediateBulkAck:isAckRequiredForMessage=" + ReplicationState.isAckRequiredForMessage(msg));
        }
        if (ReplicationState.isVoidMethodReturnMessage(msg)
                && ReplicationState.isAckRequiredForMessage(msg)) {
            //FIXME send immediate ack
            Message ackMsg = ReplicationState.createBulkAckMessageFrom(msg, states);

            MessageElement origMsgElement = msg.getMessageElement(ReplicationState.ORIGINATING_INSTANCE_NAME,
                    ReplicationState.ORIGINATING_INSTANCE_NAME);
            MessageElement idMsgElement = msg.getMessageElement(ReplicationState.MESSAGE_ID,
                    ReplicationState.MESSAGE_ID);
            MessageElement commandMsgElement = msg.getMessageElement(ReplicationState.MESSAGE_COMMAND,
                    ReplicationState.MESSAGE_COMMAND);
            if (origMsgElement != null && idMsgElement != null && commandMsgElement != null) {
                String originatingInstanceName = origMsgElement.toString();
                JxtaUnicastPipeUtil.sendOverUnicastPipe(null, originatingInstanceName, idMsgElement.toString(),
                            commandMsgElement.toString(), ackMsg);
            }
        }
    }

    private void processAcks(List<String> acksList, ReplicationMessageRouter receiver) {
        //_logger.log(Level.INFO, ">>PipeWrapper:processAcks:  acksList size = " + acksList.size());
        for (int i = 0; i < acksList.size(); i++) {
            //_logger.log(Level.INFO,"processAcks:nextIdToAck:" + acksList.get(i));
            ReplicationState nextState = new ReplicationState(acksList.get(i));
            receiver.processResponse(nextState);
        }
    }

    private void printMessageReceiptStats(Message receivedMessage) {
        if (!this.getReplicationMeasurementEnabled()) {
            return;
        }
        int measurementInterval = this.getReplicationMeasurementInterval();
        long id = -1L;
        MessageElement idMsgElement =
                receivedMessage.getMessageElement(ReplicationState.MESSAGE_ID, ReplicationState.MESSAGE_ID);
        if (idMsgElement != null) {
            id = (Long.decode(idMsgElement.toString())).longValue();
            //System.out.println("messageReceived: bulkId = " + id);
        }
        if (id % measurementInterval != 0) {
            return;
        }
        //get send start time for measurements
        long sendStartTime = -1L;
        MessageElement originatingMsgElement =
                receivedMessage.getMessageElement(ReplicationState.ORIGINATING_INSTANCE_NAME,
                        ReplicationState.ORIGINATING_INSTANCE_NAME);
        MessageElement sendStartMsgElement =
                receivedMessage.getMessageElement(ReplicationState.MESSAGE_SEND_START_TIME, ReplicationState.MESSAGE_SEND_START_TIME);
        if (sendStartMsgElement != null) {
            sendStartTime =
                    (Long.decode(sendStartMsgElement.toString())).longValue();
            if (sendStartTime > 0L) {
                //System.out.println("message receipt time: " + (System.currentTimeMillis() - sendStartTime));
            }
        }
        _logger.log(Level.INFO, "messageReceiptSucceeded: bulkId = " + id + " receiptTime = "
                + (System.currentTimeMillis() - sendStartTime) + " from partner: " +
                originatingMsgElement != null ? originatingMsgElement.toString() : "?");
    }

    private boolean getReplicationMeasurementEnabled() {
        if (_replicationMeasurementEnabled == null) {
            ServerConfigLookup lookup = new ServerConfigLookup();
            _replicationMeasurementEnabled
                    = new Boolean(lookup.getReplicationMeasurementEnabledFromConfig());
        }
        return _replicationMeasurementEnabled.booleanValue();
    }

    private int getReplicationMeasurementInterval() {
        if (_replicationMeasurementInterval == -1) {
            ServerConfigLookup lookup = new ServerConfigLookup();
            _replicationMeasurementInterval
                    = lookup.getReplicationMeasurementIntervalFromConfig();
        }
        return _replicationMeasurementInterval;
    }

    private ReplicationMessageRouter getRouter() {
        ReplicationMessageRouter receiver = null;
        if (Globals.IS_SECURITY_ENABLED) {
            receiver = (ReplicationMessageRouter)
                    AccessController.doPrivileged(
                            new PrivilegedGetReplicationMessageRouter());
        } else {
            receiver = ReplicationMessageRouter.createInstance();
        }
        return receiver;
    }

    class ReceivedMessageWrapper
            implements Runnable {

        ReplicationMessageRouter receiver;
        ReplicationState state;

        ReceivedMessageWrapper(ReplicationMessageRouter receiver, ReplicationState state) {
            this.receiver = receiver;
            this.state = state;
        }

        public void run() {
            try {
                //handleReceivedMessage(msg);
                receiver.processQueryMessage(state, state.getInstanceName());
            } catch (Exception ex) {
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.log(Level.FINE, "caught exception", ex);
                }
            }
        }
    }

}
TOP

Related Classes of com.sun.enterprise.ee.web.sessmgmt.MessageProcessor$ReceivedMessageWrapper

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.