Package org.jboss.remoting.transport.bisocket

Source Code of org.jboss.remoting.transport.bisocket.BisocketServerInvoker

/*     */ package org.jboss.remoting.transport.bisocket;
/*     */
/*     */ import java.io.DataInputStream;
/*     */ import java.io.DataOutputStream;
/*     */ import java.io.IOException;
/*     */ import java.net.InetAddress;
/*     */ import java.net.ServerSocket;
/*     */ import java.net.Socket;
/*     */ import java.util.Collection;
/*     */ import java.util.Collections;
/*     */ import java.util.HashMap;
/*     */ import java.util.HashSet;
/*     */ import java.util.Iterator;
/*     */ import java.util.LinkedList;
/*     */ import java.util.Map;
/*     */ import java.util.Timer;
/*     */ import java.util.TimerTask;
/*     */ import javax.net.ServerSocketFactory;
/*     */ import javax.net.SocketFactory;
/*     */ import org.jboss.logging.Logger;
/*     */ import org.jboss.remoting.InvocationRequest;
/*     */ import org.jboss.remoting.InvokerLocator;
/*     */ import org.jboss.remoting.ServerInvocationHandler;
/*     */ import org.jboss.remoting.invocation.InternalInvocation;
/*     */ import org.jboss.remoting.socketfactory.CreationListenerServerSocket;
/*     */ import org.jboss.remoting.socketfactory.CreationListenerSocketFactory;
/*     */ import org.jboss.remoting.socketfactory.SocketCreationListener;
/*     */ import org.jboss.remoting.transport.PortUtil;
/*     */ import org.jboss.remoting.transport.socket.LRUPool;
/*     */ import org.jboss.remoting.transport.socket.ServerThread;
/*     */ import org.jboss.remoting.transport.socket.SocketServerInvoker;
/*     */
/*     */ public class BisocketServerInvoker extends SocketServerInvoker
/*     */ {
/*     */   // Class-wide logger.
/*  68 */   private static final Logger log = Logger.getLogger(BisocketServerInvoker.class);
/*     */
/*     */   // listenerId -> BisocketServerInvoker; filled and emptied by handleInternalInvocation().
/*  70 */   private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
/*     */   // Daemon timer shared by all callback-server invokers; created lazily in start() under timerLock.
/*     */   private static Timer timer;
/*  72 */   private static Object timerLock = new Object();
/*     */
/*     */   // listenerId -> InvokerLocator last obtained from the client invoker in createControlConnection().
/*  74 */   private Map listenerIdToInvokerLocatorMap = Collections.synchronizedMap(new HashMap());
/*     */   private ServerSocket secondaryServerSocket;
/*     */   private InvokerLocator secondaryLocator;
/*     */   private SecondaryServerSocketThread secondaryServerSocketThread;
/*     */   // listenerId -> ControlConnectionThread. Plain HashMap: the methods of this class access it while
/*     */   // holding the map's own monitor.
/*  78 */   private Map controlConnectionThreadMap = new HashMap();
/*     */   // listenerId -> Integer count maintained by createControlConnection().
/*  79 */   private Map controlConnectionRestartsMap = Collections.synchronizedMap(new HashMap());
/*     */   // Used as both the initial delay and the period when the monitor task is scheduled in start().
/*  80 */   private int pingFrequency = 5000;
/*  81 */   private int pingWindowFactor = 2;
/*     */   // Derived value; this initializer relies on the two fields above being declared first.
/*  82 */   private int pingWindow = this.pingWindowFactor * this.pingFrequency;
/*     */   // Upper bound on socket creation attempts in createControlConnection().
/*  83 */   private int socketCreationRetries = 10;
/*     */   // Restart limit copied into ControlMonitorTimerTask when the task is constructed.
/*  84 */   private int controlConnectionRestarts = 10;
/*     */   private ControlMonitorTimerTask controlMonitorTimerTask;
/*     */   // Selects the branch taken by start(); read from the "isCallbackServer" configuration key in setup().
/*  86 */   protected boolean isCallbackServer = false;
/*     */   // Negative means "not configured"; start() then picks a free port.
/*  87 */   protected int secondaryBindPort = -1;
/*     */   // When positive, advertised in the secondary locator instead of secondaryBindPort.
/*  88 */   protected int secondaryConnectPort = -1;
/*     */
/*     */   public static BisocketServerInvoker getBisocketServerInvoker(String listenerId)
/*     */   {
/*  93 */     return (BisocketServerInvoker)listenerIdToServerInvokerMap.get(listenerId);
/*     */   }
/*     */
/*     */   /** Creates an invoker for the given locator; all work is delegated to the superclass constructor. */
/*     */   public BisocketServerInvoker(InvokerLocator locator)
/*     */   {
/*  99 */     super(locator);
/*     */   }
/*     */
/*     */   /** Creates an invoker for the given locator and configuration map; delegated to the superclass constructor. */
/*     */   public BisocketServerInvoker(InvokerLocator locator, Map configuration)
/*     */   {
/* 105 */     super(locator, configuration);
/*     */   }
/*     */
/*     */   public void start()
/*     */     throws IOException
/*     */   {
/* 111 */     if (this.isCallbackServer)
/*     */     {
/* 113 */       Object val = this.configuration.get("maxRetries");
/* 114 */       if (val != null)
/*     */       {
/*     */         try
/*     */         {
/* 118 */           int nVal = Integer.valueOf((String)val).intValue();
/* 119 */           this.socketCreationRetries = nVal;
/* 120 */           log.debug("Setting socket creation retry limit: " + this.socketCreationRetries);
/*     */         }
/*     */         catch (Exception e)
/*     */         {
/* 124 */           log.warn("Could not convert maxRetries value of " + val + " to an int value.");
/*     */         }
/*     */
/*     */       }
/*     */
/* 129 */       val = this.configuration.get("maxControlConnectionRestarts");
/* 130 */       if (val != null)
/*     */       {
/*     */         try
/*     */         {
/* 134 */           int nVal = Integer.valueOf((String)val).intValue();
/* 135 */           this.controlConnectionRestarts = nVal;
/* 136 */           log.debug("Setting control connection restart limit: " + this.controlConnectionRestarts);
/*     */         }
/*     */         catch (Exception e)
/*     */         {
/* 140 */           log.warn("Could not convert maxControlConnectionRestarts value of " + val + " to an int value.");
/*     */         }
/*     */
/*     */       }
/*     */
/* 145 */       if (this.maxPoolSize <= 0)
/*     */       {
/* 147 */         this.maxPoolSize = MAX_POOL_SIZE_DEFAULT;
/*     */       }
/* 149 */       this.clientpool = new LRUPool(2, this.maxPoolSize);
/* 150 */       this.clientpool.create();
/* 151 */       this.threadpool = new LinkedList();
/* 152 */       checkSocketFactoryWrapper();
/*     */
/* 154 */       this.controlMonitorTimerTask = new ControlMonitorTimerTask(this);
/* 155 */       synchronized (timerLock)
/*     */       {
/* 157 */         if (timer == null)
/*     */         {
/* 159 */           timer = new Timer(true);
/*     */         }
/*     */         try
/*     */         {
/* 163 */           timer.schedule(this.controlMonitorTimerTask, this.pingFrequency, this.pingFrequency);
/*     */         }
/*     */         catch (IllegalStateException e)
/*     */         {
/* 167 */           log.debug("Unable to schedule TimerTask on existing Timer", e);
/* 168 */           timer = new Timer(true);
/* 169 */           timer.schedule(this.controlMonitorTimerTask, this.pingFrequency, this.pingFrequency);
/*     */         }
/*     */       }
/*     */
/* 173 */       this.running = true;
/* 174 */       this.started = true;
/*     */     }
/*     */     else
/*     */     {
/* 178 */       super.start();
/* 179 */       InetAddress host = getServerSocket().getInetAddress();
/* 180 */       if (this.secondaryBindPort < 0)
/*     */       {
/* 182 */         this.secondaryBindPort = PortUtil.findFreePort(host.getHostAddress());
/*     */       }
/* 184 */       if (this.serverSocketFactory != null)
/*     */       {
/* 186 */         this.secondaryServerSocket = this.serverSocketFactory.createServerSocket(this.secondaryBindPort, 0, host);
/*     */       }
/*     */       else
/*     */       {
/* 190 */         this.secondaryServerSocket = new ServerSocket(this.secondaryBindPort, 0, host);
/*     */       }
/* 192 */       checkSecondaryServerSocketWrapper();
/*     */
/* 194 */       String connectAddress = getLocator().getHost();
/* 195 */       int connectPort = this.secondaryConnectPort > 0 ? this.secondaryConnectPort : this.secondaryBindPort;
/* 196 */       this.secondaryLocator = new InvokerLocator(null, connectAddress, connectPort, null, null);
/* 197 */       this.secondaryServerSocketThread = new SecondaryServerSocketThread(this.secondaryServerSocket);
/* 198 */       this.secondaryServerSocketThread.setName("secondaryServerSocketThread");
/* 199 */       this.secondaryServerSocketThread.setDaemon(true);
/* 200 */       this.secondaryServerSocketThread.start();
/* 201 */       log.debug("started secondary port: " + host + ":" + this.secondaryBindPort);
/*     */     }
/*     */   }
/*     */
/*     */   /** Reports this transport as bidirectional; always returns true. */
/*     */   public boolean isTransportBiDirectional()
/*     */   {
/* 208 */     return true;
/*     */   }
/*     */
/*     */   public void createControlConnection(String listenerId, boolean firstConnection)
/*     */     throws IOException
/*     */   {
/* 215 */     BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
/*     */
/* 217 */     if (clientInvoker == null)
/*     */     {
/* 219 */       log.debug("Unable to retrieve client invoker: must have disconnected");
/* 220 */       throw new ClientUnavailableException();
/*     */     }
/*     */
/* 223 */     InvokerLocator oldLocator = (InvokerLocator)this.listenerIdToInvokerLocatorMap.get(listenerId);
/* 224 */     InvokerLocator newLocator = null;
/*     */     try
/*     */     {
/* 228 */       newLocator = clientInvoker.getSecondaryLocator();
/*     */     }
/*     */     catch (Throwable t)
/*     */     {
/* 232 */       log.error("unable to get secondary locator", t);
/* 233 */       throw new IOException("unable to get secondary locator: " + t.getMessage());
/*     */     }
/*     */
/* 244 */     boolean locatorChanged = !newLocator.equals(oldLocator);
/* 245 */     this.listenerIdToInvokerLocatorMap.put(listenerId, newLocator);
/* 246 */     log.debug("creating control connection: " + newLocator);
/*     */
/* 248 */     Socket socket = null;
/* 249 */     IOException savedException = null;
/*     */
/* 251 */     for (int i = 0; i < this.socketCreationRetries; i++)
/*     */     {
/*     */       try
/*     */       {
/* 255 */         if (this.socketFactory != null)
/* 256 */           socket = this.socketFactory.createSocket(newLocator.getHost(), newLocator.getPort());
/*     */         else
/* 258 */           socket = new Socket(newLocator.getHost(), newLocator.getPort());
/*     */       }
/*     */       catch (IOException e)
/*     */       {
/* 262 */         log.debug("Error creating a control socket", e);
/* 263 */         savedException = e;
/*     */       }
/*     */
/* 266 */       if (socket != null) {
/*     */         break;
/*     */       }
/*     */       try
/*     */       {
/* 271 */         Thread.sleep(1000L);
/*     */       }
/*     */       catch (InterruptedException e)
/*     */       {
/* 275 */         log.debug("received interrupt");
/*     */       }
/*     */     }
/*     */
/* 279 */     if (socket == null)
/*     */     {
/* 281 */       log.error("unable to create control connection after " + this.socketCreationRetries + " retries", savedException);
/*     */
/* 283 */       throw savedException;
/*     */     }
/*     */
/* 286 */     DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
/* 287 */     if (firstConnection)
/*     */     {
/* 289 */       dos.write(2);
/*     */     }
/*     */     else
/*     */     {
/* 293 */       dos.write(3);
/*     */     }
/* 295 */     dos.writeUTF(listenerId);
/*     */
/* 297 */     Thread thread = new ControlConnectionThread(socket, listenerId);
/* 298 */     thread.setName("control: " + socket.toString());
/* 299 */     thread.setDaemon(true);
/*     */
/* 301 */     synchronized (this.controlConnectionThreadMap)
/*     */     {
/* 303 */       this.controlConnectionThreadMap.put(listenerId, thread);
/*     */     }
/*     */
/* 306 */     Object o = this.controlConnectionRestartsMap.get(listenerId);
/* 307 */     if (o != null)
/*     */     {
/* 309 */       int restarts = ((Integer)o).intValue();
/* 310 */       if ((locatorChanged) || (restarts > 0)) {
/* 311 */         restarts++; this.controlConnectionRestartsMap.put(listenerId, new Integer(restarts));
/*     */       }
/*     */     }
/*     */     else {
/* 315 */       this.controlConnectionRestartsMap.put(listenerId, new Integer(0));
/*     */     }
/*     */
/* 318 */     thread.start();
/* 319 */     log.debug(this + " created control connection (" + listenerId + "): " + socket.toString());
/*     */   }
/*     */
/*     */   public void destroyControlConnection(String listenerId)
/*     */   {
/* 325 */     Thread t = null;
/*     */
/* 327 */     synchronized (this.controlConnectionThreadMap)
/*     */     {
/* 329 */       t = (Thread)this.controlConnectionThreadMap.remove(listenerId);
/*     */     }
/*     */
/* 332 */     if (t != null)
/*     */     {
/* 334 */       ((ControlConnectionThread)t).shutdown();
/* 335 */       log.debug(this + " shutting down control connection: " + listenerId);
/*     */     }
/*     */     else
/*     */     {
/* 339 */       log.debug("unrecognized listener ID: " + listenerId);
/*     */     }
/*     */
/* 342 */     this.listenerIdToInvokerLocatorMap.remove(listenerId);
/* 343 */     this.controlConnectionRestartsMap.remove(listenerId);
/*     */   }
/*     */
/*     */   public int getControlConnectionRestarts()
/*     */   {
/* 349 */     return this.controlConnectionRestarts;
/*     */   }
/*     */
/*     */   /**
             * Sets the limit on control connection restarts. ControlMonitorTimerTask copies this
             * value in its constructor, so a task that already exists keeps the old limit.
             */
/*     */   public void setControlConnectionRestarts(int controlConnectionRestarts)
/*     */   {
/* 355 */     this.controlConnectionRestarts = controlConnectionRestarts;
/*     */   }
/*     */
/*     */   public int getPingFrequency()
/*     */   {
/* 361 */     return this.pingFrequency;
/*     */   }
/*     */
/*     */   public void setPingFrequency(int pingFrequency)
/*     */   {
/* 367 */     this.pingFrequency = pingFrequency;
/* 368 */     this.pingWindow = (this.pingWindowFactor * pingFrequency);
/*     */   }
/*     */
/*     */   public int getPingWindowFactor()
/*     */   {
/* 374 */     return this.pingWindowFactor;
/*     */   }
/*     */
/*     */   public void setPingWindowFactor(int pingWindowFactor)
/*     */   {
/* 380 */     this.pingWindowFactor = pingWindowFactor;
/* 381 */     this.pingWindow = (pingWindowFactor * this.pingFrequency);
/*     */   }
/*     */
/*     */   public int getSecondaryBindPort()
/*     */   {
/* 387 */     return this.secondaryBindPort;
/*     */   }
/*     */
/*     */   public void setSecondaryBindPort(int secondaryPort)
/*     */   {
/* 393 */     this.secondaryBindPort = secondaryPort;
/*     */   }
/*     */
/*     */   public int getSecondaryConnectPort()
/*     */   {
/* 399 */     return this.secondaryConnectPort;
/*     */   }
/*     */
/*     */   /** Sets the port that start() advertises in the secondary locator when the value is positive. */
/*     */   public void setSecondaryConnectPort(int secondaryConnectPort)
/*     */   {
/* 405 */     this.secondaryConnectPort = secondaryConnectPort;
/*     */   }
/*     */
/*     */   public int getSocketCreationRetries()
/*     */   {
/* 411 */     return this.socketCreationRetries;
/*     */   }
/*     */
/*     */   /** Sets the maximum number of socket creation attempts made by createControlConnection(). */
/*     */   public void setSocketCreationRetries(int socketCreationRetries)
/*     */   {
/* 417 */     this.socketCreationRetries = socketCreationRetries;
/*     */   }
/*     */
/*     */   protected void setup()
/*     */     throws Exception
/*     */   {
/* 423 */     Object o = this.configuration.get("isCallbackServer");
/*     */
/* 425 */     if (o != null)
/*     */     {
/* 427 */       if ((o instanceof String))
/* 428 */         this.isCallbackServer = Boolean.valueOf((String)o).booleanValue();
/* 429 */       else if ((o instanceof Boolean))
/* 430 */         this.isCallbackServer = ((Boolean)o).booleanValue();
/*     */       else {
/* 432 */         log.error("unrecognized value for configuration key \"isCallbackServer\": " + o);
/*     */       }
/*     */     }
/*     */
/* 436 */     o = this.configuration.get("pingFrequency");
/* 437 */     if (((o instanceof String)) && (((String)o).length() > 0))
/*     */     {
/*     */       try
/*     */       {
/* 441 */         this.pingFrequency = Integer.valueOf((String)o).intValue();
/* 442 */         log.debug(this + " setting pingFrequency to " + this.pingFrequency);
/*     */       }
/*     */       catch (NumberFormatException e)
/*     */       {
/* 446 */         log.warn("Invalid format for \"pingFrequency\": " + o);
/*     */       }
/*     */     }
/* 449 */     else if (o != null)
/*     */     {
/* 451 */       log.warn("\"pingFrequency\" must be specified as a String");
/*     */     }
/*     */
/* 454 */     o = this.configuration.get("pingWindowFactor");
/* 455 */     if (((o instanceof String)) && (((String)o).length() > 0))
/*     */     {
/*     */       try
/*     */       {
/* 459 */         this.pingWindowFactor = Integer.valueOf((String)o).intValue();
/* 460 */         log.debug(this + " setting pingWindowFactor to " + this.pingWindowFactor);
/*     */       }
/*     */       catch (NumberFormatException e)
/*     */       {
/* 464 */         log.warn("Invalid format for \"pingWindowFactor\": " + o);
/*     */       }
/*     */     }
/* 467 */     else if (o != null)
/*     */     {
/* 469 */       log.warn("\"pingWindowFactor\" must be specified as a String");
/*     */     }
/*     */
/* 472 */     this.pingWindow = (this.pingWindowFactor * this.pingFrequency);
/*     */
/* 474 */     o = this.configuration.get("secondaryBindPort");
/* 475 */     if (((o instanceof String)) && (((String)o).length() > 0))
/*     */     {
/*     */       try
/*     */       {
/* 479 */         this.secondaryBindPort = Integer.valueOf((String)o).intValue();
/* 480 */         log.debug(this + " setting secondaryBindPort to " + this.secondaryBindPort);
/*     */       }
/*     */       catch (NumberFormatException e)
/*     */       {
/* 484 */         log.warn("Invalid format for \"secondaryBindPort\": " + o);
/*     */       }
/*     */     }
/* 487 */     else if (o != null)
/*     */     {
/* 489 */       log.warn("\"secondaryBindPort\" must be specified as a String");
/*     */     }
/*     */
/* 492 */     o = this.configuration.get("secondaryConnectPort");
/* 493 */     if (((o instanceof String)) && (((String)o).length() > 0))
/*     */     {
/*     */       try
/*     */       {
/* 497 */         this.secondaryConnectPort = Integer.valueOf((String)o).intValue();
/* 498 */         log.debug(this + " setting secondaryConnectPort to " + this.secondaryConnectPort);
/*     */       }
/*     */       catch (NumberFormatException e)
/*     */       {
/* 502 */         log.warn("Invalid format for \"secondaryConnectPort\": " + o);
/*     */       }
/*     */     }
/* 505 */     else if (o != null)
/*     */     {
/* 507 */       log.warn("\"secondaryConnectPort\" must be specified as a String");
/*     */     }
/*     */
/* 510 */     super.setup();
/*     */   }
/*     */
/*     */   protected void cleanup()
/*     */   {
/* 516 */     super.cleanup();
/*     */
/* 518 */     if (this.controlMonitorTimerTask != null) {
/* 519 */       this.controlMonitorTimerTask.shutdown();
/*     */     }
/* 521 */     synchronized (this.controlConnectionThreadMap)
/*     */     {
/* 523 */       Iterator it = this.controlConnectionThreadMap.values().iterator();
/* 524 */       while (it.hasNext())
/*     */       {
/* 526 */         ControlConnectionThread t = (ControlConnectionThread)it.next();
/* 527 */         it.remove();
/* 528 */         t.shutdown();
/*     */       }
/*     */     }
/*     */
/* 532 */     if (this.secondaryServerSocketThread != null) {
/* 533 */       this.secondaryServerSocketThread.shutdown();
/*     */     }
/* 535 */     if (this.secondaryServerSocket != null)
/*     */     {
/*     */       try
/*     */       {
/* 539 */         this.secondaryServerSocket.close();
/*     */       }
/*     */       catch (IOException e)
/*     */       {
/* 543 */         log.info("Error closing secondary server socket: " + e.getMessage());
/*     */       }
/*     */     }
/*     */   }
/*     */
/*     */   protected InvokerLocator getSecondaryLocator()
/*     */   {
/* 551 */     return this.secondaryLocator;
/*     */   }
/*     */
/*     */   protected ServerSocket getServerSocket()
/*     */   {
/* 557 */     return this.serverSocket;
/*     */   }
/*     */
/*     */   protected void checkSocketFactoryWrapper()
/*     */     throws IOException
/*     */   {
/* 564 */     Object o = this.configuration.get("socketCreationServerListener");
/* 565 */     if (o != null)
/*     */     {
/* 567 */       if ((o instanceof SocketCreationListener))
/*     */       {
/* 569 */         SocketCreationListener listener = (SocketCreationListener)o;
/* 570 */         if ((this.socketFactory instanceof CreationListenerSocketFactory))
/*     */         {
/* 572 */           CreationListenerSocketFactory clsf = (CreationListenerSocketFactory)this.socketFactory;
/* 573 */           clsf.setListener(listener);
/*     */         }
/*     */         else
/*     */         {
/* 577 */           this.socketFactory = new CreationListenerSocketFactory(this.socketFactory, listener);
/*     */         }
/*     */       }
/*     */       else
/*     */       {
/* 582 */         log.error("socket creation listener of invalid type: " + o);
/*     */       }
/*     */
/*     */     }
/* 587 */     else if ((this.socketFactory instanceof CreationListenerSocketFactory))
/*     */     {
/* 589 */       CreationListenerSocketFactory clsf = (CreationListenerSocketFactory)this.socketFactory;
/* 590 */       this.socketFactory = clsf.getFactory();
/*     */     }
/*     */   }
/*     */
/*     */   protected void checkSecondaryServerSocketWrapper()
/*     */     throws IOException
/*     */   {
/* 599 */     Object o = this.configuration.get("socketCreationClientListener");
/* 600 */     if (o != null)
/*     */     {
/* 602 */       if ((o instanceof SocketCreationListener))
/*     */       {
/* 604 */         SocketCreationListener listener = (SocketCreationListener)o;
/* 605 */         if ((this.secondaryServerSocket instanceof CreationListenerServerSocket))
/*     */         {
/* 607 */           CreationListenerServerSocket clss = (CreationListenerServerSocket)this.secondaryServerSocket;
/* 608 */           clss.setListener(listener);
/*     */         }
/*     */         else
/*     */         {
/* 612 */           this.secondaryServerSocket = new CreationListenerServerSocket(this.secondaryServerSocket, listener);
/*     */         }
/*     */       }
/*     */       else
/*     */       {
/* 617 */         log.error("socket creation listener of invalid type: " + o);
/*     */       }
/*     */
/*     */     }
/* 622 */     else if ((this.secondaryServerSocket instanceof CreationListenerServerSocket))
/*     */     {
/* 624 */       CreationListenerServerSocket clss = (CreationListenerServerSocket)this.secondaryServerSocket;
/* 625 */       this.secondaryServerSocket = clss.getServerSocket();
/*     */     }
/*     */   }
/*     */
/*     */   protected Object handleInternalInvocation(InternalInvocation ii, InvocationRequest ir, ServerInvocationHandler handler)
/*     */     throws Throwable
/*     */   {
/* 636 */     if ("getSecondaryInvokerLocator".equals(ii.getMethodName()))
/*     */     {
/* 638 */       return this.secondaryLocator;
/*     */     }
/*     */
/* 641 */     Object response = super.handleInternalInvocation(ii, ir, handler);
/*     */
/* 643 */     if ("addClientListener".equals(ii.getMethodName()))
/*     */     {
/* 645 */       Map metadata = ir.getRequestPayload();
/* 646 */       if (metadata != null)
/*     */       {
/* 648 */         String listenerId = (String)metadata.get("listenerId");
/* 649 */         if (listenerId != null)
/*     */         {
/* 651 */           listenerIdToServerInvokerMap.put(listenerId, this);
/*     */         }
/*     */       }
/*     */     }
/* 655 */     else if ("removeClientListener".equals(ii.getMethodName()))
/*     */     {
/* 657 */       Map metadata = ir.getRequestPayload();
/* 658 */       if (metadata != null)
/*     */       {
/* 660 */         String listenerId = (String)metadata.get("listenerId");
/* 661 */         if (listenerId != null)
/*     */         {
/* 663 */           listenerIdToServerInvokerMap.remove(listenerId);
/* 664 */           BisocketClientInvoker.removeBisocketClientInvoker(listenerId);
/* 665 */           destroyControlConnection(listenerId);
/*     */         }
/*     */       }
/*     */     }
/*     */
/* 670 */     return response;
/*     */   }
/*     */   /** Thrown by createControlConnection() when no client invoker is registered for the listener id. */
/*     */   static class ClientUnavailableException extends IOException {
/*     */     private static final long serialVersionUID = 2846502029152028732L;
/*     */   }
/*     */   static class ControlMonitorTimerTask extends TimerTask {
/* 947 */     private boolean running = true;
/*     */     private BisocketServerInvoker invoker;
/*     */     private Map listenerIdToInvokerLocatorMap;
/*     */     private Map controlConnectionThreadMap;
/*     */     private Map controlConnectionRestartsMap;
/*     */     private int controlConnectionRestarts;
/*     */
/* 956 */     ControlMonitorTimerTask(BisocketServerInvoker invoker) { this.invoker = invoker;
/* 957 */       this.listenerIdToInvokerLocatorMap = invoker.listenerIdToInvokerLocatorMap;
/* 958 */       this.controlConnectionThreadMap = invoker.controlConnectionThreadMap;
/* 959 */       this.controlConnectionRestartsMap = invoker.controlConnectionRestartsMap;
/* 960 */       this.controlConnectionRestarts = invoker.controlConnectionRestarts;
/*     */     }
/*     */
/*     */     synchronized void shutdown()
/*     */     {
/* 971 */       this.running = false;
/* 972 */       this.invoker = null;
/* 973 */       this.listenerIdToInvokerLocatorMap = null;
/* 974 */       this.controlConnectionThreadMap = null;
/* 975 */       cancel();
/*     */     }
/*     */
/*     */     public void run()
/*     */     {
/* 980 */       if (!this.running) {
/* 981 */         return;
/*     */       }
/* 983 */       if (BisocketServerInvoker.log.isTraceEnabled()) {
/* 984 */         BisocketServerInvoker.log.trace("checking connections");
/*     */       }
/* 986 */       Collection controlConnectionThreads = null;
/* 987 */       synchronized (this)
/*     */       {
/* 989 */         if (!this.running) {
/* 990 */           return;
/*     */         }
/* 992 */         controlConnectionThreads = new HashSet(this.controlConnectionThreadMap.values());
/*     */       }
/*     */
/* 995 */       Iterator it = controlConnectionThreads.iterator();
/* 996 */       while (it.hasNext())
/*     */       {
/* 998 */         BisocketServerInvoker.ControlConnectionThread t = (BisocketServerInvoker.ControlConnectionThread)it.next();
/* 999 */         String listenerId = t.getListenerId();
/*     */         Object locator;
/* 1002 */         synchronized (this)
/*     */         {
/* 1004 */           if (!this.running) {
/* 1005 */             return;
/*     */           }
/* 1007 */           locator = this.listenerIdToInvokerLocatorMap.get(listenerId);
/*     */         }
/*     */         Object locator;
/* 1010 */         if (!t.checkConnection())
/*     */         {
/* 1012 */           t.shutdown();
/*     */
/* 1014 */           synchronized (this)
/*     */           {
/* 1016 */             if (!this.running) {
/* 1017 */               return;
/*     */             }
/* 1019 */             this.controlConnectionThreadMap.remove(listenerId);
/* 1020 */             Object o = this.controlConnectionRestartsMap.get(listenerId);
/* 1021 */             int restarts = ((Integer)o).intValue();
/*     */
/* 1023 */             if (restarts + 1 > this.controlConnectionRestarts)
/*     */             {
/* 1025 */               BisocketServerInvoker.log.warn(this + ": detected failure on control connection " + t);
/* 1026 */               BisocketServerInvoker.log.warn("Control connection " + listenerId + " has been recreated " + restarts + " times.");
/* 1027 */               BisocketServerInvoker.log.warn("Assuming it is a connection to an old server, and will not restart");
/* 1028 */               this.controlConnectionRestartsMap.remove(listenerId);
/* 1029 */               continue;
/*     */             }
/*     */
/* 1032 */             BisocketServerInvoker.log.warn(this + ": detected failure on control connection " + t + " (" + listenerId + ": requesting new control connection");
/*     */           }
/*     */
/* 1037 */           Thread t2 = new BisocketServerInvoker.1(this, listenerId, locator);
/*     */
/* 1061 */           t2.setName("controlConnectionRecreate:" + t.getName());
/* 1062 */           t2.start();
/*     */         }
/*     */       }
/*     */     }
/*     */   }
/*     */
/*     */   class SecondaryServerSocketThread extends Thread
/*     */   {
/*     */     private ServerSocket secondaryServerSocket;
/* 870 */     boolean running = true;
/*     */
/*     */     SecondaryServerSocketThread(ServerSocket secondaryServerSocket) throws IOException
/*     */     {
/* 874 */       this.secondaryServerSocket = secondaryServerSocket;
/*     */     }
/*     */
/*     */     void shutdown()
/*     */     {
/* 879 */       this.running = false;
/* 880 */       interrupt();
/*     */     }
/*     */
/*     */     public void run()
/*     */     {
/* 885 */       while (this.running)
/*     */       {
/*     */         try
/*     */         {
/* 889 */           Socket socket = this.secondaryServerSocket.accept();
/* 890 */           if (BisocketServerInvoker.log.isTraceEnabled()) BisocketServerInvoker.log.trace("accepted: " + socket);
/* 891 */           DataInputStream dis = new DataInputStream(socket.getInputStream());
/* 892 */           int action = dis.read();
/* 893 */           String listenerId = dis.readUTF();
/*     */
/* 895 */           switch (action)
/*     */           {
/*     */           case 2:
/* 898 */             BisocketClientInvoker.transferSocket(listenerId, socket, true);
/* 899 */             if (!BisocketServerInvoker.log.isTraceEnabled()) break;
/* 900 */             BisocketServerInvoker.log.trace("SecondaryServerSocketThread: created control socket: (" + socket + ")" + listenerId); break;
/*     */           case 3:
/* 904 */             BisocketClientInvoker invoker = BisocketClientInvoker.getBisocketCallbackClientInvoker(listenerId);
/* 905 */             if (invoker == null)
/*     */             {
/* 907 */               BisocketServerInvoker.log.error("received new control socket for unrecognized listenerId: " + listenerId);
/*     */             }
/*     */             else
/*     */             {
/* 911 */               invoker.replaceControlSocket(socket);
/* 912 */               if (!BisocketServerInvoker.log.isTraceEnabled()) break;
/* 913 */               BisocketServerInvoker.log.trace("SecondaryServerSocketThread: recreated control socket: " + listenerId); } break;
/*     */           case 4:
/* 918 */             BisocketClientInvoker.transferSocket(listenerId, socket, false);
/* 919 */             if (!BisocketServerInvoker.log.isTraceEnabled()) break;
/* 920 */             BisocketServerInvoker.log.trace("SecondaryServerSocketThread: transferred socket: " + listenerId); break;
/*     */           default:
/* 924 */             BisocketServerInvoker.log.error("unrecognized action on SecondaryServerSocketThread: " + action);
/*     */           }
/*     */         }
/*     */         catch (IOException e)
/*     */         {
/* 929 */           if (this.running)
/* 930 */             BisocketServerInvoker.log.error("Failed to accept socket connection", e);
/*     */           else
/* 932 */             return;
/*     */         }
/*     */       }
/*     */     }
/*     */
/*     */     ServerSocket getServerSocket()
/*     */     {
/* 940 */       return this.secondaryServerSocket;
/*     */     }
/*     */   }
/*     */
/*     */   class ControlConnectionThread extends Thread
/*     */   {
/*     */     private static final int MAX_INITIAL_ATTEMPTS = 5;
/*     */     private Socket controlSocket;
/*     */     private String listenerId;
/*     */     private DataInputStream dis;
/*     */     private boolean running;
/*     */     private int errorCount;
/* 682 */     private long lastPing = -1L;
/*     */     private int initialAttempts;
/*     */
/*     */     ControlConnectionThread(Socket socket, String listenerId)
/*     */       throws IOException
/*     */     {
/* 687 */       this.controlSocket = socket;
/* 688 */       this.listenerId = listenerId;
/* 689 */       this.dis = new DataInputStream(socket.getInputStream());
/*     */     }
/*     */
/*     */     void shutdown()
/*     */     {
/* 694 */       this.running = false;
/*     */       try
/*     */       {
/* 698 */         this.controlSocket.close();
/*     */       }
/*     */       catch (IOException e)
/*     */       {
/* 702 */         BisocketServerInvoker.log.warn("unable to close controlSocket");
/*     */       }
/* 704 */       interrupt();
/*     */     }
/*     */
/*     */     boolean checkConnection()
/*     */     {
/* 709 */       if ((this.lastPing < 0L) && (this.initialAttempts++ < 5))
/*     */       {
/* 711 */         return true;
/*     */       }
/* 713 */       if (this.lastPing < 0L)
/*     */       {
/* 715 */         return false;
/*     */       }
/*     */
/* 718 */       long currentTime = System.currentTimeMillis();
/*     */
/* 720 */       if (BisocketServerInvoker.log.isTraceEnabled())
/*     */       {
/* 722 */         BisocketServerInvoker.log.trace("elapsed: " + (currentTime - this.lastPing));
/*     */       }
/* 724 */       return currentTime - this.lastPing <= BisocketServerInvoker.this.pingWindow;
/*     */     }
/*     */
/*     */     String getListenerId()
/*     */     {
/* 729 */       return this.listenerId;
/*     */     }
/*     */
/*     */     public void run()
/*     */     {
/* 734 */       this.running = true;
/* 735 */       while (this.running)
/*     */       {
/* 737 */         Socket socket = null;
/*     */         try
/*     */         {
/* 741 */           int action = this.dis.read();
/* 742 */           this.lastPing = System.currentTimeMillis();
/*     */
/* 744 */           switch (action)
/*     */           {
/*     */           case 4:
/* 747 */             InvokerLocator locator = (InvokerLocator)BisocketServerInvoker.this.listenerIdToInvokerLocatorMap.get(this.listenerId);
/*     */
/* 749 */             IOException savedException = null;
/*     */
/* 751 */             for (int i = 0; i < BisocketServerInvoker.this.socketCreationRetries; i++)
/*     */             {
/*     */               try
/*     */               {
/* 755 */                 if (BisocketServerInvoker.this.socketFactory != null)
/* 756 */                   socket = BisocketServerInvoker.this.socketFactory.createSocket(locator.getHost(), locator.getPort());
/*     */                 else
/* 758 */                   socket = new Socket(locator.getHost(), locator.getPort());
/*     */               }
/*     */               catch (IOException e)
/*     */               {
/* 762 */                 BisocketServerInvoker.log.debug("Error creating a socket", e);
/* 763 */                 savedException = e;
/*     */               }
/*     */
/* 766 */               if (socket != null) {
/*     */                 break;
/*     */               }
/*     */               try
/*     */               {
/* 771 */                 Thread.sleep(1000L);
/*     */               }
/*     */               catch (InterruptedException e)
/*     */               {
/* 775 */                 if (this.running)
/*     */                 {
/* 777 */                   BisocketServerInvoker.log.debug("received unexpected interrupt");
/*     */                 }
/*     */                 else
/*     */                 {
/* 782 */                   return;
/*     */                 }
/*     */               }
/*     */             }
/*     */
/* 787 */             if (socket == null)
/*     */             {
/* 789 */               BisocketServerInvoker.log.error("Unable to create socket after " + BisocketServerInvoker.this.socketCreationRetries + " retries", savedException);
/*     */
/* 791 */               continue;
/*     */             }
/*     */
/* 794 */             DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
/* 795 */             dos.write(4);
/* 796 */             dos.writeUTF(this.listenerId);
/* 797 */             break;
/*     */           case 1:
/* 800 */             break;
/*     */           case -1:
/* 803 */             shutdown();
/* 804 */             return;
/*     */           default:
/* 807 */             BisocketServerInvoker.log.error("unrecognized action on ControlConnectionThread (" + this.listenerId + "): " + action);
/*     */
/* 809 */             continue;
/*     */           }
/*     */         }
/*     */         catch (IOException e)
/*     */         {
/* 814 */           if (this.running)
/*     */           {
/* 816 */             if (("Socket closed".equalsIgnoreCase(e.getMessage())) || ("Socket is closed".equalsIgnoreCase(e.getMessage())) || ("Connection reset".equalsIgnoreCase(e.getMessage())))
/*     */             {
/* 820 */               shutdown();
/* 821 */               return;
/*     */             }
/* 823 */             BisocketServerInvoker.log.error("Unable to process control connection: " + e.getMessage(), e);
/* 824 */             if (++this.errorCount > 5)
/*     */             {
/* 826 */               shutdown();
/* 827 */               return;
/*     */             }
/* 829 */             continue;
/*     */           }
/*     */
/* 832 */           return;
/*     */         }
/*     */
/* 835 */         synchronized (BisocketServerInvoker.this.clientpool)
/*     */         {
/* 837 */           if (BisocketServerInvoker.this.clientpool.size() < BisocketServerInvoker.this.maxPoolSize)
/*     */           {
/* 839 */             Thread thread = null;
/*     */             try
/*     */             {
/* 842 */               thread = new ServerThread(socket, BisocketServerInvoker.this, BisocketServerInvoker.this.clientpool, BisocketServerInvoker.this.threadpool, BisocketServerInvoker.this.getTimeout(), BisocketServerInvoker.this.serverSocketClass);
/*     */
/* 845 */               thread.start();
/*     */
/* 847 */               if (BisocketServerInvoker.log.isDebugEnabled())
/* 848 */                 BisocketServerInvoker.log.debug("created: " + thread);
/*     */             }
/*     */             catch (Exception e)
/*     */             {
/* 852 */               BisocketServerInvoker.log.error("Unable to create new ServerThread: " + e.getMessage());
/* 853 */               e.printStackTrace();
/*     */             }
/*     */
/* 856 */             synchronized (BisocketServerInvoker.this.threadpool)
/*     */             {
/* 858 */               BisocketServerInvoker.this.threadpool.add(thread);
/*     */             }
/*     */           }
/*     */         }
/*     */       }
/*     */     }
/*     */   }
/*     */ }

/* Location:           /home/mnovotny/projects/EMBEDDED_JBOSS_BETA3_COMMUNITY/embedded/output/lib/embedded-jboss/lib/jboss-embedded-all.jar
* Qualified Name:     org.jboss.remoting.transport.bisocket.BisocketServerInvoker
* JD-Core Version:    0.6.0
*/
TOP

Related Classes of org.jboss.remoting.transport.bisocket.BisocketServerInvoker

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code is the property of its respective owners. Java is a trademark of Sun Microsystems, Inc. and is owned by Oracle. Contact coftware#gmail.com.