Package org.goldenorb.server

Source Code of org.goldenorb.server.OrbTrackerMemberDataServiceImpl

package org.goldenorb.server;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.goldenorb.client.NodeDoesNotExistException;
import org.goldenorb.client.OrbTrackerMemberData;
import org.goldenorb.client.OrbTrackerMemberDataService;
import org.goldenorb.client.WatcherException;
import org.goldenorb.client.ZooKeeperConnectionException;
import org.goldenorb.conf.OrbConfiguration;
import org.goldenorb.zookeeper.OrbZKFailure;
import org.goldenorb.zookeeper.ZookeeperUtils;

import com.google.gwt.user.server.rpc.RemoteServiceServlet;

/**
* GWT servlet that watches for status update through ZooKeeper.
*/
public class OrbTrackerMemberDataServiceImpl extends RemoteServiceServlet implements
    OrbTrackerMemberDataService, Watcher {
 
  /**
   *
   */
  private static final long serialVersionUID = 7790444401752275415L;
  private Map<String,OrbTrackerMemberData> memberDataContainer = new HashMap<String,OrbTrackerMemberData>();
  private String[] jobsInQueue;
  private String[] jobsInProgress;
  private ZooKeeper zk = null;
  // private String cluster;
  private Watcher leaderGroupWatcher;
  private Watcher jobQueueWatcher;
  private Watcher jobsInProgressWatcher;
  private OrbConfiguration orbConf = new OrbConfiguration(true);
  private boolean testingMode = false;
  private CountDownLatch jobsLatch;
  private CountDownLatch removeLatch;
  private CountDownLatch updateLatch;
 
  /**
   * Returns the data of the OrbTrackerMember nodes under the LeaderGroup node in ZooKeeper.
   */
  @Override
  public OrbTrackerMemberData[] getOrbTrackerMemberData() throws ZooKeeperConnectionException,
                                                         WatcherException,
                                                         NodeDoesNotExistException {
    if (zk == null) {
      try {
        initializeZooKeeper();
        memberDataContainer = new HashMap<String,OrbTrackerMemberData>();
      } catch (Exception e) {
        e.printStackTrace();
        throw new ZooKeeperConnectionException(e);
      }
    }
    // Initialize leader group watcher
    if (leaderGroupWatcher == null) {
      try {
        initializeLeaderGroupMonitor();
      } catch (OrbZKFailure e) {
        throw new WatcherException(e);
      }
    }
    return memberDataContainer.values().toArray(new OrbTrackerMemberData[0]);
  }
 
  private void initializeLeaderGroupMonitor() throws OrbZKFailure, NodeDoesNotExistException {
    String leaderGroupPath = "/GoldenOrb/" + orbConf.getOrbClusterName() + "/OrbTrackerLeaderGroup";
    if (ZookeeperUtils.nodeExists(zk, leaderGroupPath)) {
      leaderGroupWatcher = new LeaderGroupMonitor(leaderGroupPath, this, zk);
    } else {
      throw new NodeDoesNotExistException(leaderGroupPath);
    }
  }
 
  private void initializeJobMonitor(String nodeName) throws NodeDoesNotExistException, OrbZKFailure {
    String jobPath = "/GoldenOrb/" + orbConf.getOrbClusterName() + "/" + nodeName;
    if (ZookeeperUtils.nodeExists(zk, jobPath)) {
      if (nodeName.equalsIgnoreCase("JobQueue")) jobQueueWatcher = new JobsMonitor(jobPath, this, zk);
      else jobsInProgressWatcher = new JobsMonitor(jobPath, this, zk);
    } else {
      throw new NodeDoesNotExistException(jobPath);
    }
  }
 
  /**
   * Initializes a ZooKeeper instance if it has not already been intialized.
   *
   *
   * @throws IOException
   * @throws InterruptedException
   */
  private void initializeZooKeeper() throws IOException, InterruptedException {
    // String connectString = getServletConfig().getInitParameter("connectString");
  if(testingMode) {
    orbConf.setOrbZooKeeperQuorum("localhost:21810");
  }
    zk = ZookeeperUtils.connect(orbConf.getOrbZooKeeperQuorum());
  }
 
  /**
   * Update the data of the OrbTrackerMember nodes being monitored.
   *
   * @param updatedNode
   *          - OrbTrackermemberData object
   */
  public synchronized void updateNodeData(OrbTrackerMemberData updatedNode) {
    memberDataContainer.put(updatedNode.getName(), updatedNode);
    if (testingMode) {
      updateLatch.countDown();
    }
  }
 
  /**
   * Remove a node from the data container that is sent to client side application.
   *
   * @param nodeName
   */
  public synchronized void removeNodeData(String nodeName) {
    memberDataContainer.remove(nodeName);
    if (testingMode) {
      removeLatch.countDown();
    }
  }
 
  @Override
  public void process(WatchedEvent event) {
    // Don't care about what event get passed back from connecting to zookeeper.
    // It will either connect or throw an exception.
  }
 
  /**
   * Returns an array of the job names that are in queue.
   */
  @Override
  public String[] getJobsInQueue() throws NodeDoesNotExistException,
                                  ZooKeeperConnectionException,
                                  WatcherException {
    if (zk == null) {
      try {
        initializeZooKeeper();
      } catch (Exception e) {
        throw new ZooKeeperConnectionException(e);
      }
    }
    if (jobQueueWatcher == null) {
      try {
        initializeJobMonitor("JobQueue");
      } catch (OrbZKFailure e) {
        throw new WatcherException(e);
      }
    }
    return jobsInQueue;
  }
 
  /**
   * Returns an array of job names that are in progress.
   */
  @Override
  public String[] getJobsInProgress() throws NodeDoesNotExistException,
                                     ZooKeeperConnectionException,
                                     WatcherException {
    if (zk == null) {
      try {
        initializeZooKeeper();
      } catch (Exception e) {
        throw new ZooKeeperConnectionException(e);
      }
    }
    if (jobsInProgressWatcher == null) {
      try {
        initializeJobMonitor("JobsInProgress");
      } catch (OrbZKFailure e) {
        throw new WatcherException(e);
      }
    }
    return jobsInProgress;
  }
 
  /**
   * Sets the jobs that are in queue or in progress.
   *
   * @param jobs
   * @param nodeName
   */
  public void updateJobs(String[] jobs, String nodeName) {
    if (nodeName.equalsIgnoreCase("JobQueue")) {
      jobsInQueue = jobs;
    } else {
      jobsInProgress = jobs;
    }
    if (testingMode) {
      jobsLatch.countDown();
    }
  }
 
  /**
   * Also the use of countdownlatches to monitor when the servlet is receiving updates for testing purposes.
   *
   * @param jobsLatch
   * @param removeLatch
   * @param updateLatch
   */
  public void enterTestingMode(CountDownLatch jobsLatch,
                               CountDownLatch removeLatch,
                               CountDownLatch updateLatch) {
    this.jobsLatch = jobsLatch;
    this.removeLatch = removeLatch;
    this.updateLatch = updateLatch;
    this.testingMode = true;
  }
 
  /**
   * Used for testing purposes only
   */
  public void exitTestingMode() {
    this.testingMode = false;
  }
 
  /**
   * Used for testing purposes only
   */
  public void setUpdateLatch(CountDownLatch updateLatch) {
    this.updateLatch = updateLatch;
  }
 
  /**
   * Used for testing purposes only
   */
  public void setRemoveLatch(CountDownLatch removeLatch) {
    this.removeLatch = removeLatch;
  }
 
  /**
   * Used for testing purposes only
   */
  public void setJobsLatch(CountDownLatch jobsLatch) {
    this.jobsLatch = jobsLatch;
  }
 
  /**
   * Used for testing purposes only
   */
  public void setLatches(CountDownLatch jobsLatch, CountDownLatch removeLatch, CountDownLatch updateLatch) {
    this.jobsLatch = jobsLatch;
    this.removeLatch = removeLatch;
    this.updateLatch = updateLatch;
  }
 
  /**
   * Notify the servlet that node that should be watched has been deleted, so it will notice the creation of a
   * new node of the same type.
   */
  public void watcherNodeDeleted(String node) {
    if (node.equalsIgnoreCase("OrbTrackerLeaderGroup")) {
      leaderGroupWatcher = null;
      memberDataContainer.clear();
    } else if (node.equalsIgnoreCase("JobQueue")) {
      jobQueueWatcher = null;
      jobsInQueue = new String[0];
    } else if (node.equalsIgnoreCase("JobsInProgress")) {
      jobsInProgressWatcher = null;
      jobsInProgress = new String[0];
    }
  }
 
}
TOP

Related Classes of org.goldenorb.server.OrbTrackerMemberDataServiceImpl

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.