/* */ package org.jboss.messaging.core.impl.clusterconnection;
/* */
/* */ import EDU.oswego.cs.dl.util.concurrent.Callable;
/* */ import EDU.oswego.cs.dl.util.concurrent.TimedCallable;
/* */ import java.util.ArrayList;
/* */ import java.util.Collection;
/* */ import java.util.HashMap;
/* */ import java.util.Iterator;
/* */ import java.util.List;
/* */ import java.util.Map;
/* */ import java.util.Map.Entry;
/* */ import java.util.Set;
/* */ import javax.jms.JMSException;
/* */ import org.jboss.jms.client.JBossConnection;
/* */ import org.jboss.jms.client.JBossConnectionFactory;
/* */ import org.jboss.jms.client.delegate.ClientConnectionFactoryDelegate;
/* */ import org.jboss.logging.Logger;
/* */ import org.jboss.messaging.core.contract.Binding;
/* */ import org.jboss.messaging.core.contract.ClusterNotification;
/* */ import org.jboss.messaging.core.contract.ClusterNotificationListener;
/* */ import org.jboss.messaging.core.contract.PostOffice;
/* */ import org.jboss.messaging.core.contract.Queue;
/* */ import org.jboss.messaging.core.contract.Replicator;
/* */
/* */ public class ClusterConnectionManager
/* */ implements ClusterNotificationListener
/* */ {
/* 60 */ private static final Logger log = Logger.getLogger(ClusterConnectionManager.class);
/* */ private static final long CLOSE_TIMEOUT = 2000L;
/* 64 */ private boolean trace = log.isTraceEnabled();
/* */ private Map connections;
/* */ private boolean xa;
/* */ private boolean started;
/* */ private int nodeID;
/* */ private String connectionFactoryUniqueName;
/* */ private Replicator replicator;
/* */ private PostOffice postOffice;
/* */ private boolean preserveOrdering;
/* */ private String suckerUser;
/* */ private String suckerPassword;
/* */
/* */ public ClusterConnectionManager(boolean xa, int nodeID, String connectionFactoryUniqueName, boolean preserveOrdering, String suckerUser, String suckerPassword)
/* */ {
/* 91 */ this.connections = new HashMap();
/* */
/* 93 */ this.xa = xa;
/* */
/* 95 */ this.nodeID = nodeID;
/* */
/* 97 */ this.connectionFactoryUniqueName = connectionFactoryUniqueName;
/* */
/* 99 */ this.preserveOrdering = preserveOrdering;
/* */
/* 101 */ this.suckerUser = suckerUser;
/* */
/* 103 */ this.suckerPassword = suckerPassword;
/* */
/* 105 */ if (this.trace) log.trace("Created " + this);
/* */ }
/* */
/* */ public void injectReplicator(Replicator replicator)
/* */ {
/* 110 */ this.replicator = replicator;
/* */ }
/* */
/* */ public void injectPostOffice(PostOffice postOffice)
/* */ {
/* 115 */ this.postOffice = postOffice;
/* */ }
/* */
/* */ public synchronized void start() throws Exception
/* */ {
/* 120 */ if (this.started)
/* */ {
/* 122 */ return;
/* */ }
/* */
/* 125 */ if (this.trace) log.trace(this + " started");
/* */
/* 127 */ this.started = true;
/* */ }
/* */
/* */ public synchronized void stop()
/* */ {
/* 132 */ if (!this.started)
/* */ {
/* 134 */ return;
/* */ }
/* */
/* 137 */ Iterator iter = this.connections.values().iterator();
/* */
/* 139 */ while (iter.hasNext())
/* */ {
/* 141 */ ConnectionInfo info = (ConnectionInfo)iter.next();
/* */
/* 143 */ info.close();
/* */ }
/* */
/* 146 */ this.connections.clear();
/* */
/* 148 */ this.started = false;
/* */
/* 150 */ if (this.trace) log.trace(this + " stopped");
/* */ }
/* */
/* */ public Map getAllConnections()
/* */ {
/* 155 */ return this.connections;
/* */ }
/* */
/* */ public void resetAllSuckers()
/* */ {
/* 160 */ Iterator iter = this.connections.values().iterator();
/* */
/* 162 */ while (iter.hasNext())
/* */ {
/* 164 */ ConnectionInfo conn = (ConnectionInfo)iter.next();
/* */
/* 166 */ conn.resetAllSuckers();
/* */ }
/* */ }
/* */
/* */ public void setIsXA(boolean xa) throws Exception
/* */ {
/* 172 */ boolean needToClose = this.xa != xa;
/* 173 */ if (needToClose)
/* */ {
/* 175 */ closeAllSuckers();
/* */ }
/* 177 */ this.xa = xa;
/* 178 */ if (needToClose)
/* */ {
/* 180 */ createAllSuckers();
/* */ }
/* */ }
/* */
/* */ public void closeAllSuckers()
/* */ {
/* 186 */ Iterator iter = this.connections.values().iterator();
/* */
/* 188 */ while (iter.hasNext())
/* */ {
/* 190 */ ConnectionInfo conn = (ConnectionInfo)iter.next();
/* */
/* 192 */ conn.closeAllSuckers();
/* */ }
/* */ }
/* */
/* */ public synchronized void notify(ClusterNotification notification)
/* */ {
/* 219 */ if (this.replicator == null)
/* */ {
/* 223 */ return;
/* */ }
/* */
/* 226 */ if (this.trace) log.trace(this + " notification received " + notification);
/* */
/* */ try
/* */ {
/* 230 */ if ((notification.type == 6) && ((notification.data instanceof String)))
/* */ {
/* 232 */ String key = (String)notification.data;
/* */
/* 234 */ if (key.startsWith("CF_"))
/* */ {
/* 239 */ String uniqueName = key.substring("CF_".length());
/* */
/* 241 */ if (uniqueName.equals(this.connectionFactoryUniqueName))
/* */ {
/* 243 */ log.trace(this + " deployment of ClusterConnectionFactory");
/* */
/* 245 */ synchronized (this)
/* */ {
/* 247 */ ensureAllConnectionsCreated();
/* */
/* 252 */ createAllSuckers();
/* */ }
/* */ }
/* */ }
/* */ }
/* 257 */ else if ((notification.type == 7) && ((notification.data instanceof String)))
/* */ {
/* 259 */ String key = (String)notification.data;
/* */
/* 261 */ if (key.startsWith("CF_"))
/* */ {
/* 266 */ String uniqueName = key.substring("CF_".length());
/* */
/* 268 */ if (uniqueName.equals(this.connectionFactoryUniqueName))
/* */ {
/* 270 */ Map updatedReplicantMap = this.replicator.get(key);
/* */
/* 272 */ Object toRemove = new ArrayList();
/* */
/* 274 */ Iterator iter = this.connections.entrySet().iterator();
/* */
/* 276 */ while (iter.hasNext())
/* */ {
/* 278 */ Map.Entry entry = (Map.Entry)iter.next();
/* */
/* 280 */ Integer nid = (Integer)entry.getKey();
/* */
/* 282 */ if (updatedReplicantMap.get(nid) == null)
/* */ {
/* 284 */ ((List)toRemove).add(nid);
/* */ }
/* */ }
/* */
/* 288 */ iter = ((List)toRemove).iterator();
/* */
/* 290 */ while (iter.hasNext())
/* */ {
/* 292 */ Integer nid = (Integer)iter.next();
/* */
/* 294 */ ConnectionInfo info = (ConnectionInfo)this.connections.remove(nid);
/* */
/* 296 */ info.close();
/* */ }
/* */ }
/* */ }
/* */ }
/* 301 */ else if (notification.type == 0)
/* */ {
/* 303 */ String queueName = (String)notification.data;
/* */
/* 305 */ if (this.trace) log.trace(this + " bind of queue " + queueName);
/* */
/* 307 */ if (notification.nodeID == this.nodeID)
/* */ {
/* 311 */ if (this.trace) log.trace(this + " Local bind");
/* */
/* 313 */ ensureAllConnectionsCreated();
/* */
/* 317 */ Collection bindings = this.postOffice.getAllBindingsForQueueName(queueName);
/* */
/* 319 */ Iterator iter = bindings.iterator();
/* */
/* 321 */ if (this.trace) log.trace(this + " Looking for remote bindings");
/* */
/* 323 */ while (iter.hasNext())
/* */ {
/* 325 */ Binding binding = (Binding)iter.next();
/* */
/* 327 */ if (this.trace) log.trace(this + " Remote binding is " + binding);
/* */
/* 331 */ if (binding.queue.getNodeID() != this.nodeID)
/* */ {
/* 333 */ if (this.trace) log.trace(this + " Creating sucker");
/* */
/* 335 */ createSucker(queueName, binding.queue.getNodeID());
/* */ }
/* */
/* */ }
/* */
/* */ }
/* */ else
/* */ {
/* 343 */ if (this.trace) log.trace(this + " Remote bind");
/* */
/* 345 */ ensureAllConnectionsCreated();
/* */
/* 349 */ Binding localBinding = this.postOffice.getBindingForQueueName(queueName);
/* */
/* 351 */ if (localBinding == null)
/* */ {
/* 354 */ if (this.trace) log.trace(this + " There's no local binding");
/* */
/* */ }
/* */ else
/* */ {
/* 360 */ if (this.trace) log.trace(this + " Creating sucker");
/* */
/* 362 */ createSucker(queueName, notification.nodeID);
/* */ }
/* */ }
/* */ }
/* 366 */ else if (notification.type == 1)
/* */ {
/* 368 */ String queueName = (String)notification.data;
/* */
/* 370 */ if (notification.nodeID == this.nodeID)
/* */ {
/* 376 */ removeAllSuckers(queueName);
/* */ }
/* */ else
/* */ {
/* 384 */ removeSucker(queueName, notification.nodeID);
/* */ }
/* */ }
/* */ }
/* */ catch (Exception e)
/* */ {
/* 390 */ log.error("Failed to process notification", e);
/* */ }
/* */ }
/* */
/* */ public String toString()
/* */ {
/* 396 */ return "ClusterConnectionManager:" + System.identityHashCode(this) + " xa: " + this.xa + " nodeID: " + this.nodeID + " connectionFactoryName: " + this.connectionFactoryUniqueName;
/* */ }
/* */
/* */ private void ensureAllConnectionsCreated()
/* */ throws Exception
/* */ {
/* 402 */ Map updatedReplicantMap = this.replicator.get("CF_" + this.connectionFactoryUniqueName);
/* */
/* 406 */ Iterator iter = updatedReplicantMap.entrySet().iterator();
/* */
/* 408 */ while (iter.hasNext())
/* */ {
/* 410 */ Map.Entry entry = (Map.Entry)iter.next();
/* */
/* 412 */ Integer nid = (Integer)entry.getKey();
/* */
/* 414 */ ClientConnectionFactoryDelegate delegate = (ClientConnectionFactoryDelegate)entry.getValue();
/* */
/* 416 */ if (this.connections.get(nid) == null)
/* */ {
/* */ try
/* */ {
/* 420 */ ConnectionInfo info = new ConnectionInfo(new JBossConnectionFactory(delegate), this.suckerUser, this.suckerPassword);
/* */
/* 422 */ log.trace(this + " created connection info " + info);
/* */
/* 424 */ this.connections.put(nid, info);
/* */
/* 426 */ info.start();
/* */ }
/* */ catch (Exception e)
/* */ {
/* 431 */ log.error("Failed to start connection info ", e);
/* */ }
/* */ }
/* */ }
/* */ }
/* */
/* */ private void createSucker(String queueName, int nodeID) throws Exception
/* */ {
/* 439 */ log.debug("createSucker " + queueName + " nodeID=" + nodeID);
/* */
/* 441 */ ConnectionInfo info = (ConnectionInfo)this.connections.get(new Integer(nodeID));
/* */
/* 443 */ if (info == null)
/* */ {
/* 445 */ if (this.trace) log.trace("Cluster pull connection factory has not yet been deployed on node " + nodeID);
/* */
/* 447 */ return;
/* */ }
/* */
/* 450 */ ConnectionInfo localInfo = (ConnectionInfo)this.connections.get(new Integer(this.nodeID));
/* */
/* 452 */ if (localInfo == null)
/* */ {
/* 454 */ if (this.trace) log.trace("Cluster pull connection factory has not yet been deployed on local node");
/* */
/* 456 */ return;
/* */ }
/* */
/* 461 */ if (!info.hasSucker(queueName))
/* */ {
/* 463 */ if (this.trace) log.trace("Creating Sucker for queue " + queueName + " node " + nodeID);
/* */
/* 467 */ Binding binding = this.postOffice.getBindingForQueueName(queueName);
/* */
/* 469 */ Queue localQueue = binding.queue;
/* */
/* 471 */ if (localQueue.isClustered())
/* */ {
/* 473 */ MessageSucker sucker = new MessageSucker(localQueue, info.connection, localInfo.connection, this.xa, this.preserveOrdering);
/* */
/* 475 */ info.addSucker(sucker);
/* */
/* 477 */ sucker.start();
/* */
/* 479 */ if (this.trace) log.trace("Started it");
/* */
/* */ }
/* */
/* */ }
/* 484 */ else if (this.trace) { log.trace("Sucker for queue " + queueName + " node " + nodeID + " already exists, not creating it");
/* */ }
/* */ }
/* */
/* */ private void removeSucker(String queueName, int nodeID)
/* */ {
/* 490 */ log.debug("removeSucker " + queueName + " nodeID=" + nodeID);
/* */
/* 492 */ ConnectionInfo info = (ConnectionInfo)this.connections.get(new Integer(nodeID));
/* */
/* 494 */ if (info == null)
/* */ {
/* 497 */ return;
/* */ }
/* */
/* 500 */ MessageSucker sucker = info.removeSucker(queueName);
/* */
/* 502 */ if (sucker != null)
/* */ {
/* 508 */ sucker.stop();
/* */ }
/* */ }
/* */
/* */ private void removeAllSuckers(String queueName)
/* */ {
/* 514 */ log.debug("removeAllSuckers " + queueName);
/* */
/* 516 */ Iterator iter = this.connections.values().iterator();
/* */
/* 518 */ while (iter.hasNext())
/* */ {
/* 520 */ ConnectionInfo info = (ConnectionInfo)iter.next();
/* */
/* 522 */ MessageSucker sucker = info.removeSucker(queueName);
/* */
/* 526 */ if (sucker != null)
/* */ {
/* 528 */ sucker.stop();
/* */ }
/* */ }
/* */ }
/* */
/* */ private void createAllSuckers() throws Exception
/* */ {
/* 535 */ Collection allBindings = this.postOffice.getAllBindings();
/* */
/* 537 */ Iterator iter = allBindings.iterator();
/* */
/* 539 */ Map nameMap = new HashMap();
/* */
/* 543 */ while (iter.hasNext())
/* */ {
/* 545 */ Binding binding = (Binding)iter.next();
/* */
/* 547 */ if (binding.queue.isClustered())
/* */ {
/* 549 */ List queues = (List)nameMap.get(binding.queue.getName());
/* */
/* 551 */ if (queues == null)
/* */ {
/* 553 */ queues = new ArrayList();
/* */
/* 555 */ nameMap.put(binding.queue.getName(), queues);
/* */ }
/* */
/* 558 */ queues.add(binding.queue);
/* */ }
/* */ }
/* */
/* 562 */ iter = nameMap.entrySet().iterator();
/* */
/* 564 */ while (iter.hasNext())
/* */ {
/* 566 */ Map.Entry entry = (Map.Entry)iter.next();
/* */
/* 568 */ String queueName = (String)entry.getKey();
/* */
/* 570 */ List queues = (List)entry.getValue();
/* */
/* 574 */ Iterator iter2 = queues.iterator();
/* */
/* 576 */ Queue localQueue = null;
/* */
/* 578 */ while (iter2.hasNext())
/* */ {
/* 580 */ Queue queue = (Queue)iter2.next();
/* */
/* 582 */ if (queue.getNodeID() == this.nodeID)
/* */ {
/* 584 */ localQueue = queue;
/* */
/* 586 */ break;
/* */ }
/* */ }
/* */
/* 590 */ if (localQueue != null)
/* */ {
/* 592 */ iter2 = queues.iterator();
/* */
/* 594 */ while (iter2.hasNext())
/* */ {
/* 596 */ Queue queue = (Queue)iter2.next();
/* */
/* 598 */ if ((queue.getNodeID() != this.nodeID) && (queue.isClustered()))
/* */ {
/* 602 */ createSucker(queueName, queue.getNodeID());
/* */ }
/* */ }
/* */ }
/* */ }
/* */ }
/* */
/* */ class ConnectionInfo
/* */ {
/* */ private JBossConnectionFactory connectionFactory;
/* */ private JBossConnection connection;
/* */ private Map suckers;
/* */ private boolean started;
/* */ private String suckerUser;
/* */ private String suckerPassword;
/* */
/* */ ConnectionInfo(JBossConnectionFactory connectionFactory, String suckerUser, String suckerPassword) throws Exception {
/* 625 */ this.connectionFactory = connectionFactory;
/* */
/* 627 */ this.suckers = new HashMap();
/* */
/* 629 */ this.suckerUser = suckerUser;
/* */
/* 631 */ this.suckerPassword = suckerPassword;
/* */ }
/* */
/* */ synchronized void start() throws Exception
/* */ {
/* 636 */ if (this.started)
/* */ {
/* 638 */ return;
/* */ }
/* */
/* 641 */ if (this.connection == null)
/* */ {
/* 643 */ this.connection = ((JBossConnection)this.connectionFactory.createConnection(this.suckerUser, this.suckerPassword));
/* */ }
/* */
/* 646 */ this.connection.start();
/* */
/* 648 */ this.started = true;
/* */ }
/* */
/* */ synchronized void stop() throws Exception
/* */ {
/* 653 */ if (!this.started)
/* */ {
/* 655 */ return;
/* */ }
/* */
/* 658 */ this.connection.stop();
/* */
/* 660 */ this.started = false;
/* */ }
/* */
/* */ synchronized void resetAllSuckers()
/* */ {
/* 665 */ Iterator iter = this.suckers.values().iterator();
/* */
/* 667 */ while (iter.hasNext())
/* */ {
/* 669 */ MessageSucker sucker = (MessageSucker)iter.next();
/* */
/* 671 */ sucker.setConsuming(false);
/* */ }
/* */ }
/* */
/* */ synchronized void closeAllSuckers()
/* */ {
/* 677 */ Iterator iter = this.suckers.values().iterator();
/* */
/* 679 */ while (iter.hasNext())
/* */ {
/* 681 */ MessageSucker sucker = (MessageSucker)iter.next();
/* */
/* 683 */ sucker.stop();
/* */ }
/* */
/* 686 */ this.suckers.clear();
/* */ }
/* */
/* */ synchronized void close()
/* */ {
/* 691 */ closeAllSuckers();
/* */
/* 696 */ Callable callable = new Callable()
/* */ {
/* */ public Object call() {
/* */ try {
/* 700 */ ClusterConnectionManager.ConnectionInfo.this.connection.close();
/* */ }
/* */ catch (JMSException ignore)
/* */ {
/* */ }
/* 705 */ return null;
/* */ }
/* */ };
/* 708 */ Callable timedCallable = new TimedCallable(callable, 2000L);
/* */ try
/* */ {
/* 712 */ timedCallable.call();
/* */ }
/* */ catch (Throwable t)
/* */ {
/* */ }
/* */
/* 719 */ this.connection = null;
/* */
/* 721 */ this.started = false;
/* */ }
/* */
/* */ synchronized boolean hasSucker(String queueName)
/* */ {
/* 726 */ return this.suckers.containsKey(queueName);
/* */ }
/* */
/* */ synchronized void addSucker(MessageSucker sucker)
/* */ {
/* 731 */ if (this.suckers.containsKey(sucker.getQueueName()))
/* */ {
/* 733 */ throw new IllegalStateException("Already has sucker for queue " + sucker.getQueueName());
/* */ }
/* */
/* 736 */ this.suckers.put(sucker.getQueueName(), sucker);
/* */ }
/* */
/* */ synchronized MessageSucker removeSucker(String queueName)
/* */ {
/* 741 */ MessageSucker sucker = (MessageSucker)this.suckers.remove(queueName);
/* */
/* 743 */ return sucker;
/* */ }
/* */ }
/* */ }
/* Location: /home/mnovotny/projects/EMBEDDED_JBOSS_BETA3_COMMUNITY/embedded/output/lib/embedded-jboss/lib/jboss-embedded-all.jar
* Qualified Name: org.jboss.messaging.core.impl.clusterconnection.ClusterConnectionManager
* JD-Core Version: 0.6.0
*/