/* */ package org.jboss.remoting.transport.bisocket;
/* */
/* */ import java.io.DataInputStream;
/* */ import java.io.DataOutputStream;
/* */ import java.io.IOException;
/* */ import java.net.InetAddress;
/* */ import java.net.ServerSocket;
/* */ import java.net.Socket;
/* */ import java.util.Collection;
/* */ import java.util.Collections;
/* */ import java.util.HashMap;
/* */ import java.util.HashSet;
/* */ import java.util.Iterator;
/* */ import java.util.LinkedList;
/* */ import java.util.Map;
/* */ import java.util.Timer;
/* */ import java.util.TimerTask;
/* */ import javax.net.ServerSocketFactory;
/* */ import javax.net.SocketFactory;
/* */ import org.jboss.logging.Logger;
/* */ import org.jboss.remoting.InvocationRequest;
/* */ import org.jboss.remoting.InvokerLocator;
/* */ import org.jboss.remoting.ServerInvocationHandler;
/* */ import org.jboss.remoting.invocation.InternalInvocation;
/* */ import org.jboss.remoting.socketfactory.CreationListenerServerSocket;
/* */ import org.jboss.remoting.socketfactory.CreationListenerSocketFactory;
/* */ import org.jboss.remoting.socketfactory.SocketCreationListener;
/* */ import org.jboss.remoting.transport.PortUtil;
/* */ import org.jboss.remoting.transport.socket.LRUPool;
/* */ import org.jboss.remoting.transport.socket.ServerThread;
/* */ import org.jboss.remoting.transport.socket.SocketServerInvoker;
/* */
/* */ public class BisocketServerInvoker extends SocketServerInvoker
/* */ {
/* 68 */ private static final Logger log = Logger.getLogger(BisocketServerInvoker.class);
/* */
/* 70 */ private static Map listenerIdToServerInvokerMap = Collections.synchronizedMap(new HashMap());
/* */ private static Timer timer;
/* 72 */ private static Object timerLock = new Object();
/* */
/* 74 */ private Map listenerIdToInvokerLocatorMap = Collections.synchronizedMap(new HashMap());
/* */ private ServerSocket secondaryServerSocket;
/* */ private InvokerLocator secondaryLocator;
/* */ private SecondaryServerSocketThread secondaryServerSocketThread;
/* 78 */ private Map controlConnectionThreadMap = new HashMap();
/* 79 */ private Map controlConnectionRestartsMap = Collections.synchronizedMap(new HashMap());
/* 80 */ private int pingFrequency = 5000;
/* 81 */ private int pingWindowFactor = 2;
/* 82 */ private int pingWindow = this.pingWindowFactor * this.pingFrequency;
/* 83 */ private int socketCreationRetries = 10;
/* 84 */ private int controlConnectionRestarts = 10;
/* */ private ControlMonitorTimerTask controlMonitorTimerTask;
/* 86 */ protected boolean isCallbackServer = false;
/* 87 */ protected int secondaryBindPort = -1;
/* 88 */ protected int secondaryConnectPort = -1;
/* */
/* */ public static BisocketServerInvoker getBisocketServerInvoker(String listenerId)
/* */ {
/* 93 */ return (BisocketServerInvoker)listenerIdToServerInvokerMap.get(listenerId);
/* */ }
/* */
/* */ public BisocketServerInvoker(InvokerLocator locator)
/* */ {
/* 99 */ super(locator);
/* */ }
/* */
/* */ public BisocketServerInvoker(InvokerLocator locator, Map configuration)
/* */ {
/* 105 */ super(locator, configuration);
/* */ }
/* */
/* */ public void start()
/* */ throws IOException
/* */ {
/* 111 */ if (this.isCallbackServer)
/* */ {
/* 113 */ Object val = this.configuration.get("maxRetries");
/* 114 */ if (val != null)
/* */ {
/* */ try
/* */ {
/* 118 */ int nVal = Integer.valueOf((String)val).intValue();
/* 119 */ this.socketCreationRetries = nVal;
/* 120 */ log.debug("Setting socket creation retry limit: " + this.socketCreationRetries);
/* */ }
/* */ catch (Exception e)
/* */ {
/* 124 */ log.warn("Could not convert maxRetries value of " + val + " to an int value.");
/* */ }
/* */
/* */ }
/* */
/* 129 */ val = this.configuration.get("maxControlConnectionRestarts");
/* 130 */ if (val != null)
/* */ {
/* */ try
/* */ {
/* 134 */ int nVal = Integer.valueOf((String)val).intValue();
/* 135 */ this.controlConnectionRestarts = nVal;
/* 136 */ log.debug("Setting control connection restart limit: " + this.controlConnectionRestarts);
/* */ }
/* */ catch (Exception e)
/* */ {
/* 140 */ log.warn("Could not convert maxControlConnectionRestarts value of " + val + " to an int value.");
/* */ }
/* */
/* */ }
/* */
/* 145 */ if (this.maxPoolSize <= 0)
/* */ {
/* 147 */ this.maxPoolSize = MAX_POOL_SIZE_DEFAULT;
/* */ }
/* 149 */ this.clientpool = new LRUPool(2, this.maxPoolSize);
/* 150 */ this.clientpool.create();
/* 151 */ this.threadpool = new LinkedList();
/* 152 */ checkSocketFactoryWrapper();
/* */
/* 154 */ this.controlMonitorTimerTask = new ControlMonitorTimerTask(this);
/* 155 */ synchronized (timerLock)
/* */ {
/* 157 */ if (timer == null)
/* */ {
/* 159 */ timer = new Timer(true);
/* */ }
/* */ try
/* */ {
/* 163 */ timer.schedule(this.controlMonitorTimerTask, this.pingFrequency, this.pingFrequency);
/* */ }
/* */ catch (IllegalStateException e)
/* */ {
/* 167 */ log.debug("Unable to schedule TimerTask on existing Timer", e);
/* 168 */ timer = new Timer(true);
/* 169 */ timer.schedule(this.controlMonitorTimerTask, this.pingFrequency, this.pingFrequency);
/* */ }
/* */ }
/* */
/* 173 */ this.running = true;
/* 174 */ this.started = true;
/* */ }
/* */ else
/* */ {
/* 178 */ super.start();
/* 179 */ InetAddress host = getServerSocket().getInetAddress();
/* 180 */ if (this.secondaryBindPort < 0)
/* */ {
/* 182 */ this.secondaryBindPort = PortUtil.findFreePort(host.getHostAddress());
/* */ }
/* 184 */ if (this.serverSocketFactory != null)
/* */ {
/* 186 */ this.secondaryServerSocket = this.serverSocketFactory.createServerSocket(this.secondaryBindPort, 0, host);
/* */ }
/* */ else
/* */ {
/* 190 */ this.secondaryServerSocket = new ServerSocket(this.secondaryBindPort, 0, host);
/* */ }
/* 192 */ checkSecondaryServerSocketWrapper();
/* */
/* 194 */ String connectAddress = getLocator().getHost();
/* 195 */ int connectPort = this.secondaryConnectPort > 0 ? this.secondaryConnectPort : this.secondaryBindPort;
/* 196 */ this.secondaryLocator = new InvokerLocator(null, connectAddress, connectPort, null, null);
/* 197 */ this.secondaryServerSocketThread = new SecondaryServerSocketThread(this.secondaryServerSocket);
/* 198 */ this.secondaryServerSocketThread.setName("secondaryServerSocketThread");
/* 199 */ this.secondaryServerSocketThread.setDaemon(true);
/* 200 */ this.secondaryServerSocketThread.start();
/* 201 */ log.debug("started secondary port: " + host + ":" + this.secondaryBindPort);
/* */ }
/* */ }
/* */
/* */ public boolean isTransportBiDirectional()
/* */ {
/* 208 */ return true;
/* */ }
/* */
/* */ public void createControlConnection(String listenerId, boolean firstConnection)
/* */ throws IOException
/* */ {
/* 215 */ BisocketClientInvoker clientInvoker = BisocketClientInvoker.getBisocketClientInvoker(listenerId);
/* */
/* 217 */ if (clientInvoker == null)
/* */ {
/* 219 */ log.debug("Unable to retrieve client invoker: must have disconnected");
/* 220 */ throw new ClientUnavailableException();
/* */ }
/* */
/* 223 */ InvokerLocator oldLocator = (InvokerLocator)this.listenerIdToInvokerLocatorMap.get(listenerId);
/* 224 */ InvokerLocator newLocator = null;
/* */ try
/* */ {
/* 228 */ newLocator = clientInvoker.getSecondaryLocator();
/* */ }
/* */ catch (Throwable t)
/* */ {
/* 232 */ log.error("unable to get secondary locator", t);
/* 233 */ throw new IOException("unable to get secondary locator: " + t.getMessage());
/* */ }
/* */
/* 244 */ boolean locatorChanged = !newLocator.equals(oldLocator);
/* 245 */ this.listenerIdToInvokerLocatorMap.put(listenerId, newLocator);
/* 246 */ log.debug("creating control connection: " + newLocator);
/* */
/* 248 */ Socket socket = null;
/* 249 */ IOException savedException = null;
/* */
/* 251 */ for (int i = 0; i < this.socketCreationRetries; i++)
/* */ {
/* */ try
/* */ {
/* 255 */ if (this.socketFactory != null)
/* 256 */ socket = this.socketFactory.createSocket(newLocator.getHost(), newLocator.getPort());
/* */ else
/* 258 */ socket = new Socket(newLocator.getHost(), newLocator.getPort());
/* */ }
/* */ catch (IOException e)
/* */ {
/* 262 */ log.debug("Error creating a control socket", e);
/* 263 */ savedException = e;
/* */ }
/* */
/* 266 */ if (socket != null) {
/* */ break;
/* */ }
/* */ try
/* */ {
/* 271 */ Thread.sleep(1000L);
/* */ }
/* */ catch (InterruptedException e)
/* */ {
/* 275 */ log.debug("received interrupt");
/* */ }
/* */ }
/* */
/* 279 */ if (socket == null)
/* */ {
/* 281 */ log.error("unable to create control connection after " + this.socketCreationRetries + " retries", savedException);
/* */
/* 283 */ throw savedException;
/* */ }
/* */
/* 286 */ DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
/* 287 */ if (firstConnection)
/* */ {
/* 289 */ dos.write(2);
/* */ }
/* */ else
/* */ {
/* 293 */ dos.write(3);
/* */ }
/* 295 */ dos.writeUTF(listenerId);
/* */
/* 297 */ Thread thread = new ControlConnectionThread(socket, listenerId);
/* 298 */ thread.setName("control: " + socket.toString());
/* 299 */ thread.setDaemon(true);
/* */
/* 301 */ synchronized (this.controlConnectionThreadMap)
/* */ {
/* 303 */ this.controlConnectionThreadMap.put(listenerId, thread);
/* */ }
/* */
/* 306 */ Object o = this.controlConnectionRestartsMap.get(listenerId);
/* 307 */ if (o != null)
/* */ {
/* 309 */ int restarts = ((Integer)o).intValue();
/* 310 */ if ((locatorChanged) || (restarts > 0)) {
/* 311 */ restarts++; this.controlConnectionRestartsMap.put(listenerId, new Integer(restarts));
/* */ }
/* */ }
/* */ else {
/* 315 */ this.controlConnectionRestartsMap.put(listenerId, new Integer(0));
/* */ }
/* */
/* 318 */ thread.start();
/* 319 */ log.debug(this + " created control connection (" + listenerId + "): " + socket.toString());
/* */ }
/* */
/* */ public void destroyControlConnection(String listenerId)
/* */ {
/* 325 */ Thread t = null;
/* */
/* 327 */ synchronized (this.controlConnectionThreadMap)
/* */ {
/* 329 */ t = (Thread)this.controlConnectionThreadMap.remove(listenerId);
/* */ }
/* */
/* 332 */ if (t != null)
/* */ {
/* 334 */ ((ControlConnectionThread)t).shutdown();
/* 335 */ log.debug(this + " shutting down control connection: " + listenerId);
/* */ }
/* */ else
/* */ {
/* 339 */ log.debug("unrecognized listener ID: " + listenerId);
/* */ }
/* */
/* 342 */ this.listenerIdToInvokerLocatorMap.remove(listenerId);
/* 343 */ this.controlConnectionRestartsMap.remove(listenerId);
/* */ }
/* */
/* */ public int getControlConnectionRestarts()
/* */ {
/* 349 */ return this.controlConnectionRestarts;
/* */ }
/* */
/* */ public void setControlConnectionRestarts(int controlConnectionRestarts)
/* */ {
/* 355 */ this.controlConnectionRestarts = controlConnectionRestarts;
/* */ }
/* */
/* */ public int getPingFrequency()
/* */ {
/* 361 */ return this.pingFrequency;
/* */ }
/* */
/* */ public void setPingFrequency(int pingFrequency)
/* */ {
/* 367 */ this.pingFrequency = pingFrequency;
/* 368 */ this.pingWindow = (this.pingWindowFactor * pingFrequency);
/* */ }
/* */
/* */ public int getPingWindowFactor()
/* */ {
/* 374 */ return this.pingWindowFactor;
/* */ }
/* */
/* */ public void setPingWindowFactor(int pingWindowFactor)
/* */ {
/* 380 */ this.pingWindowFactor = pingWindowFactor;
/* 381 */ this.pingWindow = (pingWindowFactor * this.pingFrequency);
/* */ }
/* */
/* */ public int getSecondaryBindPort()
/* */ {
/* 387 */ return this.secondaryBindPort;
/* */ }
/* */
/* */ public void setSecondaryBindPort(int secondaryPort)
/* */ {
/* 393 */ this.secondaryBindPort = secondaryPort;
/* */ }
/* */
/* */ public int getSecondaryConnectPort()
/* */ {
/* 399 */ return this.secondaryConnectPort;
/* */ }
/* */
/* */ public void setSecondaryConnectPort(int secondaryConnectPort)
/* */ {
/* 405 */ this.secondaryConnectPort = secondaryConnectPort;
/* */ }
/* */
/* */ public int getSocketCreationRetries()
/* */ {
/* 411 */ return this.socketCreationRetries;
/* */ }
/* */
/* */ public void setSocketCreationRetries(int socketCreationRetries)
/* */ {
/* 417 */ this.socketCreationRetries = socketCreationRetries;
/* */ }
/* */
/* */ protected void setup()
/* */ throws Exception
/* */ {
/* 423 */ Object o = this.configuration.get("isCallbackServer");
/* */
/* 425 */ if (o != null)
/* */ {
/* 427 */ if ((o instanceof String))
/* 428 */ this.isCallbackServer = Boolean.valueOf((String)o).booleanValue();
/* 429 */ else if ((o instanceof Boolean))
/* 430 */ this.isCallbackServer = ((Boolean)o).booleanValue();
/* */ else {
/* 432 */ log.error("unrecognized value for configuration key \"isCallbackServer\": " + o);
/* */ }
/* */ }
/* */
/* 436 */ o = this.configuration.get("pingFrequency");
/* 437 */ if (((o instanceof String)) && (((String)o).length() > 0))
/* */ {
/* */ try
/* */ {
/* 441 */ this.pingFrequency = Integer.valueOf((String)o).intValue();
/* 442 */ log.debug(this + " setting pingFrequency to " + this.pingFrequency);
/* */ }
/* */ catch (NumberFormatException e)
/* */ {
/* 446 */ log.warn("Invalid format for \"pingFrequency\": " + o);
/* */ }
/* */ }
/* 449 */ else if (o != null)
/* */ {
/* 451 */ log.warn("\"pingFrequency\" must be specified as a String");
/* */ }
/* */
/* 454 */ o = this.configuration.get("pingWindowFactor");
/* 455 */ if (((o instanceof String)) && (((String)o).length() > 0))
/* */ {
/* */ try
/* */ {
/* 459 */ this.pingWindowFactor = Integer.valueOf((String)o).intValue();
/* 460 */ log.debug(this + " setting pingWindowFactor to " + this.pingWindowFactor);
/* */ }
/* */ catch (NumberFormatException e)
/* */ {
/* 464 */ log.warn("Invalid format for \"pingWindowFactor\": " + o);
/* */ }
/* */ }
/* 467 */ else if (o != null)
/* */ {
/* 469 */ log.warn("\"pingWindowFactor\" must be specified as a String");
/* */ }
/* */
/* 472 */ this.pingWindow = (this.pingWindowFactor * this.pingFrequency);
/* */
/* 474 */ o = this.configuration.get("secondaryBindPort");
/* 475 */ if (((o instanceof String)) && (((String)o).length() > 0))
/* */ {
/* */ try
/* */ {
/* 479 */ this.secondaryBindPort = Integer.valueOf((String)o).intValue();
/* 480 */ log.debug(this + " setting secondaryBindPort to " + this.secondaryBindPort);
/* */ }
/* */ catch (NumberFormatException e)
/* */ {
/* 484 */ log.warn("Invalid format for \"secondaryBindPort\": " + o);
/* */ }
/* */ }
/* 487 */ else if (o != null)
/* */ {
/* 489 */ log.warn("\"secondaryBindPort\" must be specified as a String");
/* */ }
/* */
/* 492 */ o = this.configuration.get("secondaryConnectPort");
/* 493 */ if (((o instanceof String)) && (((String)o).length() > 0))
/* */ {
/* */ try
/* */ {
/* 497 */ this.secondaryConnectPort = Integer.valueOf((String)o).intValue();
/* 498 */ log.debug(this + " setting secondaryConnectPort to " + this.secondaryConnectPort);
/* */ }
/* */ catch (NumberFormatException e)
/* */ {
/* 502 */ log.warn("Invalid format for \"secondaryConnectPort\": " + o);
/* */ }
/* */ }
/* 505 */ else if (o != null)
/* */ {
/* 507 */ log.warn("\"secondaryConnectPort\" must be specified as a String");
/* */ }
/* */
/* 510 */ super.setup();
/* */ }
/* */
/* */ protected void cleanup()
/* */ {
/* 516 */ super.cleanup();
/* */
/* 518 */ if (this.controlMonitorTimerTask != null) {
/* 519 */ this.controlMonitorTimerTask.shutdown();
/* */ }
/* 521 */ synchronized (this.controlConnectionThreadMap)
/* */ {
/* 523 */ Iterator it = this.controlConnectionThreadMap.values().iterator();
/* 524 */ while (it.hasNext())
/* */ {
/* 526 */ ControlConnectionThread t = (ControlConnectionThread)it.next();
/* 527 */ it.remove();
/* 528 */ t.shutdown();
/* */ }
/* */ }
/* */
/* 532 */ if (this.secondaryServerSocketThread != null) {
/* 533 */ this.secondaryServerSocketThread.shutdown();
/* */ }
/* 535 */ if (this.secondaryServerSocket != null)
/* */ {
/* */ try
/* */ {
/* 539 */ this.secondaryServerSocket.close();
/* */ }
/* */ catch (IOException e)
/* */ {
/* 543 */ log.info("Error closing secondary server socket: " + e.getMessage());
/* */ }
/* */ }
/* */ }
/* */
/* */ protected InvokerLocator getSecondaryLocator()
/* */ {
/* 551 */ return this.secondaryLocator;
/* */ }
/* */
/* */ protected ServerSocket getServerSocket()
/* */ {
/* 557 */ return this.serverSocket;
/* */ }
/* */
/* */ protected void checkSocketFactoryWrapper()
/* */ throws IOException
/* */ {
/* 564 */ Object o = this.configuration.get("socketCreationServerListener");
/* 565 */ if (o != null)
/* */ {
/* 567 */ if ((o instanceof SocketCreationListener))
/* */ {
/* 569 */ SocketCreationListener listener = (SocketCreationListener)o;
/* 570 */ if ((this.socketFactory instanceof CreationListenerSocketFactory))
/* */ {
/* 572 */ CreationListenerSocketFactory clsf = (CreationListenerSocketFactory)this.socketFactory;
/* 573 */ clsf.setListener(listener);
/* */ }
/* */ else
/* */ {
/* 577 */ this.socketFactory = new CreationListenerSocketFactory(this.socketFactory, listener);
/* */ }
/* */ }
/* */ else
/* */ {
/* 582 */ log.error("socket creation listener of invalid type: " + o);
/* */ }
/* */
/* */ }
/* 587 */ else if ((this.socketFactory instanceof CreationListenerSocketFactory))
/* */ {
/* 589 */ CreationListenerSocketFactory clsf = (CreationListenerSocketFactory)this.socketFactory;
/* 590 */ this.socketFactory = clsf.getFactory();
/* */ }
/* */ }
/* */
/* */ protected void checkSecondaryServerSocketWrapper()
/* */ throws IOException
/* */ {
/* 599 */ Object o = this.configuration.get("socketCreationClientListener");
/* 600 */ if (o != null)
/* */ {
/* 602 */ if ((o instanceof SocketCreationListener))
/* */ {
/* 604 */ SocketCreationListener listener = (SocketCreationListener)o;
/* 605 */ if ((this.secondaryServerSocket instanceof CreationListenerServerSocket))
/* */ {
/* 607 */ CreationListenerServerSocket clss = (CreationListenerServerSocket)this.secondaryServerSocket;
/* 608 */ clss.setListener(listener);
/* */ }
/* */ else
/* */ {
/* 612 */ this.secondaryServerSocket = new CreationListenerServerSocket(this.secondaryServerSocket, listener);
/* */ }
/* */ }
/* */ else
/* */ {
/* 617 */ log.error("socket creation listener of invalid type: " + o);
/* */ }
/* */
/* */ }
/* 622 */ else if ((this.secondaryServerSocket instanceof CreationListenerServerSocket))
/* */ {
/* 624 */ CreationListenerServerSocket clss = (CreationListenerServerSocket)this.secondaryServerSocket;
/* 625 */ this.secondaryServerSocket = clss.getServerSocket();
/* */ }
/* */ }
/* */
/* */ protected Object handleInternalInvocation(InternalInvocation ii, InvocationRequest ir, ServerInvocationHandler handler)
/* */ throws Throwable
/* */ {
/* 636 */ if ("getSecondaryInvokerLocator".equals(ii.getMethodName()))
/* */ {
/* 638 */ return this.secondaryLocator;
/* */ }
/* */
/* 641 */ Object response = super.handleInternalInvocation(ii, ir, handler);
/* */
/* 643 */ if ("addClientListener".equals(ii.getMethodName()))
/* */ {
/* 645 */ Map metadata = ir.getRequestPayload();
/* 646 */ if (metadata != null)
/* */ {
/* 648 */ String listenerId = (String)metadata.get("listenerId");
/* 649 */ if (listenerId != null)
/* */ {
/* 651 */ listenerIdToServerInvokerMap.put(listenerId, this);
/* */ }
/* */ }
/* */ }
/* 655 */ else if ("removeClientListener".equals(ii.getMethodName()))
/* */ {
/* 657 */ Map metadata = ir.getRequestPayload();
/* 658 */ if (metadata != null)
/* */ {
/* 660 */ String listenerId = (String)metadata.get("listenerId");
/* 661 */ if (listenerId != null)
/* */ {
/* 663 */ listenerIdToServerInvokerMap.remove(listenerId);
/* 664 */ BisocketClientInvoker.removeBisocketClientInvoker(listenerId);
/* 665 */ destroyControlConnection(listenerId);
/* */ }
/* */ }
/* */ }
/* */
/* 670 */ return response;
/* */ }
/* */ static class ClientUnavailableException extends IOException {
/* */ private static final long serialVersionUID = 2846502029152028732L;
/* */ }
/* */ static class ControlMonitorTimerTask extends TimerTask {
/* 947 */ private boolean running = true;
/* */ private BisocketServerInvoker invoker;
/* */ private Map listenerIdToInvokerLocatorMap;
/* */ private Map controlConnectionThreadMap;
/* */ private Map controlConnectionRestartsMap;
/* */ private int controlConnectionRestarts;
/* */
/* 956 */ ControlMonitorTimerTask(BisocketServerInvoker invoker) { this.invoker = invoker;
/* 957 */ this.listenerIdToInvokerLocatorMap = invoker.listenerIdToInvokerLocatorMap;
/* 958 */ this.controlConnectionThreadMap = invoker.controlConnectionThreadMap;
/* 959 */ this.controlConnectionRestartsMap = invoker.controlConnectionRestartsMap;
/* 960 */ this.controlConnectionRestarts = invoker.controlConnectionRestarts;
/* */ }
/* */
/* */ synchronized void shutdown()
/* */ {
/* 971 */ this.running = false;
/* 972 */ this.invoker = null;
/* 973 */ this.listenerIdToInvokerLocatorMap = null;
/* 974 */ this.controlConnectionThreadMap = null;
/* 975 */ cancel();
/* */ }
/* */
/* */ public void run()
/* */ {
/* 980 */ if (!this.running) {
/* 981 */ return;
/* */ }
/* 983 */ if (BisocketServerInvoker.log.isTraceEnabled()) {
/* 984 */ BisocketServerInvoker.log.trace("checking connections");
/* */ }
/* 986 */ Collection controlConnectionThreads = null;
/* 987 */ synchronized (this)
/* */ {
/* 989 */ if (!this.running) {
/* 990 */ return;
/* */ }
/* 992 */ controlConnectionThreads = new HashSet(this.controlConnectionThreadMap.values());
/* */ }
/* */
/* 995 */ Iterator it = controlConnectionThreads.iterator();
/* 996 */ while (it.hasNext())
/* */ {
/* 998 */ BisocketServerInvoker.ControlConnectionThread t = (BisocketServerInvoker.ControlConnectionThread)it.next();
/* 999 */ String listenerId = t.getListenerId();
/* */ Object locator;
/* 1002 */ synchronized (this)
/* */ {
/* 1004 */ if (!this.running) {
/* 1005 */ return;
/* */ }
/* 1007 */ locator = this.listenerIdToInvokerLocatorMap.get(listenerId);
/* */ }
/* */ Object locator;
/* 1010 */ if (!t.checkConnection())
/* */ {
/* 1012 */ t.shutdown();
/* */
/* 1014 */ synchronized (this)
/* */ {
/* 1016 */ if (!this.running) {
/* 1017 */ return;
/* */ }
/* 1019 */ this.controlConnectionThreadMap.remove(listenerId);
/* 1020 */ Object o = this.controlConnectionRestartsMap.get(listenerId);
/* 1021 */ int restarts = ((Integer)o).intValue();
/* */
/* 1023 */ if (restarts + 1 > this.controlConnectionRestarts)
/* */ {
/* 1025 */ BisocketServerInvoker.log.warn(this + ": detected failure on control connection " + t);
/* 1026 */ BisocketServerInvoker.log.warn("Control connection " + listenerId + " has been recreated " + restarts + " times.");
/* 1027 */ BisocketServerInvoker.log.warn("Assuming it is a connection to an old server, and will not restart");
/* 1028 */ this.controlConnectionRestartsMap.remove(listenerId);
/* 1029 */ continue;
/* */ }
/* */
/* 1032 */ BisocketServerInvoker.log.warn(this + ": detected failure on control connection " + t + " (" + listenerId + ": requesting new control connection");
/* */ }
/* */
/* 1037 */ Thread t2 = new BisocketServerInvoker.1(this, listenerId, locator);
/* */
/* 1061 */ t2.setName("controlConnectionRecreate:" + t.getName());
/* 1062 */ t2.start();
/* */ }
/* */ }
/* */ }
/* */ }
/* */
/* */ class SecondaryServerSocketThread extends Thread
/* */ {
/* */ private ServerSocket secondaryServerSocket;
/* 870 */ boolean running = true;
/* */
/* */ SecondaryServerSocketThread(ServerSocket secondaryServerSocket) throws IOException
/* */ {
/* 874 */ this.secondaryServerSocket = secondaryServerSocket;
/* */ }
/* */
/* */ void shutdown()
/* */ {
/* 879 */ this.running = false;
/* 880 */ interrupt();
/* */ }
/* */
/* */ public void run()
/* */ {
/* 885 */ while (this.running)
/* */ {
/* */ try
/* */ {
/* 889 */ Socket socket = this.secondaryServerSocket.accept();
/* 890 */ if (BisocketServerInvoker.log.isTraceEnabled()) BisocketServerInvoker.log.trace("accepted: " + socket);
/* 891 */ DataInputStream dis = new DataInputStream(socket.getInputStream());
/* 892 */ int action = dis.read();
/* 893 */ String listenerId = dis.readUTF();
/* */
/* 895 */ switch (action)
/* */ {
/* */ case 2:
/* 898 */ BisocketClientInvoker.transferSocket(listenerId, socket, true);
/* 899 */ if (!BisocketServerInvoker.log.isTraceEnabled()) break;
/* 900 */ BisocketServerInvoker.log.trace("SecondaryServerSocketThread: created control socket: (" + socket + ")" + listenerId); break;
/* */ case 3:
/* 904 */ BisocketClientInvoker invoker = BisocketClientInvoker.getBisocketCallbackClientInvoker(listenerId);
/* 905 */ if (invoker == null)
/* */ {
/* 907 */ BisocketServerInvoker.log.error("received new control socket for unrecognized listenerId: " + listenerId);
/* */ }
/* */ else
/* */ {
/* 911 */ invoker.replaceControlSocket(socket);
/* 912 */ if (!BisocketServerInvoker.log.isTraceEnabled()) break;
/* 913 */ BisocketServerInvoker.log.trace("SecondaryServerSocketThread: recreated control socket: " + listenerId); } break;
/* */ case 4:
/* 918 */ BisocketClientInvoker.transferSocket(listenerId, socket, false);
/* 919 */ if (!BisocketServerInvoker.log.isTraceEnabled()) break;
/* 920 */ BisocketServerInvoker.log.trace("SecondaryServerSocketThread: transferred socket: " + listenerId); break;
/* */ default:
/* 924 */ BisocketServerInvoker.log.error("unrecognized action on SecondaryServerSocketThread: " + action);
/* */ }
/* */ }
/* */ catch (IOException e)
/* */ {
/* 929 */ if (this.running)
/* 930 */ BisocketServerInvoker.log.error("Failed to accept socket connection", e);
/* */ else
/* 932 */ return;
/* */ }
/* */ }
/* */ }
/* */
/* */ ServerSocket getServerSocket()
/* */ {
/* 940 */ return this.secondaryServerSocket;
/* */ }
/* */ }
/* */
/* */ class ControlConnectionThread extends Thread
/* */ {
/* */ private static final int MAX_INITIAL_ATTEMPTS = 5;
/* */ private Socket controlSocket;
/* */ private String listenerId;
/* */ private DataInputStream dis;
/* */ private boolean running;
/* */ private int errorCount;
/* 682 */ private long lastPing = -1L;
/* */ private int initialAttempts;
/* */
/* */ ControlConnectionThread(Socket socket, String listenerId)
/* */ throws IOException
/* */ {
/* 687 */ this.controlSocket = socket;
/* 688 */ this.listenerId = listenerId;
/* 689 */ this.dis = new DataInputStream(socket.getInputStream());
/* */ }
/* */
/* */ void shutdown()
/* */ {
/* 694 */ this.running = false;
/* */ try
/* */ {
/* 698 */ this.controlSocket.close();
/* */ }
/* */ catch (IOException e)
/* */ {
/* 702 */ BisocketServerInvoker.log.warn("unable to close controlSocket");
/* */ }
/* 704 */ interrupt();
/* */ }
/* */
/* */ boolean checkConnection()
/* */ {
/* 709 */ if ((this.lastPing < 0L) && (this.initialAttempts++ < 5))
/* */ {
/* 711 */ return true;
/* */ }
/* 713 */ if (this.lastPing < 0L)
/* */ {
/* 715 */ return false;
/* */ }
/* */
/* 718 */ long currentTime = System.currentTimeMillis();
/* */
/* 720 */ if (BisocketServerInvoker.log.isTraceEnabled())
/* */ {
/* 722 */ BisocketServerInvoker.log.trace("elapsed: " + (currentTime - this.lastPing));
/* */ }
/* 724 */ return currentTime - this.lastPing <= BisocketServerInvoker.this.pingWindow;
/* */ }
/* */
/* */ String getListenerId()
/* */ {
/* 729 */ return this.listenerId;
/* */ }
/* */
/* */ public void run()
/* */ {
/* 734 */ this.running = true;
/* 735 */ while (this.running)
/* */ {
/* 737 */ Socket socket = null;
/* */ try
/* */ {
/* 741 */ int action = this.dis.read();
/* 742 */ this.lastPing = System.currentTimeMillis();
/* */
/* 744 */ switch (action)
/* */ {
/* */ case 4:
/* 747 */ InvokerLocator locator = (InvokerLocator)BisocketServerInvoker.this.listenerIdToInvokerLocatorMap.get(this.listenerId);
/* */
/* 749 */ IOException savedException = null;
/* */
/* 751 */ for (int i = 0; i < BisocketServerInvoker.this.socketCreationRetries; i++)
/* */ {
/* */ try
/* */ {
/* 755 */ if (BisocketServerInvoker.this.socketFactory != null)
/* 756 */ socket = BisocketServerInvoker.this.socketFactory.createSocket(locator.getHost(), locator.getPort());
/* */ else
/* 758 */ socket = new Socket(locator.getHost(), locator.getPort());
/* */ }
/* */ catch (IOException e)
/* */ {
/* 762 */ BisocketServerInvoker.log.debug("Error creating a socket", e);
/* 763 */ savedException = e;
/* */ }
/* */
/* 766 */ if (socket != null) {
/* */ break;
/* */ }
/* */ try
/* */ {
/* 771 */ Thread.sleep(1000L);
/* */ }
/* */ catch (InterruptedException e)
/* */ {
/* 775 */ if (this.running)
/* */ {
/* 777 */ BisocketServerInvoker.log.debug("received unexpected interrupt");
/* */ }
/* */ else
/* */ {
/* 782 */ return;
/* */ }
/* */ }
/* */ }
/* */
/* 787 */ if (socket == null)
/* */ {
/* 789 */ BisocketServerInvoker.log.error("Unable to create socket after " + BisocketServerInvoker.this.socketCreationRetries + " retries", savedException);
/* */
/* 791 */ continue;
/* */ }
/* */
/* 794 */ DataOutputStream dos = new DataOutputStream(socket.getOutputStream());
/* 795 */ dos.write(4);
/* 796 */ dos.writeUTF(this.listenerId);
/* 797 */ break;
/* */ case 1:
/* 800 */ break;
/* */ case -1:
/* 803 */ shutdown();
/* 804 */ return;
/* */ default:
/* 807 */ BisocketServerInvoker.log.error("unrecognized action on ControlConnectionThread (" + this.listenerId + "): " + action);
/* */
/* 809 */ continue;
/* */ }
/* */ }
/* */ catch (IOException e)
/* */ {
/* 814 */ if (this.running)
/* */ {
/* 816 */ if (("Socket closed".equalsIgnoreCase(e.getMessage())) || ("Socket is closed".equalsIgnoreCase(e.getMessage())) || ("Connection reset".equalsIgnoreCase(e.getMessage())))
/* */ {
/* 820 */ shutdown();
/* 821 */ return;
/* */ }
/* 823 */ BisocketServerInvoker.log.error("Unable to process control connection: " + e.getMessage(), e);
/* 824 */ if (++this.errorCount > 5)
/* */ {
/* 826 */ shutdown();
/* 827 */ return;
/* */ }
/* 829 */ continue;
/* */ }
/* */
/* 832 */ return;
/* */ }
/* */
/* 835 */ synchronized (BisocketServerInvoker.this.clientpool)
/* */ {
/* 837 */ if (BisocketServerInvoker.this.clientpool.size() < BisocketServerInvoker.this.maxPoolSize)
/* */ {
/* 839 */ Thread thread = null;
/* */ try
/* */ {
/* 842 */ thread = new ServerThread(socket, BisocketServerInvoker.this, BisocketServerInvoker.this.clientpool, BisocketServerInvoker.this.threadpool, BisocketServerInvoker.this.getTimeout(), BisocketServerInvoker.this.serverSocketClass);
/* */
/* 845 */ thread.start();
/* */
/* 847 */ if (BisocketServerInvoker.log.isDebugEnabled())
/* 848 */ BisocketServerInvoker.log.debug("created: " + thread);
/* */ }
/* */ catch (Exception e)
/* */ {
/* 852 */ BisocketServerInvoker.log.error("Unable to create new ServerThread: " + e.getMessage());
/* 853 */ e.printStackTrace();
/* */ }
/* */
/* 856 */ synchronized (BisocketServerInvoker.this.threadpool)
/* */ {
/* 858 */ BisocketServerInvoker.this.threadpool.add(thread);
/* */ }
/* */ }
/* */ }
/* */ }
/* */ }
/* */ }
/* */ }
/* Location: /home/mnovotny/projects/EMBEDDED_JBOSS_BETA3_COMMUNITY/embedded/output/lib/embedded-jboss/lib/jboss-embedded-all.jar
* Qualified Name: org.jboss.remoting.transport.bisocket.BisocketServerInvoker
* JD-Core Version: 0.6.0
*/