Package com.sun.enterprise.ee.web.sessmgmt

Source Code of com.sun.enterprise.ee.web.sessmgmt.ReplicationMessageRouter

/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License").  You
* may not use this file except in compliance with the License. You can obtain
* a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
* or glassfish/bootstrap/legal/LICENSE.txt.  See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
* Sun designates this particular file as subject to the "Classpath" exception
* as provided by Sun in the GPL Version 2 section of the License file that
* accompanied this code.  If applicable, add the following below the License
* Header, with the fields enclosed by brackets [] replaced by your own
* identifying information: "Portions Copyrighted [year]
* [name of copyright owner]"
*
* Contributor(s):
*
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license."  If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above.  However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/
/*
* ReplicationMessageRouter.java
*
* Created on October 12, 2006, 12:03 PM
*
*/

package com.sun.enterprise.ee.web.sessmgmt;

import java.security.SecurityPermission;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.apache.catalina.Context;
import org.apache.catalina.Container;
import org.apache.catalina.Engine;
import org.apache.catalina.Manager;
import org.apache.catalina.Valve;
import org.apache.catalina.core.ContainerBase;

import com.sun.appserv.ha.util.RepairManager;
import com.sun.appserv.ha.util.RepairRegistry;

import com.sun.enterprise.ee.web.sessmgmt.RepairAgent;
import com.sun.enterprise.ee.web.sessmgmt.ReplicationState;

import com.sun.enterprise.web.EmbeddedWebContainer;

/**
*
* @author Larry White
*/
public class ReplicationMessageRouter implements RepairAgent, PurgeAgent {
   
    /**
     * Name of the logger used by this class; the value is taken from
     * {@link ReplicationState#LOGGER_MEM_REP}.
     */
    public final static String LOGGER_MEM_REP
        = ReplicationState.LOGGER_MEM_REP;   
   
    /**
     * The logger used for every message emitted by this class. It is looked
     * up by name using {@link #LOGGER_MEM_REP}.
     */
    // Earlier declaration, kept for reference:
    //protected static final Logger _logger
    //    = LogDomains.getLogger(LogDomains.WEB_LOGGER);
    private static final Logger _logger
        = Logger.getLogger(LOGGER_MEM_REP);   
       
    /**
     * Registry of applicationId vs. logical ReplicationManager:
     * for web: web appid vs. Manager
     * for sso: virtual server id vs. SingleSignOn valve
     * for ejb: containerid vs. container
     *
     * The map is static, so it is shared by every instance of this class.
     * It is declared as a raw type; the accessors in this class cast the
     * values to ReplicationManager when reading them.
     */
    protected static final ConcurrentHashMap _appId2Container
        = new ConcurrentHashMap();
   
    /**
     * The singleton instance of ReplicationMessageRouter.
     * It is assigned inside the createInstance methods while holding
     * _monitor. The field is not volatile.
     */   
    private static ReplicationMessageRouter _soleInstance = null;

    /**
     * The RepairRegistry instance whose RepairManagers are repaired by the
     * repairApps methods after the registered ReplicationManagers.
     */   
    private static final RepairRegistry _repairRegistry
        = RepairRegistry.getInstance();   

    /**
     * Permission checked by the createInstance methods when a
     * SecurityManager is installed.
     */
    private static final SecurityPermission REPLICATION_MESSAGE_ROUTER_PERMISSION =
        new SecurityPermission("getReplicationMessageRouter");   
   
    // Local aliases for the command names defined on ReplicationState.
    // They are used as the keys and values of broadcastMethodMap in the
    // static initializer.
    private final static String MESSAGE_BROADCAST_QUERY
        = ReplicationState.MESSAGE_BROADCAST_QUERY;
    private final static String RETURN_BROADCAST_MSG_COMMAND
        = ReplicationState.RETURN_BROADCAST_MSG_COMMAND;
    private final static String RETURN_UNICAST_MSG_COMMAND
        = ReplicationState.RETURN_UNICAST_MSG_COMMAND;
    private final static String MESSAGE_BROADCAST_LOAD_RECEIVED
        = ReplicationState.MESSAGE_BROADCAST_LOAD_RECEIVED;
    private final static String MESSAGE_BROADCAST_PURGE_ADVISORY
        = ReplicationState.MESSAGE_BROADCAST_PURGE_ADVISORY;
    private final static String MESSAGE_APPLICATION_STATUS_QUERY
        = ReplicationState.MESSAGE_APPLICATION_STATUS_QUERY;
    private final static String MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY
        = ReplicationState.MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY;
   
    // NOTE(review): not referenced in the part of the file reviewed here;
    // confirm it is still used before relying on it.
    private final static long DEFAULT_SLEEP_TIME = 15 * 1000L; //15 seconds
   
    // Lookup tables keyed by command name. Only broadcastMethodMap is
    // populated by the static initializer below; each command name maps to
    // itself, so the map is effectively used as a set of names.
    // NOTE(review): expensiveMethodMap, removeMethodMap and saveMethodSet are
    // never filled in the part of the file reviewed here - confirm their use.
    private static final HashMap broadcastMethodMap = new HashMap();
    private static final HashMap expensiveMethodMap = new HashMap();
    private static final HashMap removeMethodMap = new HashMap();
    private static final HashSet saveMethodSet = new HashSet();
    static{
        // register the broadcast-style command names
        broadcastMethodMap.put(MESSAGE_BROADCAST_QUERY, MESSAGE_BROADCAST_QUERY);
        broadcastMethodMap.put(RETURN_BROADCAST_MSG_COMMAND, RETURN_BROADCAST_MSG_COMMAND);
        broadcastMethodMap.put(RETURN_UNICAST_MSG_COMMAND, RETURN_UNICAST_MSG_COMMAND);
        broadcastMethodMap.put(MESSAGE_BROADCAST_LOAD_RECEIVED, MESSAGE_BROADCAST_LOAD_RECEIVED);
        broadcastMethodMap.put(MESSAGE_BROADCAST_PURGE_ADVISORY, MESSAGE_BROADCAST_PURGE_ADVISORY);
        broadcastMethodMap.put(MESSAGE_APPLICATION_STATUS_QUERY, MESSAGE_APPLICATION_STATUS_QUERY);
        broadcastMethodMap.put(MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY, MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY);
    }   
   
    /**
     * The embedded Catalina object. May be null (for example when the
     * no-argument constructor was used); eagerlyLoadRoutingMap does nothing
     * in that case.
     */
    protected EmbeddedWebContainer _embedded = null;    
   
    /**
     * Monitor object held by the createInstance methods while they create
     * or update the singleton instance.
     */   
    private static final Object _monitor = new Object();   
   
    /** Return the singleton instance
     *  assume it already exists
     */
    public static ReplicationMessageRouter createInstance() {
        if(_soleInstance != null) {
            return _soleInstance;
        }
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkPermission(REPLICATION_MESSAGE_ROUTER_PERMISSION);
        }       
        synchronized (_monitor) {
            if (_soleInstance == null) {
                _soleInstance = new ReplicationMessageRouter();
            }
        }       
        return _soleInstance;
    }   
   
    /** Return the singleton instance
     *  lazily creates a new instance of ReplicationMessageRouter if not created yet
     * @param embedded the embedded web container
     */
    public static ReplicationMessageRouter createInstance(EmbeddedWebContainer embedded) {
        SecurityManager sm = System.getSecurityManager();
        if (sm != null) {
            sm.checkPermission(REPLICATION_MESSAGE_ROUTER_PERMISSION);
        }       
        synchronized (_monitor) {
            if (_soleInstance == null) {
                _soleInstance = new ReplicationMessageRouter(embedded);
            } else {
                _soleInstance._embedded = embedded;
            }
        }
        return _soleInstance;
    }
   
    /** Creates a new instance of ReplicationMessageRouter
     * @param embedded the embedded web container
     */
    public ReplicationMessageRouter(EmbeddedWebContainer embedded) {
        _embedded = embedded;
    }
   
    /** Creates a new instance of ReplicationMessageRouter
     */
    public ReplicationMessageRouter() {
    }    
   
    public void addReplicationManager(String appid, ReplicationManager container) {
        _appId2Container.put(appid, container);
        // Fix for sailfin restart related issues eg., 1499
    }
   
    public ReplicationManager removeReplicationManager(String appid) {
        return (ReplicationManager) _appId2Container.remove(appid);       
    }
   
    private ReplicationManager getReplicationManager(String appid) {
        return (ReplicationManager) _appId2Container.get(appid);
    }
   
    /**
     * Returns all the ReplicationManagers available in this registry.
     *
     * @return   a collection of ReplicationManagers
     */
    private static Collection getReplicationManagers() {
        Collection replicationManagers = null;
        if (_appId2Container != null) {
            synchronized(_appId2Container) {
                replicationManagers = _appId2Container.values();
            }
        }
        return replicationManagers;
    }
   
    public ReplicationManager[] getReplicationManagerArray() {

        ReplicationManager results[] = null;
        synchronized (_appId2Container) {
            results = new ReplicationManager[_appId2Container.size()];
            results = (ReplicationManager[]) _appId2Container.values().toArray(results);
        }
        return (results);

    }
   
    public void checkAppDeployment(String applicationId) {
        long startTime = System.currentTimeMillis();
        ApplicationStatusChecker appStatusChecker
            = new ApplicationStatusChecker();
        long sleepTime = 100L;
        boolean shouldContinue = true;
        while(shouldContinue) {
            if((System.currentTimeMillis() - startTime) > 60000) {
                _logger.warning("checkAppDeployment: Timed out waiting for replicas application" + applicationId + " to finished deploying");
                shouldContinue = false;
                break;
            }
            boolean appStatus = appStatusChecker.isApplicationDeployed(applicationId, "WEB");
            if(_logger.isLoggable(Level.FINE)) {
                _logger.fine("app:" + applicationId + " isDeployed: " + appStatus);
            }
            if(appStatus) {
                shouldContinue = false;
            } else {
                try {
                    Thread.currentThread().sleep(sleepTime);
                } catch (InterruptedException ex) {
                    ;
                }
            }
            sleepTime *= 2;
        }
        if(_logger.isLoggable(Level.FINE)) {
            _logger.fine("checkAppDeployment for application " + applicationId + "took " + (System.currentTimeMillis() - startTime) + "ms");
        }
    }   
   
    /**
     * Returns all the applicationIds available in this registry.
     *
     * @return   a collection of applicationIds
     */
   public static Enumeration getReplicationAppIds(boolean printIt) {
        Enumeration applicationIds = null;
        if (_appId2Container != null) {
            synchronized(_appId2Container) {
                applicationIds = _appId2Container.keys();
            }
        }
        //for testing
        //if(printIt && _logger.isLoggable(Level.FINE)) {
        if(printIt) {
            int i=0;
            while(applicationIds.hasMoreElements()) {
                String nextAppId = (String)applicationIds.nextElement();
                _logger.fine("appid-key[" + i + "]= " + nextAppId +
                             " appid-key-length[" + i + "]= " + nextAppId.length());
                i++;
            }
        }
        //end for testing       
        return applicationIds;
    }
   
    public ReplicationManager findApp(String appName) /*throws IOException*/ {
//        if (_logger.isLoggable(Level.FINER)) {
//            _logger.finer("ReplicationMessageRouter>>findApp:appName = " + appName +
//                          " length = " + appName.length());
//        }
        if (appName == null)
            return (null);
        ReplicationManager mgr = getReplicationManager(appName);
//        if (_logger.isLoggable(Level.FINER)) {
//            _logger.finer("ReplicationMessageRouter>>findApp:mgr = " + mgr);
//        }
        return (mgr);
    }
   
    public void repairApps(long repairStartTime) {
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:repairApps");
        }       
        ReplicationManager[] apps = this.getReplicationManagerArray();
        for(int i=0; i<apps.length; i++) {
            if(ReplicationHealthChecker.isStopping()) {
                break;
            }
            ReplicationManager nextMgr = apps[i];
            nextMgr.repair(repairStartTime);
        }
       
        //now consult _repairRegistry
        RepairManager[] repairApps = _repairRegistry.getRepairManagerArray();
        for(int i=0; i<repairApps.length; i++) {
            if(ReplicationHealthChecker.isStopping()) {
                break;
            }
            RepairManager nextMgr = repairApps[i];
            nextMgr.repair(repairStartTime);
        }       
    }
   
    public void repairApps(long repairStartTime, boolean checkForStopping) {
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:repairApps: checkForStopping: " + checkForStopping);
        }
        if(!checkForStopping) {
            eagerlyLoadRoutingMap();
        }
        ReplicationManager[] apps = this.getReplicationManagerArray();
        //if (_logger.isLoggable(Level.FINEST)) {
        //    _logger.finest("ReplicationMessageRouter:apps length: " + apps.length);
        //}
        for(int i=0; i<apps.length; i++) {
            if(checkForStopping && ReplicationHealthChecker.isStopping()) {
                break;
            }
            ReplicationManager nextMgr = apps[i];
            //if (_logger.isLoggable(Level.FINEST)) {
            //    _logger.finest("ReplicationMessageRouter:repairApps: nextMgr: " + nextMgr);
            //}
            nextMgr.repair(repairStartTime, checkForStopping);
        }
       
        //now consult _repairRegistry
        RepairManager[] repairApps = _repairRegistry.getRepairManagerArray();
        for(int i=0; i<repairApps.length; i++) {
            if(checkForStopping && ReplicationHealthChecker.isStopping()) {
                break;
            }
            RepairManager nextMgr = repairApps[i];
            nextMgr.repair(repairStartTime);
        }       
    }
   
    public void respondToFailure(String failedInstanceName, boolean checkForStopping) {
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:respondToFailure: failedInstance: " + failedInstanceName + " checkForStopping: " + checkForStopping);
        }
        if(!checkForStopping) {
            eagerlyLoadRoutingMap();
        }       
        ReplicationManager[] apps = this.getReplicationManagerArray();
        for(int i=0; i<apps.length; i++) {
            if(checkForStopping && ReplicationHealthChecker.isStopping()) {
                break;
            }
            ReplicationManager nextMgr = apps[i];
            nextMgr.respondToFailure(failedInstanceName, checkForStopping);
        }
    }
   
    public void purge(String owningInstanceName, long purgeStartTime) {
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:purge");
        }       
        ReplicationManager[] apps = this.getReplicationManagerArray();
        for(int i=0; i<apps.length; i++) {
            if(ReplicationHealthChecker.isStopping()) {
                break;
            }
            ReplicationManager nextMgr = apps[i];
            if(nextMgr instanceof PurgeCapable) {
                ((PurgeCapable)nextMgr).purge(owningInstanceName, purgeStartTime);
            }
        }
    }
   
    public void handleDynamicOwnershipChanges(ReplicationManager mgr,
                                              DynamicOwnershipManager.Event event,
                                              String triggeringInstance,
                boolean isLbStateChange
                                              ) {
        if (mgr instanceof DynamicOwnershipManager) {
            DynamicOwnershipChangeHandler handler
                    = new DynamicOwnershipChangeHandler(
                    (DynamicOwnershipManager) mgr, event,
        triggeringInstance, isLbStateChange);
            Thread handleDynamicOwnershipChangeThread
                    = new Thread(handler);
            handleDynamicOwnershipChangeThread.setDaemon(true);
            handleDynamicOwnershipChangeThread.start();
        }
    }

    public void handleDynamicOwnershipChanges(DynamicOwnershipManager.Event event,
                                              String triggeringInstance,
                boolean isLbStateChange) {
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:handleDynamicOwnershipChanges");
        }
        List<String> lbEnabledList = ExpatListHandler.getLbEnabledList(
                triggeringInstance, event, isLbStateChange);
        // skip triggering expat calculation in the lb-disabled instance.
        if (lbEnabledList.contains(ReplicationUtil.getInstanceName())) {
            ReplicationManager[] apps = this.getReplicationManagerArray();
            for (int i = 0; i < apps.length; i++) {
                if (ReplicationHealthChecker.isStopping()) {
                    break;
                }
                ReplicationManager nextMgr = apps[i];
                handleDynamicOwnershipChanges(nextMgr, event,
                        triggeringInstance, isLbStateChange);
            }
        }
    }

    public void syncPointApps(long waitTime) throws RollingUpgradeException {
        //System.out.println("entering syncPointApps:waitTime = " + waitTime + " seconds");
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:syncPointApps");
        }
        RollingUpgradeContext ctx = new RollingUpgradeContext();
        ReplicationManager[] apps = this.getReplicationManagerArray();
        CountDownLatch doneSignal = new CountDownLatch(apps.length);
        for(int i=0; i<apps.length; i++) {
            /*
            if(ReplicationHealthChecker.isStopping()) {
                break;
            }
             */
            ReplicationManager nextMgr = apps[i];
            handleSyncPointApps(nextMgr, waitTime, doneSignal, ctx);
        }
        try {
            //wait time is passed into the command
            doneSignal.await(waitTime, TimeUnit.SECONDS);
            if(doneSignal.getCount() != 0) {
                throw new RollingUpgradeException(ctx.getErrors());
            }
        } catch(InterruptedException ex) {
            ;
        } finally {
            if(doneSignal.getCount() != 0) {
                _logger.log(Level.WARNING, ctx.getErrors());
            } else {
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.fine("ReplicationMessageRouter>>syncPointApps successful after a wait: wait time = " + (System.currentTimeMillis() - ctx.getStartTime()));
                }
            }
        }
    }

    public void handleSyncPointApps(ReplicationManager mgr, long waitTime, CountDownLatch doneSignal, RollingUpgradeContext ctx) {
        SyncPointSaveHandler handler
                = new SyncPointSaveHandler(mgr, waitTime, doneSignal, ctx);
        RollingUpgradeHandler.executeTask(handler);
    }
   
    public void reconcileNetworkPartitionedApps(long waitTime) {
       
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:reconcileNetworkPartitionedApps");
        }
        RollingUpgradeContext ctx = new RollingUpgradeContext();
        ReplicationManager[] apps = this.getReplicationManagerArray();
        CountDownLatch doneSignal = new CountDownLatch(apps.length);
        for(int i=0; i<apps.length; i++) {
            /*
            if(ReplicationHealthChecker.isStopping()) {
                break;
            }
             */
            ReplicationManager nextMgr = apps[i];
            nextMgr.doPostJoinReconciliation(waitTime, ctx);
        }       
    }

    public void restoreApps(long waitTime, String rollingUpgradeType) throws RollingUpgradeException {
        //System.out.println("entering restoreApps:waitTime = " + waitTime + " seconds");
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:restoreApps");
        }
        RollingUpgradeContext ctx = new RollingUpgradeContext();
        ReplicationManager[] apps = this.getReplicationManagerArray();
        CountDownLatch doneSignal = new CountDownLatch(apps.length);
        for(int i=0; i<apps.length; i++) {
            /*
            if(ReplicationHealthChecker.isStopping()) {
                break;
            }
             */
            ReplicationManager nextMgr = apps[i];
            handleSyncPointRestoreApps(nextMgr, rollingUpgradeType, waitTime, doneSignal, ctx);
        }
        try {
            //wait time is passed into the command
            doneSignal.await(waitTime, TimeUnit.SECONDS);
            if(doneSignal.getCount() != 0) {
                throw new RollingUpgradeException(ctx.getErrors());
            }
        } catch(InterruptedException ex) {
            ;
        } finally {
            _logger.info(ctx.getMessages());
            if(doneSignal.getCount() != 0) {
                _logger.log(Level.WARNING, ctx.getErrors());
            } else {
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.fine("ReplicationMessageRouter>>restoreApps successful after a wait: wait time = " + (System.currentTimeMillis() - ctx.getStartTime()));
                }
            }
        }
    }

    public void handleSyncPointRestoreApps(ReplicationManager mgr, String rollingUpgradeType, long waitTime, CountDownLatch doneSignal, RollingUpgradeContext ctx) {
        SyncPointLoadHandler handler
                = new SyncPointLoadHandler(mgr, rollingUpgradeType, waitTime, doneSignal, ctx);
        RollingUpgradeHandler.executeTask(handler);
    }

    public void reconcileApps(long waitTime) throws RollingUpgradeException {
        //System.out.println("entering reconcileApps:waitTime = " + waitTime + " seconds");
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:reconcileApps");
        }
        RollingUpgradeContext ctx = new RollingUpgradeContext();
        ReplicationManager[] apps = this.getReplicationManagerArray();
        CountDownLatch doneSignal = new CountDownLatch(apps.length);
        for(int i=0; i<apps.length; i++) {
            /*
            if(ReplicationHealthChecker.isStopping()) {
                break;
            }
             */
            ReplicationManager nextMgr = apps[i];
            handleSyncPointReconcileApps(nextMgr, waitTime, doneSignal, ctx);
        }
        try {
            //wait time is passed into the command
            doneSignal.await(waitTime, TimeUnit.SECONDS);
            if(doneSignal.getCount() != 0) {
                throw new RollingUpgradeException(ctx.getErrors());
            }
        } catch(InterruptedException ex) {
            ;
        } finally {
            _logger.info(ctx.getMessages());
            if(doneSignal.getCount() != 0) {
                _logger.log(Level.WARNING, ctx.getErrors());
            } else {
                if (_logger.isLoggable(Level.FINE)) {
                    _logger.fine("ReplicationMessageRouter>>reconcileApps successful after a wait: wait time = " + (System.currentTimeMillis() - ctx.getStartTime()));
                }
            }
        }
    }

    public void handleSyncPointReconcileApps(ReplicationManager mgr, long waitTime, CountDownLatch doneSignal, RollingUpgradeContext ctx) {
        SyncPointReconciliationHandler handler
                = new SyncPointReconciliationHandler(mgr, waitTime, doneSignal, ctx);
        RollingUpgradeHandler.executeTask(handler);
    }
   
    public void processMessage(ReplicationState state) {
        if (_logger.isLoggable(Level.FINE)) {
            _logger.fine("ReplicationMessageRouter:processMessage:state=" + state);
        }       
        //send ack if not a return msg and is a void return
        if(!state.isReturnMessage() && state.isVoidMethodReturnState()) {
            //FIXME: can send acknowledgement back immediately
        }
        //this is used only for health check message
        if(state.isHCReturnMessage()) {           
            //bypass normal routing in this case that is
            //why we return here
            processResponse(state);
            return;
        }
        //otherwise do normal routing to app
        this.routeMessageForApp(state.getAppId(), state);
    }
   
    public void processResponse(ReplicationState message) {
        //complete processing response - not sending response to a response
        if(_logger.isLoggable(Level.FINE)) {
            _logger.fine("IN" + this.getClass().getName() + ">>processResponse");           
        }       
        ReplicationResponseRepository.putEntry(message);
    }
   
    public void processQueryResponse(ReplicationState message) {
        //complete processing response - not sending response to a response
        if(_logger.isLoggable(Level.FINE)) {
            _logger.fine("IN" + this.getClass().getName() + ">>processResponse");           
        }       
        ReplicationResponseRepository.putFederatedEntry(message);
    }

    public ReplicationState processUnicastQueryResponse(ReplicationState queryResponseState) {
        //complete processing response - not sending response to a response
        if(_logger.isLoggable(Level.FINE)) {
            _logger.fine("IN" + this.getClass().getName() + ">>processUnicastQueryResponse");
        }
        String originalCommand = queryResponseState.getExtraParam();
        if(_logger.isLoggable(Level.FINE)) {
            _logger.fine("processUnicastQueryResponse:originalCommand = "
                + originalCommand);
        }
        ReplicationResponseRepository.putEntry(queryResponseState, originalCommand);
        return queryResponseState;
    }
   
    public void routeMessageForApp(String appName, ReplicationState message) {
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:routeMessageForApp" + appName);
        }       

        ReplicationManager mgr = null;
        if((mgr = this.findApp(appName)) != null) {
            mgr.processMessage(message);
            return;
        }
    }
   
    public void eagerlyLoadRoutingMap() {
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter>>eagerlyLoadRoutingMap");
        }       
        ReplicationManager mgr = null;

        boolean continueProcessing = true;
        if(_embedded == null) {
            return;
        }
        try {
            Engine[] engines = _embedded.getEngines();
            for(int h=0; h<engines.length; h++) {
                Container engine = (Container) engines[h];
                Container[] hosts = engine.findChildren();
                for(int i=0; i<hosts.length; i++) {
                    Container nextHost = hosts[i];
                    Container [] webModules = nextHost.findChildren();
                    for (int j=0; j<webModules.length; j++) {
                        Container nextWebModule = webModules[j];
                        Context ctx = (Context)nextWebModule;
                        //this code gets managers
                        Manager nextManager = nextWebModule.getManager();                      
                        if(nextManager instanceof ReplicationManager) {
                            //let this manager process the message
                            //if it is a ReplicationManager
                            //and app names match
                            String nextAppName = ((ReplicationManagerBase)nextManager).getApplicationId();
                            if (_logger.isLoggable(Level.FINEST)) {
                                _logger.finest("ReplicationMessageRouter:eagerlyLoadRoutingMap:nextAppName = " + nextAppName);
                            }                           
                            if (_logger.isLoggable(Level.FINEST)) {
                                _logger.finest("ReplicationMessageRouter:eagerlyLoadRoutingMap:found our manager:" + nextManager.getClass().getName());
                            }
                            this.addReplicationManager(nextAppName, (ReplicationManager)nextManager);

                        }
                    }
                    if(!continueProcessing) {
                        break;
                    }
                    //now get ReplicationManager valves installed in virtual hosts
                    Valve[] valves = ((ContainerBase)nextHost).getValves();                  
                    for(int k=0; k<valves.length; k++) {
                        Valve nextValve = valves[k];
                        if (_logger.isLoggable(Level.FINEST)) {
                            _logger.finest("ReplicationMessageRouter:eagerlyLoadRoutingMap:VALVE = " + nextValve.getClass().getName());
                        }                                                                 
                        if(nextValve instanceof ReplicationManager) {
                            //let this manager process the message
                            //if it is a ReplicationManager
                            //and app names match
                            String nextAppName = ((ReplicationManager)nextValve).getApplicationId();                           
                            if (_logger.isLoggable(Level.FINEST)) {
                                _logger.finest("ReplicationMessageRouter:eagerlyLoadRoutingMap:found our manager:" + nextValve.getClass().getName());
                            }                                                                
                            this.addReplicationManager(nextAppName, (ReplicationManager)nextValve);                                                        
                        }                      
                    }                   
                   
                }
                if(!continueProcessing) {
                    break;
                }               
            }
        } catch (Throwable th) {
            _logger.log(Level.SEVERE, "Exception thrown", th);
        }      
               
    }   
   
    public void processQueryMessage(ReplicationState message,
                                    String returnInstance) {
        if (_logger.isLoggable(Level.FINER)) {
            _logger.entering("ReplicationMessageRouter", "processQueryMessage",
                             new Object[] { message, returnInstance});
        }
        if(isPurgeAdvisoryMessage(message)) {
            processPurgeAdvisoryMessage(message);
            return;
        }
        if(isApplicationStatusQueryMessage(message)) {
            processApplicationStatusQueryMessage(message);
            return;
       
        if(isNetworkPartitionAdvisoryMessage(message)) {
            processNetworkPartitionAdvisoryMessage(message);
            return;
        }       
        this.routeQueryMessageForApp(message.getAppId(), message, returnInstance);
    }
   
    boolean isNetworkPartitionAdvisoryMessage(ReplicationState message) {
        return((ReplicationState.MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY).equalsIgnoreCase(message.getCommand()));
    }
   
    void processNetworkPartitionAdvisoryMessage(ReplicationState message) {
        //first reconnect to our replica partner
        ReplicationHealthChecker healthChecker
            = ReplicationHealthChecker.getInstance();
        String replicaPartnerInstance
            = healthChecker.getReshapeReplicateToInstanceName(null, 0L);       
        JoinNotificationEventHandler.checkAndDoJoinFor(replicaPartnerInstance);
        //then reconcile our caches
        reconcileNetworkPartitionedApps(0L);   //todo figure out proper wait time if really needed
    }
   
    boolean isPurgeAdvisoryMessage(ReplicationState message) {
        return((ReplicationState.MESSAGE_BROADCAST_PURGE_ADVISORY).equalsIgnoreCase(message.getCommand()));
    }
   
    void processPurgeAdvisoryMessage(ReplicationState message) {
        //owning instance is extraParam in this message
        if(message == null) {
            return;
        }       
        String owningInstanceName = message.getExtraParam();
        if(owningInstanceName == null) {
            return;
        }
        ReplicationHealthChecker healthChecker
            = ReplicationHealthChecker.getInstance();
        String ourReplicationOwner
            = healthChecker.getReshapeReplicateFromInstanceName();
        //do not proceed if we are the replica partner
        //for the owning instance
        if(ourReplicationOwner == null) {
            return;
        }
        if(ourReplicationOwner.equalsIgnoreCase(owningInstanceName)) {
            return;
        }       

        purgeOnCurrentThread(owningInstanceName);
    }
   
    private void purge(String owningInstanceName) {
        if(ReplicationHealthChecker.isStopping()) {
            return;
        }       
        //do the purge on background thread
        Thread purgeThread = new Thread(new JxtaPurge(owningInstanceName));
        purgeThread.setDaemon(true);
        purgeThread.start();      
    }
   
    void purgeOnCurrentThread(String owningInstanceName) {
        if (_logger.isLoggable(Level.FINE)) {
            _logger.fine("purgeOnCurrentThread");
        }        
        //if cluster is stopping do not proceed
        if (_logger.isLoggable(Level.FINE)) {
            _logger.fine("purgeOnCurrentThread:will skip if ClusterStopping: " + ReplicationHealthChecker.isClusterStopping());
        }        
        if(ReplicationHealthChecker.isClusterStopping()) {
            return;
        }
        if (_logger.isLoggable(Level.FINE)) {
            _logger.fine("purgeOnCurrentThread: setting flushing true");
        }       
        ReplicationHealthChecker.setFlushing(true);
        try {
            this.purge(owningInstanceName, System.currentTimeMillis());
        } finally {
            ReplicationHealthChecker.setFlushing(false);
        }
    }
   
    boolean isApplicationStatusQueryMessage(ReplicationState message) {
        return((ReplicationState.MESSAGE_APPLICATION_STATUS_QUERY).equalsIgnoreCase(message.getCommand()));
    }
   
    void processApplicationStatusQueryMessage(ReplicationState message) {
        //find out if app is registered in router
        //sourceInstanceName returned in _extraParam String
        //"Y" or "N" answer returned in _queryResult String
        String isAppRegistered = "Y";
        String applicationId = message.getAppId();
        if(_logger.isLoggable(Level.FINE)) {
            _logger.fine("processApplicationStatusQueryMessage:appId: " + applicationId + " found: " + (findApp(applicationId) == null));
        }
        if(findApp(applicationId) == null) {
            isAppRegistered = "N";           
        }
        ReplicationState responseState
            = ReplicationState.createQueryResponseFrom(message, ReplicationUtil.getInstanceName(), isAppRegistered);
        sendQueryResponse(responseState, message.getInstanceName());
    }
   
    protected ReplicationState sendQueryResponse(ReplicationState transmitState, String returnInstance) {
        JxtaReplicationSender replicationSender =
                JxtaReplicationSender.createInstance();
        ReplicationState resultState =
                replicationSender.sendReplicationStateQueryResponse(transmitState, returnInstance);
        return resultState;
    }   
   
    public void routeQueryMessageForApp(String appName,
                                        ReplicationState message,
                                        String returnInstance) {
        if (_logger.isLoggable(Level.FINEST)) {
            _logger.finest("ReplicationMessageRouter:routeQueryMessageForApp" + appName);
        }
        ReplicationManager mgr = null;
        if((mgr = this.findApp(appName)) != null) {
            mgr.processQueryMessage(message, returnInstance);
            return;
        }
    }
   
    public boolean isBroadcastMethod(String theCommand) {
        return (broadcastMethodMap.get(theCommand) != null);
    }
   
    public void registerBroadcastMethodList(List aListOfBroadcastMethods) {
        for(int i=0; i<aListOfBroadcastMethods.size(); i++ ) {
            String nextMethod = (String)aListOfBroadcastMethods.get(i);
            broadcastMethodMap.put(nextMethod, nextMethod);
        }
    }

    boolean isExpensiveMethod(String theCommand) {
        return (expensiveMethodMap.get(theCommand) != null);
    }

    public void registerExpensiveMethodList(List aListOfExpensiveMethods) {
        for(int i=0; i<aListOfExpensiveMethods.size(); i++ ) {
            String nextMethod = (String)aListOfExpensiveMethods.get(i);
            expensiveMethodMap.put(nextMethod, nextMethod);
        }
    }

    boolean isRemoveMethod(String theCommand) {
        return (removeMethodMap.get(theCommand) != null);
    }

    boolean isSaveMethod(String theCommand) {
  return saveMethodSet.contains(theCommand);
    }

    public void registerRemoveMethodList(List aListOfRemoveMethods) {
        for(int i=0; i<aListOfRemoveMethods.size(); i++ ) {
            String nextMethod = (String)aListOfRemoveMethods.get(i);
            removeMethodMap.put(nextMethod, nextMethod);
        }
    }
   
    public void registerSaveMethodList(List l) {
  for (Object o : l) {
      saveMethodSet.add(o);
  }
    }
}
TOP

Related Classes of com.sun.enterprise.ee.web.sessmgmt.ReplicationMessageRouter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by ORACLE Inc. Contact coftware#gmail.com.