/*
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS HEADER.
*
* Copyright 1997-2007 Sun Microsystems, Inc. All rights reserved.
*
* The contents of this file are subject to the terms of either the GNU
* General Public License Version 2 only ("GPL") or the Common Development
* and Distribution License("CDDL") (collectively, the "License"). You
* may not use this file except in compliance with the License. You can obtain
* a copy of the License at https://glassfish.dev.java.net/public/CDDL+GPL.html
* or glassfish/bootstrap/legal/LICENSE.txt. See the License for the specific
* language governing permissions and limitations under the License.
*
* When distributing the software, include this License Header Notice in each
* file and include the License file at glassfish/bootstrap/legal/LICENSE.txt.
* Sun designates this particular file as subject to the "Classpath" exception
* as provided by Sun in the GPL Version 2 section of the License file that
* accompanied this code. If applicable, add the following below the License
* Header, with the fields enclosed by brackets [] replaced by your own
* identifying information: "Portions Copyrighted [year]
* [name of copyright owner]"
*
* Contributor(s):
*
* If you wish your version of this file to be governed by only the CDDL or
* only the GPL Version 2, indicate your decision by adding "[Contributor]
* elects to include this software in this distribution under the [CDDL or GPL
* Version 2] license." If you don't indicate a single choice of license, a
* recipient has the option to distribute your version of this file under
* either the CDDL, the GPL Version 2 or to extend the choice of license to
* its licensees as provided above. However, if you add GPL Version 2 code
* and therefore, elected the GPL Version 2 license, then the option applies
* only if the new code is made subject to such option by the copyright
* holder.
*/
/*
* ReplicationMessageRouter.java
*
* Created on October 12, 2006, 12:03 PM
*
*/
package com.sun.enterprise.ee.web.sessmgmt;
import java.security.SecurityPermission;
import java.util.Collection;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.catalina.Context;
import org.apache.catalina.Container;
import org.apache.catalina.Engine;
import org.apache.catalina.Manager;
import org.apache.catalina.Valve;
import org.apache.catalina.core.ContainerBase;
import com.sun.appserv.ha.util.RepairManager;
import com.sun.appserv.ha.util.RepairRegistry;
import com.sun.enterprise.ee.web.sessmgmt.RepairAgent;
import com.sun.enterprise.ee.web.sessmgmt.ReplicationState;
import com.sun.enterprise.web.EmbeddedWebContainer;
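/**
 * Routes replication messages to the ReplicationManager registered for the
 * target application id, and fans repair, purge, failure-response and
 * rolling-upgrade operations out to every registered manager.
 *
 * @author Larry White
 */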
public class ReplicationMessageRouter implements RepairAgent, PurgeAgent {
/** Name of the logger used by this class (taken from ReplicationState). */
public final static String LOGGER_MEM_REP
    = ReplicationState.LOGGER_MEM_REP;

/**
 * Logger for this class, obtained under the {@link #LOGGER_MEM_REP} name.
 */
private static final Logger _logger
    = Logger.getLogger(LOGGER_MEM_REP);

/**
 * applicationId vs. logical ReplicationManager
 * for web: web appid vs. Manager
 * for sso: virtual server id vs. SingleSignOn valve
 * for ejb: containerid vs. container
 */
protected static final ConcurrentHashMap _appId2Container
    = new ConcurrentHashMap();

/**
 * The singleton instance of ReplicationMessageRouter.
 * Declared volatile because createInstance() reads it outside the
 * _monitor lock before deciding whether to create it.
 */
private static volatile ReplicationMessageRouter _soleInstance = null;

/**
 * The RepairRegistry singleton, consulted by repairApps for additional
 * RepairManagers.
 */
private static final RepairRegistry _repairRegistry
    = RepairRegistry.getInstance();

/** Permission checked before the singleton router is handed out. */
private static final SecurityPermission REPLICATION_MESSAGE_ROUTER_PERMISSION =
    new SecurityPermission("getReplicationMessageRouter");

// Local aliases for the command names defined in ReplicationState.
private final static String MESSAGE_BROADCAST_QUERY
    = ReplicationState.MESSAGE_BROADCAST_QUERY;
private final static String RETURN_BROADCAST_MSG_COMMAND
    = ReplicationState.RETURN_BROADCAST_MSG_COMMAND;
private final static String RETURN_UNICAST_MSG_COMMAND
    = ReplicationState.RETURN_UNICAST_MSG_COMMAND;
private final static String MESSAGE_BROADCAST_LOAD_RECEIVED
    = ReplicationState.MESSAGE_BROADCAST_LOAD_RECEIVED;
private final static String MESSAGE_BROADCAST_PURGE_ADVISORY
    = ReplicationState.MESSAGE_BROADCAST_PURGE_ADVISORY;
private final static String MESSAGE_APPLICATION_STATUS_QUERY
    = ReplicationState.MESSAGE_APPLICATION_STATUS_QUERY;
private final static String MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY
    = ReplicationState.MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY;

private final static long DEFAULT_SLEEP_TIME = 15 * 1000L; //15 seconds

// Command registries. The maps store each command name as both key and
// value, so they are effectively used as sets.
// NOTE(review): these are plain HashMap/HashSet instances mutated by the
// register*MethodList methods without synchronization - confirm that
// registration completes before concurrent readers query them.
private static final HashMap broadcastMethodMap = new HashMap();
private static final HashMap expensiveMethodMap = new HashMap();
private static final HashMap removeMethodMap = new HashMap();
private static final HashSet saveMethodSet = new HashSet();

static{
    // Commands that are always treated as broadcast methods.
    broadcastMethodMap.put(MESSAGE_BROADCAST_QUERY, MESSAGE_BROADCAST_QUERY);
    broadcastMethodMap.put(RETURN_BROADCAST_MSG_COMMAND, RETURN_BROADCAST_MSG_COMMAND);
    broadcastMethodMap.put(RETURN_UNICAST_MSG_COMMAND, RETURN_UNICAST_MSG_COMMAND);
    broadcastMethodMap.put(MESSAGE_BROADCAST_LOAD_RECEIVED, MESSAGE_BROADCAST_LOAD_RECEIVED);
    broadcastMethodMap.put(MESSAGE_BROADCAST_PURGE_ADVISORY, MESSAGE_BROADCAST_PURGE_ADVISORY);
    broadcastMethodMap.put(MESSAGE_APPLICATION_STATUS_QUERY, MESSAGE_APPLICATION_STATUS_QUERY);
    broadcastMethodMap.put(MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY, MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY);
}

/**
 * The embedded Catalina object.
 */
protected EmbeddedWebContainer _embedded = null;

/**
 * Monitor guarding creation and update of the singleton instance.
 */
private static final Object _monitor = new Object();
/**
 * Returns the singleton router, creating it with the no-argument
 * constructor if none exists yet.
 * The security permission is only checked when no instance is visible
 * on entry; an already-created instance is returned without a check.
 */
public static ReplicationMessageRouter createInstance() {
    if (_soleInstance == null) {
        final SecurityManager securityManager = System.getSecurityManager();
        if (securityManager != null) {
            securityManager.checkPermission(REPLICATION_MESSAGE_ROUTER_PERMISSION);
        }
        // Re-check under the lock so only one thread creates the instance.
        synchronized (_monitor) {
            if (_soleInstance == null) {
                _soleInstance = new ReplicationMessageRouter();
            }
        }
    }
    return _soleInstance;
}
/**
 * Returns the singleton router bound to the given embedded container.
 * Creates the instance if it does not exist yet; otherwise replaces the
 * embedded container reference held by the existing instance.
 * The security permission is checked on every call.
 * @param embedded the embedded web container
 */
public static ReplicationMessageRouter createInstance(EmbeddedWebContainer embedded) {
    final SecurityManager securityManager = System.getSecurityManager();
    if (securityManager != null) {
        securityManager.checkPermission(REPLICATION_MESSAGE_ROUTER_PERMISSION);
    }
    synchronized (_monitor) {
        if (_soleInstance != null) {
            _soleInstance._embedded = embedded;
        } else {
            _soleInstance = new ReplicationMessageRouter(embedded);
        }
    }
    return _soleInstance;
}
/**
 * Creates a router bound to the given embedded web container.
 * @param embedded the embedded web container
 */
public ReplicationMessageRouter(EmbeddedWebContainer embedded) {
    super();
    this._embedded = embedded;
}
/**
 * Creates a router with no embedded web container attached.
 */
public ReplicationMessageRouter() {
    super();
}
/**
 * Registers (or replaces) the manager that handles the given application id.
 * @param appid the application id used as the routing key
 * @param container the manager to register under that id
 */
public void addReplicationManager(String appid, ReplicationManager container) {
    // Historical note kept from the original: "Fix for sailfin restart
    // related issues eg., 1499" - no further code accompanies it here.
    _appId2Container.put(appid, container);
}
/**
 * Unregisters the manager stored under the given application id.
 * @param appid the application id
 * @return the manager that was registered, or null if there was none
 */
public ReplicationManager removeReplicationManager(String appid) {
    final Object previous = _appId2Container.remove(appid);
    return (ReplicationManager) previous;
}
/**
 * Looks up the manager registered under the given application id.
 * @param appid the application id
 * @return the registered manager, or null if none is registered
 */
private ReplicationManager getReplicationManager(String appid) {
    final Object registered = _appId2Container.get(appid);
    return (ReplicationManager) registered;
}
/**
 * Returns the values view of the application-id registry, i.e. all
 * ReplicationManagers currently registered.
 *
 * @return a collection of ReplicationManagers, or null if the registry
 *         itself is null
 */
private static Collection getReplicationManagers() {
    if (_appId2Container == null) {
        return null;
    }
    synchronized (_appId2Container) {
        return _appId2Container.values();
    }
}
/**
 * Returns a snapshot array of all registered ReplicationManagers.
 *
 * The array is sized by toArray itself rather than pre-sized from
 * size(): the registry is a ConcurrentHashMap whose writers do not take
 * the monitor used here, so an entry removed between size() and
 * toArray() would otherwise leave trailing null slots that callers
 * dereference while iterating.
 *
 * @return a possibly empty, never null array without null elements
 */
public ReplicationManager[] getReplicationManagerArray() {
    synchronized (_appId2Container) {
        return (ReplicationManager[]) _appId2Container.values()
            .toArray(new ReplicationManager[0]);
    }
}
/**
 * Polls until the given application is reported as deployed, or until
 * roughly 60 seconds have elapsed.
 *
 * The poll interval starts at 100 ms and doubles after each unsuccessful
 * check, but is capped at the time remaining so the overall wait cannot
 * overshoot the limit by a full back-off interval. If the waiting thread
 * is interrupted, the interrupt status is restored and polling stops.
 *
 * @param applicationId id of the application whose deployment is awaited
 */
public void checkAppDeployment(String applicationId) {
    final long maxWaitMillis = 60000L;
    final long startTime = System.currentTimeMillis();
    ApplicationStatusChecker appStatusChecker
        = new ApplicationStatusChecker();
    long sleepTime = 100L;
    while (true) {
        if ((System.currentTimeMillis() - startTime) > maxWaitMillis) {
            _logger.warning("checkAppDeployment: Timed out waiting for replicas application" + applicationId + " to finished deploying");
            break;
        }
        boolean appStatus = appStatusChecker.isApplicationDeployed(applicationId, "WEB");
        if (_logger.isLoggable(Level.FINE)) {
            _logger.fine("app:" + applicationId + " isDeployed: " + appStatus);
        }
        if (appStatus) {
            break;
        }
        // Never sleep past the deadline; always sleep at least 1 ms so the
        // loop cannot spin when the remaining time reaches zero.
        long remaining = maxWaitMillis - (System.currentTimeMillis() - startTime);
        long nap = Math.max(1L, Math.min(sleepTime, remaining));
        try {
            Thread.sleep(nap);
        } catch (InterruptedException ex) {
            // Preserve the interrupt for the caller and stop waiting.
            Thread.currentThread().interrupt();
            break;
        }
        sleepTime *= 2;
    }
    if (_logger.isLoggable(Level.FINE)) {
        _logger.fine("checkAppDeployment for application " + applicationId + "took " + (System.currentTimeMillis() - startTime) + "ms");
    }
}
/**
 * Returns all the applicationIds available in this registry.
 *
 * When printIt is set, the ids are also logged at FINE level. Logging
 * walks its own Enumeration so that the one returned to the caller is
 * fresh; previously the returned Enumeration had already been drained
 * by the logging loop.
 *
 * @param printIt whether to log every application id (debug aid)
 * @return an enumeration over the registered applicationIds
 */
public static Enumeration getReplicationAppIds(boolean printIt) {
    if (printIt && _logger.isLoggable(Level.FINE)) {
        Enumeration idsToLog = _appId2Container.keys();
        int i = 0;
        while (idsToLog.hasMoreElements()) {
            String nextAppId = (String) idsToLog.nextElement();
            _logger.fine("appid-key[" + i + "]= " + nextAppId +
                " appid-key-length[" + i + "]= " + nextAppId.length());
            i++;
        }
    }
    // _appId2Container is a final, always-initialized ConcurrentHashMap, so
    // no null check or external locking is needed to obtain its keys.
    return _appId2Container.keys();
}
/**
 * Finds the manager registered for the given application name.
 * @param appName the application id; may be null
 * @return the registered manager, or null when appName is null or
 *         nothing is registered under it
 */
public ReplicationManager findApp(String appName) /*throws IOException*/ {
    return (appName == null) ? null : getReplicationManager(appName);
}
/**
 * Runs repair on every registered ReplicationManager and then on every
 * RepairManager known to the repair registry. Each pass stops early as
 * soon as the health checker reports that the instance is stopping.
 * @param repairStartTime the repair start time handed to each manager
 */
public void repairApps(long repairStartTime) {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:repairApps");
    }
    for (ReplicationManager manager : this.getReplicationManagerArray()) {
        if (ReplicationHealthChecker.isStopping()) {
            break;
        }
        manager.repair(repairStartTime);
    }
    // second pass: managers registered with the repair registry
    for (RepairManager registered : _repairRegistry.getRepairManagerArray()) {
        if (ReplicationHealthChecker.isStopping()) {
            break;
        }
        registered.repair(repairStartTime);
    }
}
/**
 * Variant of repairApps whose early-exit check is optional.
 * When checkForStopping is false the routing map is first populated
 * eagerly and no pass is cut short by a stopping instance.
 * @param repairStartTime the repair start time handed to each manager
 * @param checkForStopping whether to stop iterating once the instance
 *        is stopping
 */
public void repairApps(long repairStartTime, boolean checkForStopping) {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:repairApps: checkForStopping: " + checkForStopping);
    }
    if (!checkForStopping) {
        eagerlyLoadRoutingMap();
    }
    for (ReplicationManager manager : this.getReplicationManagerArray()) {
        if (checkForStopping) {
            if (ReplicationHealthChecker.isStopping()) {
                break;
            }
        }
        manager.repair(repairStartTime, checkForStopping);
    }
    // second pass: managers registered with the repair registry
    for (RepairManager registered : _repairRegistry.getRepairManagerArray()) {
        if (checkForStopping) {
            if (ReplicationHealthChecker.isStopping()) {
                break;
            }
        }
        registered.repair(repairStartTime);
    }
}
/**
 * Notifies every registered ReplicationManager that an instance failed.
 * When checkForStopping is false the routing map is loaded eagerly first
 * and the iteration is never cut short.
 * @param failedInstanceName name of the failed instance
 * @param checkForStopping whether to stop iterating once the instance
 *        is stopping
 */
public void respondToFailure(String failedInstanceName, boolean checkForStopping) {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:respondToFailure: failedInstance: " + failedInstanceName + " checkForStopping: " + checkForStopping);
    }
    if (!checkForStopping) {
        eagerlyLoadRoutingMap();
    }
    for (ReplicationManager manager : this.getReplicationManagerArray()) {
        if (checkForStopping) {
            if (ReplicationHealthChecker.isStopping()) {
                break;
            }
        }
        manager.respondToFailure(failedInstanceName, checkForStopping);
    }
}
/**
 * Asks every registered manager that is PurgeCapable to purge the data
 * owned by the given instance. Stops early once the health checker
 * reports that the instance is stopping.
 * @param owningInstanceName name of the instance whose data is purged
 * @param purgeStartTime the purge start time handed to each manager
 */
public void purge(String owningInstanceName, long purgeStartTime) {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:purge");
    }
    for (ReplicationManager manager : this.getReplicationManagerArray()) {
        if (ReplicationHealthChecker.isStopping()) {
            break;
        }
        if (!(manager instanceof PurgeCapable)) {
            continue;
        }
        PurgeCapable purgeable = (PurgeCapable) manager;
        purgeable.purge(owningInstanceName, purgeStartTime);
    }
}
/**
 * Starts a daemon thread running a DynamicOwnershipChangeHandler for the
 * given manager. Managers that are not DynamicOwnershipManagers are
 * ignored.
 * @param mgr the manager to handle the change for
 * @param event the ownership event
 * @param triggeringInstance the instance that triggered the event
 * @param isLbStateChange whether the event is a load-balancer state change
 */
public void handleDynamicOwnershipChanges(ReplicationManager mgr,
        DynamicOwnershipManager.Event event,
        String triggeringInstance,
        boolean isLbStateChange
        ) {
    if (!(mgr instanceof DynamicOwnershipManager)) {
        return;
    }
    DynamicOwnershipManager ownershipManager = (DynamicOwnershipManager) mgr;
    Thread worker = new Thread(
        new DynamicOwnershipChangeHandler(
            ownershipManager, event, triggeringInstance, isLbStateChange));
    worker.setDaemon(true);
    worker.start();
}
/**
 * Dispatches a dynamic ownership change to every registered manager,
 * but only when this instance appears in the lb-enabled list computed
 * for the event. Stops early once the instance is stopping.
 * @param event the ownership event
 * @param triggeringInstance the instance that triggered the event
 * @param isLbStateChange whether the event is a load-balancer state change
 */
public void handleDynamicOwnershipChanges(DynamicOwnershipManager.Event event,
        String triggeringInstance,
        boolean isLbStateChange) {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:handleDynamicOwnershipChanges");
    }
    List<String> lbEnabledList = ExpatListHandler.getLbEnabledList(
        triggeringInstance, event, isLbStateChange);
    // an lb-disabled instance does not trigger the expat calculation
    if (!lbEnabledList.contains(ReplicationUtil.getInstanceName())) {
        return;
    }
    for (ReplicationManager manager : this.getReplicationManagerArray()) {
        if (ReplicationHealthChecker.isStopping()) {
            break;
        }
        handleDynamicOwnershipChanges(manager, event,
            triggeringInstance, isLbStateChange);
    }
}
/**
 * Submits a sync-point save task for every registered manager and waits
 * up to waitTime seconds for all of them to count down the shared latch.
 *
 * If the wait is interrupted, the thread's interrupt status is restored
 * (it was previously discarded) and the method returns after logging.
 *
 * @param waitTime maximum time to wait, in seconds
 * @throws RollingUpgradeException if the latch has not reached zero when
 *         the wait ends without interruption
 */
public void syncPointApps(long waitTime) throws RollingUpgradeException {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:syncPointApps");
    }
    RollingUpgradeContext ctx = new RollingUpgradeContext();
    ReplicationManager[] apps = this.getReplicationManagerArray();
    CountDownLatch doneSignal = new CountDownLatch(apps.length);
    for (int i = 0; i < apps.length; i++) {
        handleSyncPointApps(apps[i], waitTime, doneSignal, ctx);
    }
    try {
        //wait time is passed into the command
        doneSignal.await(waitTime, TimeUnit.SECONDS);
        if (doneSignal.getCount() != 0) {
            throw new RollingUpgradeException(ctx.getErrors());
        }
    } catch (InterruptedException ex) {
        // Do not swallow the interrupt: let callers observe it.
        Thread.currentThread().interrupt();
    } finally {
        if (doneSignal.getCount() != 0) {
            _logger.log(Level.WARNING, ctx.getErrors());
        } else {
            if (_logger.isLoggable(Level.FINE)) {
                _logger.fine("ReplicationMessageRouter>>syncPointApps successful after a wait: wait time = " + (System.currentTimeMillis() - ctx.getStartTime()));
            }
        }
    }
}
/**
 * Hands a SyncPointSaveHandler for the given manager to the
 * RollingUpgradeHandler for execution.
 * @param mgr the manager to save
 * @param waitTime the wait time passed to the handler
 * @param doneSignal latch passed to the handler
 * @param ctx shared rolling-upgrade context
 */
public void handleSyncPointApps(ReplicationManager mgr, long waitTime, CountDownLatch doneSignal, RollingUpgradeContext ctx) {
    RollingUpgradeHandler.executeTask(
        new SyncPointSaveHandler(mgr, waitTime, doneSignal, ctx));
}
/**
 * Runs post-join reconciliation on every registered manager, on the
 * calling thread, sharing one RollingUpgradeContext across all of them.
 * (An unused CountDownLatch local was removed; nothing waited on it.)
 * @param waitTime the wait time handed to each manager
 */
public void reconcileNetworkPartitionedApps(long waitTime) {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:reconcileNetworkPartitionedApps");
    }
    RollingUpgradeContext ctx = new RollingUpgradeContext();
    ReplicationManager[] apps = this.getReplicationManagerArray();
    for (int i = 0; i < apps.length; i++) {
        apps[i].doPostJoinReconciliation(waitTime, ctx);
    }
}
/**
 * Submits a sync-point load (restore) task for every registered manager
 * and waits up to waitTime seconds for all of them to count down the
 * shared latch.
 *
 * If the wait is interrupted, the thread's interrupt status is restored
 * (it was previously discarded) and the method returns after logging.
 *
 * @param waitTime maximum time to wait, in seconds
 * @param rollingUpgradeType the rolling upgrade type passed to each handler
 * @throws RollingUpgradeException if the latch has not reached zero when
 *         the wait ends without interruption
 */
public void restoreApps(long waitTime, String rollingUpgradeType) throws RollingUpgradeException {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:restoreApps");
    }
    RollingUpgradeContext ctx = new RollingUpgradeContext();
    ReplicationManager[] apps = this.getReplicationManagerArray();
    CountDownLatch doneSignal = new CountDownLatch(apps.length);
    for (int i = 0; i < apps.length; i++) {
        handleSyncPointRestoreApps(apps[i], rollingUpgradeType, waitTime, doneSignal, ctx);
    }
    try {
        //wait time is passed into the command
        doneSignal.await(waitTime, TimeUnit.SECONDS);
        if (doneSignal.getCount() != 0) {
            throw new RollingUpgradeException(ctx.getErrors());
        }
    } catch (InterruptedException ex) {
        // Do not swallow the interrupt: let callers observe it.
        Thread.currentThread().interrupt();
    } finally {
        _logger.info(ctx.getMessages());
        if (doneSignal.getCount() != 0) {
            _logger.log(Level.WARNING, ctx.getErrors());
        } else {
            if (_logger.isLoggable(Level.FINE)) {
                _logger.fine("ReplicationMessageRouter>>restoreApps successful after a wait: wait time = " + (System.currentTimeMillis() - ctx.getStartTime()));
            }
        }
    }
}
/**
 * Hands a SyncPointLoadHandler for the given manager to the
 * RollingUpgradeHandler for execution.
 * @param mgr the manager to restore
 * @param rollingUpgradeType the rolling upgrade type passed to the handler
 * @param waitTime the wait time passed to the handler
 * @param doneSignal latch passed to the handler
 * @param ctx shared rolling-upgrade context
 */
public void handleSyncPointRestoreApps(ReplicationManager mgr, String rollingUpgradeType, long waitTime, CountDownLatch doneSignal, RollingUpgradeContext ctx) {
    RollingUpgradeHandler.executeTask(
        new SyncPointLoadHandler(mgr, rollingUpgradeType, waitTime, doneSignal, ctx));
}
/**
 * Submits a sync-point reconciliation task for every registered manager
 * and waits up to waitTime seconds for all of them to count down the
 * shared latch.
 *
 * If the wait is interrupted, the thread's interrupt status is restored
 * (it was previously discarded) and the method returns after logging.
 *
 * @param waitTime maximum time to wait, in seconds
 * @throws RollingUpgradeException if the latch has not reached zero when
 *         the wait ends without interruption
 */
public void reconcileApps(long waitTime) throws RollingUpgradeException {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:reconcileApps");
    }
    RollingUpgradeContext ctx = new RollingUpgradeContext();
    ReplicationManager[] apps = this.getReplicationManagerArray();
    CountDownLatch doneSignal = new CountDownLatch(apps.length);
    for (int i = 0; i < apps.length; i++) {
        handleSyncPointReconcileApps(apps[i], waitTime, doneSignal, ctx);
    }
    try {
        //wait time is passed into the command
        doneSignal.await(waitTime, TimeUnit.SECONDS);
        if (doneSignal.getCount() != 0) {
            throw new RollingUpgradeException(ctx.getErrors());
        }
    } catch (InterruptedException ex) {
        // Do not swallow the interrupt: let callers observe it.
        Thread.currentThread().interrupt();
    } finally {
        _logger.info(ctx.getMessages());
        if (doneSignal.getCount() != 0) {
            _logger.log(Level.WARNING, ctx.getErrors());
        } else {
            if (_logger.isLoggable(Level.FINE)) {
                _logger.fine("ReplicationMessageRouter>>reconcileApps successful after a wait: wait time = " + (System.currentTimeMillis() - ctx.getStartTime()));
            }
        }
    }
}
/**
 * Hands a SyncPointReconciliationHandler for the given manager to the
 * RollingUpgradeHandler for execution.
 * @param mgr the manager to reconcile
 * @param waitTime the wait time passed to the handler
 * @param doneSignal latch passed to the handler
 * @param ctx shared rolling-upgrade context
 */
public void handleSyncPointReconcileApps(ReplicationManager mgr, long waitTime, CountDownLatch doneSignal, RollingUpgradeContext ctx) {
    RollingUpgradeHandler.executeTask(
        new SyncPointReconciliationHandler(mgr, waitTime, doneSignal, ctx));
}
/**
 * Entry point for an incoming replication message.
 * Health-check return messages bypass application routing and are
 * stored as responses; everything else is routed to the manager
 * registered for the message's application id.
 * @param state the incoming message
 */
public void processMessage(ReplicationState state) {
    if (_logger.isLoggable(Level.FINE)) {
        _logger.fine("ReplicationMessageRouter:processMessage:state=" + state);
    }
    // placeholder: acknowledgement for non-return, void-return messages
    if (!state.isReturnMessage() && state.isVoidMethodReturnState()) {
        //FIXME: can send acknowledgement back immediately
    }
    if (!state.isHCReturnMessage()) {
        // normal case: route to the owning application
        this.routeMessageForApp(state.getAppId(), state);
    } else {
        // health-check reply: skip normal routing
        processResponse(state);
    }
}
/**
 * Stores a response message in the ReplicationResponseRepository.
 * No reply is sent for a response.
 * @param message the response message
 */
public void processResponse(ReplicationState message) {
    if (_logger.isLoggable(Level.FINE)) {
        String className = getClass().getName();
        _logger.fine("IN" + className + ">>processResponse");
    }
    ReplicationResponseRepository.putEntry(message);
}
/**
 * Stores a query response as a federated entry in the
 * ReplicationResponseRepository. No reply is sent for a response.
 * The trace message now names this method; it previously reported
 * "processResponse", which made the two paths indistinguishable in logs.
 * @param message the query response message
 */
public void processQueryResponse(ReplicationState message) {
    if (_logger.isLoggable(Level.FINE)) {
        _logger.fine("IN" + this.getClass().getName() + ">>processQueryResponse");
    }
    ReplicationResponseRepository.putFederatedEntry(message);
}
/**
 * Stores a unicast query response in the ReplicationResponseRepository,
 * passing along the message's extra parameter (treated here as the
 * original command). No reply is sent for a response.
 * @param queryResponseState the unicast query response
 * @return the same state object that was passed in
 */
public ReplicationState processUnicastQueryResponse(ReplicationState queryResponseState) {
    final boolean fineEnabled = _logger.isLoggable(Level.FINE);
    if (fineEnabled) {
        _logger.fine("IN" + getClass().getName() + ">>processUnicastQueryResponse");
    }
    final String originalCommand = queryResponseState.getExtraParam();
    if (fineEnabled) {
        _logger.fine("processUnicastQueryResponse:originalCommand = "
            + originalCommand);
    }
    ReplicationResponseRepository.putEntry(queryResponseState, originalCommand);
    return queryResponseState;
}
/**
 * Delivers a message to the manager registered for the given application.
 * The message is dropped when no manager is registered.
 * @param appName the application id
 * @param message the message to deliver
 */
public void routeMessageForApp(String appName, ReplicationState message) {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:routeMessageForApp" + appName);
    }
    final ReplicationManager target = this.findApp(appName);
    if (target == null) {
        return;
    }
    target.processMessage(message);
}
/**
 * Walks the embedded container (engines, then hosts, then web modules)
 * and registers every ReplicationManager found, both session managers
 * of web modules and valves installed on hosts, under its application
 * id. Does nothing when no embedded container is attached.
 *
 * Any Throwable raised during the walk is logged at SEVERE and ends the
 * walk; it is not propagated.
 *
 * Changes from the earlier version: the never-modified
 * continueProcessing flag and its dead branches are gone, the unused
 * locals are removed (including a Context cast whose only effect was a
 * possible ClassCastException), and the application id is read through
 * the ReplicationManager type already established by instanceof rather
 * than through an unchecked cast to ReplicationManagerBase.
 */
public void eagerlyLoadRoutingMap() {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter>>eagerlyLoadRoutingMap");
    }
    if (_embedded == null) {
        return;
    }
    try {
        Engine[] engines = _embedded.getEngines();
        for (int h = 0; h < engines.length; h++) {
            Container engine = (Container) engines[h];
            Container[] hosts = engine.findChildren();
            for (int i = 0; i < hosts.length; i++) {
                Container nextHost = hosts[i];

                // session managers of the web modules deployed on this host
                Container[] webModules = nextHost.findChildren();
                for (int j = 0; j < webModules.length; j++) {
                    Manager nextManager = webModules[j].getManager();
                    if (nextManager instanceof ReplicationManager) {
                        ReplicationManager replicationManager
                            = (ReplicationManager) nextManager;
                        String nextAppName = replicationManager.getApplicationId();
                        if (_logger.isLoggable(Level.FINEST)) {
                            _logger.finest("ReplicationMessageRouter:eagerlyLoadRoutingMap:nextAppName = " + nextAppName);
                            _logger.finest("ReplicationMessageRouter:eagerlyLoadRoutingMap:found our manager:" + nextManager.getClass().getName());
                        }
                        this.addReplicationManager(nextAppName, replicationManager);
                    }
                }

                // ReplicationManager valves installed in the virtual host
                Valve[] valves = ((ContainerBase) nextHost).getValves();
                for (int k = 0; k < valves.length; k++) {
                    Valve nextValve = valves[k];
                    if (_logger.isLoggable(Level.FINEST)) {
                        _logger.finest("ReplicationMessageRouter:eagerlyLoadRoutingMap:VALVE = " + nextValve.getClass().getName());
                    }
                    if (nextValve instanceof ReplicationManager) {
                        ReplicationManager replicationValve
                            = (ReplicationManager) nextValve;
                        String nextAppName = replicationValve.getApplicationId();
                        if (_logger.isLoggable(Level.FINEST)) {
                            _logger.finest("ReplicationMessageRouter:eagerlyLoadRoutingMap:found our manager:" + nextValve.getClass().getName());
                        }
                        this.addReplicationManager(nextAppName, replicationValve);
                    }
                }
            }
        }
    } catch (Throwable th) {
        _logger.log(Level.SEVERE, "Exception thrown", th);
    }
}
/**
 * Entry point for an incoming query message.
 * Purge advisories, application status queries and network partition
 * advisories are handled by the router itself (checked in that order);
 * any other query is routed to the manager registered for the message's
 * application id.
 * @param message the incoming query
 * @param returnInstance the instance the answer should go back to
 */
public void processQueryMessage(ReplicationState message,
        String returnInstance) {
    if (_logger.isLoggable(Level.FINER)) {
        _logger.entering("ReplicationMessageRouter", "processQueryMessage",
            new Object[] { message, returnInstance});
    }
    if (isPurgeAdvisoryMessage(message)) {
        processPurgeAdvisoryMessage(message);
    } else if (isApplicationStatusQueryMessage(message)) {
        processApplicationStatusQueryMessage(message);
    } else if (isNetworkPartitionAdvisoryMessage(message)) {
        processNetworkPartitionAdvisoryMessage(message);
    } else {
        this.routeQueryMessageForApp(message.getAppId(), message, returnInstance);
    }
}
/**
 * Tells whether the message's command is the network partition advisory
 * broadcast (compared case-insensitively).
 */
boolean isNetworkPartitionAdvisoryMessage(ReplicationState message) {
    final String command = message.getCommand();
    return ReplicationState.MESSAGE_BROADCAST_NETWORK_PARTITION_ADVISORY.equalsIgnoreCase(command);
}
/**
 * Handles a network partition advisory: first triggers a join check for
 * the instance this one replicates to, then reconciles the caches of
 * all registered applications.
 * @param message the advisory message (not inspected here)
 */
void processNetworkPartitionAdvisoryMessage(ReplicationState message) {
    // step 1: reconnect to our replica partner
    String replicaPartnerInstance = ReplicationHealthChecker.getInstance()
        .getReshapeReplicateToInstanceName(null, 0L);
    JoinNotificationEventHandler.checkAndDoJoinFor(replicaPartnerInstance);
    // step 2: reconcile our caches
    //todo figure out proper wait time if really needed
    reconcileNetworkPartitionedApps(0L);
}
/**
 * Tells whether the message's command is the purge advisory broadcast
 * (compared case-insensitively).
 */
boolean isPurgeAdvisoryMessage(ReplicationState message) {
    final String command = message.getCommand();
    return ReplicationState.MESSAGE_BROADCAST_PURGE_ADVISORY.equalsIgnoreCase(command);
}
/**
 * Handles a purge advisory. The owning instance name travels in the
 * message's extra parameter. Nothing happens when the message or that
 * name is null, when this instance has no replicate-from instance, or
 * when the replicate-from instance is the owning instance itself;
 * otherwise the purge runs on the calling thread.
 * @param message the purge advisory, may be null
 */
void processPurgeAdvisoryMessage(ReplicationState message) {
    final String owningInstanceName
        = (message == null) ? null : message.getExtraParam();
    if (owningInstanceName == null) {
        return;
    }
    final String ourReplicationOwner = ReplicationHealthChecker.getInstance()
        .getReshapeReplicateFromInstanceName();
    // we are the replica partner of the owning instance when the names
    // match; in that case (or when we have no owner) there is nothing to do
    final boolean shouldPurge = ourReplicationOwner != null
        && !ourReplicationOwner.equalsIgnoreCase(owningInstanceName);
    if (shouldPurge) {
        purgeOnCurrentThread(owningInstanceName);
    }
}
/**
 * Starts a daemon thread running a JxtaPurge for the given owning
 * instance, unless the instance is stopping.
 * @param owningInstanceName name of the instance whose data is purged
 */
private void purge(String owningInstanceName) {
    if (!ReplicationHealthChecker.isStopping()) {
        // the purge itself happens on a background thread
        JxtaPurge purgeTask = new JxtaPurge(owningInstanceName);
        Thread worker = new Thread(purgeTask);
        worker.setDaemon(true);
        worker.start();
    }
}
/**
 * Purges the given owning instance's data on the calling thread.
 * Skipped entirely while the cluster is stopping. The health checker's
 * flushing flag is set for the duration of the purge and always cleared
 * afterwards.
 * @param owningInstanceName name of the instance whose data is purged
 */
void purgeOnCurrentThread(String owningInstanceName) {
    if (_logger.isLoggable(Level.FINE)) {
        _logger.fine("purgeOnCurrentThread");
        _logger.fine("purgeOnCurrentThread:will skip if ClusterStopping: " + ReplicationHealthChecker.isClusterStopping());
    }
    if (!ReplicationHealthChecker.isClusterStopping()) {
        if (_logger.isLoggable(Level.FINE)) {
            _logger.fine("purgeOnCurrentThread: setting flushing true");
        }
        ReplicationHealthChecker.setFlushing(true);
        try {
            final long purgeStartTime = System.currentTimeMillis();
            this.purge(owningInstanceName, purgeStartTime);
        } finally {
            ReplicationHealthChecker.setFlushing(false);
        }
    }
}
/**
 * Tells whether the message's command is the application status query
 * (compared case-insensitively).
 */
boolean isApplicationStatusQueryMessage(ReplicationState message) {
    final String command = message.getCommand();
    return ReplicationState.MESSAGE_APPLICATION_STATUS_QUERY.equalsIgnoreCase(command);
}
/**
 * Answers an application status query: reports whether the queried
 * application id is registered in this router.
 *
 * The response carries this instance's name and "Y" or "N" as the query
 * result, and is sent back to the instance named in the query.
 * The registry is consulted once, and the trace message now prints the
 * real "found" value (it previously printed the inverted condition).
 *
 * @param message the status query
 */
void processApplicationStatusQueryMessage(ReplicationState message) {
    String applicationId = message.getAppId();
    boolean registered = (findApp(applicationId) != null);
    if (_logger.isLoggable(Level.FINE)) {
        _logger.fine("processApplicationStatusQueryMessage:appId: " + applicationId + " found: " + registered);
    }
    String isAppRegistered = registered ? "Y" : "N";
    ReplicationState responseState
        = ReplicationState.createQueryResponseFrom(message, ReplicationUtil.getInstanceName(), isAppRegistered);
    sendQueryResponse(responseState, message.getInstanceName());
}
/**
 * Sends a query response to the given instance through the
 * JxtaReplicationSender.
 * @param transmitState the response to send
 * @param returnInstance the instance to send it to
 * @return whatever the sender returns for the transmission
 */
protected ReplicationState sendQueryResponse(ReplicationState transmitState, String returnInstance) {
    return JxtaReplicationSender.createInstance()
        .sendReplicationStateQueryResponse(transmitState, returnInstance);
}
/**
 * Delivers a query message to the manager registered for the given
 * application. The query is dropped when no manager is registered.
 * @param appName the application id
 * @param message the query to deliver
 * @param returnInstance the instance the answer should go back to
 */
public void routeQueryMessageForApp(String appName,
        ReplicationState message,
        String returnInstance) {
    if (_logger.isLoggable(Level.FINEST)) {
        _logger.finest("ReplicationMessageRouter:routeQueryMessageForApp" + appName);
    }
    final ReplicationManager target = this.findApp(appName);
    if (target == null) {
        return;
    }
    target.processQueryMessage(message, returnInstance);
}
/**
 * Tells whether the command is registered as a broadcast method.
 * @param theCommand the command name
 */
public boolean isBroadcastMethod(String theCommand) {
    final Object registered = broadcastMethodMap.get(theCommand);
    return registered != null;
}
/**
 * Registers every command name in the list as a broadcast method.
 * @param aListOfBroadcastMethods list of command names (Strings)
 */
public void registerBroadcastMethodList(List aListOfBroadcastMethods) {
    for (Object entry : aListOfBroadcastMethods) {
        String method = (String) entry;
        broadcastMethodMap.put(method, method);
    }
}
/**
 * Tells whether the command is registered as an expensive method.
 * @param theCommand the command name
 */
boolean isExpensiveMethod(String theCommand) {
    final Object registered = expensiveMethodMap.get(theCommand);
    return registered != null;
}
/**
 * Registers every command name in the list as an expensive method.
 * @param aListOfExpensiveMethods list of command names (Strings)
 */
public void registerExpensiveMethodList(List aListOfExpensiveMethods) {
    for (Object entry : aListOfExpensiveMethods) {
        String method = (String) entry;
        expensiveMethodMap.put(method, method);
    }
}
/**
 * Tells whether the command is registered as a remove method.
 * @param theCommand the command name
 */
boolean isRemoveMethod(String theCommand) {
    final Object registered = removeMethodMap.get(theCommand);
    return registered != null;
}
/**
 * Tells whether the command is registered as a save method.
 * @param theCommand the command name
 */
boolean isSaveMethod(String theCommand) {
    final boolean registered = saveMethodSet.contains(theCommand);
    return registered;
}
/**
 * Registers every command name in the list as a remove method.
 * @param aListOfRemoveMethods list of command names (Strings)
 */
public void registerRemoveMethodList(List aListOfRemoveMethods) {
    for (Object entry : aListOfRemoveMethods) {
        String method = (String) entry;
        removeMethodMap.put(method, method);
    }
}
/**
 * Registers every element of the list as a save method.
 * @param l list of command names
 */
public void registerSaveMethodList(List l) {
    saveMethodSet.addAll(l);
}
}