Package org.coweb

Source Code of org.coweb.SessionHandler

/**
* Copyright (c) The Dojo Foundation 2011. All Rights Reserved.
* Copyright (c) IBM Corporation 2008, 2011. All Rights Reserved.
*/
package org.coweb;

import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import java.util.logging.Logger;

import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.bayeux.server.LocalSession;
import org.cometd.bayeux.server.ServerChannel;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ConfigurableServerChannel;

import org.coweb.oe.OperationEngineException;

import java.io.IOException;
import java.security.MessageDigest;

/**
* SessionHandler receives and handles all coweb protocol messages belonging to
* this session. Maintains a ServiceHandler for all incoming bot messages.
* Maintains a Moderator.
*/
public class SessionHandler implements ServerChannel.MessageListener {

  private static final String sendOnceKey =
    "698e141094de24baec7998994d6d32f4";

  private static final Logger log = Logger.getLogger(SessionHandler.class
      .getName());

  private String confKey = null;
  private boolean cacheState = false;
  private String sessionId = null;
  private ServiceHandler serviceHandler = null;
  private BayeuxServer server = null;
  private LateJoinHandler lateJoinHandler = null;
  private SessionModerator sessionModerator = null;
  private OperationEngineHandler operationEngine = null;
  private SessionManager manager;
  private LinkedList modSyncQueue = new LinkedList();

  private Semaphore modSyncSem = new Semaphore(1);
  private AtomicInteger order = new AtomicInteger(0);

  private String syncAppChannel = null;
  private String syncEngineChannel = null;
  private String rosterAvailableChannel = null;
  private String rosterUnavailableChannel = null;
  private String botRequestChannel = null;

  private String requestUrl = null;
  private String sessionName = null;

  private ArrayList<ServerSession> attendees = new ArrayList<ServerSession>();

  /**
   * Create a SessionHandler. There is one session handler keyed on cowebkey
   * and the cacheState boolean. This constructor sets up channel listeners,
   * creates the service handler, an operation engine, and a moderator
   * depending on the configuration settings.
   * @param confkey cowebkey that uniquely identifies a coweb application
   *                session.
   * @param cacheState Whether or not this session caches application state.
   * @param config Configuration options for this application. See <a href="http://opencoweb.org/ocwdocs/java/deploy.html#web-inf-web-xml">configuration documentation.</a>.
   */
  public SessionHandler(String confkey, boolean cacheState,
      Map<String, Object> config) {

    this.confKey = confkey;
    this.cacheState = cacheState;
    this.sessionId = hashURI(confkey);
    this.serviceHandler = new ServiceHandler(this.sessionId, config);
    this.manager = SessionManager.getInstance();

    this.server = this.manager.getBayeux();

    this.syncAppChannel = "/session/" + this.sessionId + "/sync/app";
    this.syncEngineChannel = "/session/" + this.sessionId + "/sync/engine";
    this.rosterAvailableChannel = "/session/" + this.sessionId
        + "/roster/available";
    this.rosterUnavailableChannel = "/session/" + this.sessionId
        + "/roster/unavailable";
    this.botRequestChannel = "/service/bot/%s/request";

    /* The following creates and listens to the app sync and engine
     * sync channels. */
    ServerChannel.Initializer initializer = new ServerChannel.Initializer()
    {
      @Override
      public void configureChannel(ConfigurableServerChannel channel) {
        channel.setPersistent(true);
        channel.setLazy(false);
      }
    };
    this.server.createIfAbsent(this.syncAppChannel, initializer);
    this.server.createIfAbsent(this.syncEngineChannel, initializer);

    ServerChannel sync = server.getChannel(this.syncAppChannel);
    sync.addListener(this);
    sync = server.getChannel(this.syncEngineChannel);
    sync.addListener(this);

    this.sessionModerator = SessionModerator.getInstance(this,
        (String)config.get("sessionModerator"), confKey);
    if (null == this.sessionModerator) {
      config.put("moderatorIsUpdater", false);
      /* Perhaps config.get("sessionModerator") had an exception or
       * didn't exist, so either we try to create default implementation
       * of moderator, or throw an exception. */
      log.severe("SessionModerator.getInstance(" +
          config.get("sessionModerator") + ") failed, reverting to " +
          "trying to create default implementation.");
      this.sessionModerator = SessionModerator.getInstance(this,
          null, confKey);
      if (null == this.sessionModerator) {
        throw new CowebException("Create SessionModerator", "");
      }
      log.severe("SessionModerator created default implementation, but " +
          "moderator can no longer be updater.");
    }

    // create the late join handler. clients will be updaters by default.
    boolean mustUseEngine = false;
    LateJoinHandler lh = null;
    if (config.containsKey("moderatorIsUpdater") &&
        ((Boolean) config.get("moderatorIsUpdater")).booleanValue()) {
      // If moderator is updater, the OperationEngine *must* be used!
      mustUseEngine = true;
      lh = new ModeratorLateJoinHandler(this, config);
    } else {
      log.info("creating LateJoinHandler");
      lh = new LateJoinHandler(this, config);
    }
    this.lateJoinHandler = lh;

    /* create the OT engine only if turned on in the config, or
     * moderatorIsUpdater. */
    boolean useEngine = config.containsKey("operationEngine") &&
      ((Boolean) config.get("operationEngine")).booleanValue();
    if (!useEngine && mustUseEngine) {
      log.warning("Must use OperationEngine because moderatorIsUpdater" +
          "==true, even though operationEngine is not set.");
      useEngine = true;
    }
    if (useEngine) {
      LocalSession modSession = this.sessionModerator.getLocalSession();
      Integer siteId = (Integer) modSession.getAttribute("siteid");

      try {
        log.info("creating operation engine with siteId = " + siteId);
        this.operationEngine = new OperationEngineHandler(this,
            siteId.intValue());
      } catch (OperationEngineException e) {
        e.printStackTrace();
      }
    }
    else {
      log.info("No op engine for this session");
    }
  }

  public SessionModerator getSessionModerator() {
    return this.sessionModerator;
  }

  public String getRequestUrl() {
    return this.requestUrl;
  }

  public void setRequestUrl(String url) {
    this.requestUrl = url;
  }

  public String getSessionName() {
    return this.sessionName;
  }

  public void setSessionName(String name) {
    this.sessionName = name;
  }

  public ServiceHandler getServiceHandler() {
    return this.serviceHandler;
  }

  public ArrayList<ServerSession> getAttendees() {
    return this.attendees;
  }

  public String getRosterAvailableChannel() {
    return this.rosterAvailableChannel;
  }

  public String getRosterUnavailableChannel() {
    return this.rosterUnavailableChannel;
  }

  public String getConfKey() {
    return this.confKey;
  }

  public String getSessionId() {
    return this.sessionId;
  }

  public boolean isCachingState() {
    return this.cacheState;
  }

  public String toString() {
    StringBuffer sb = new StringBuffer();
    sb.append("{\"confkey\":");
    sb.append(this.confKey);
    sb.append(",\"sessionid\":");
    sb.append(this.sessionId);
    sb.append("}");

    return sb.toString();
  }

  public ServerSession getServerSessionFromSiteid(String siteStr) {
    return this.lateJoinHandler.getServerSessionFromSiteid(siteStr);
  }

  /**
    * Called whenever a client sends an application message to the server.
    * This does not include /meta, /service, and /bot messages, which are
    * handled by different methods. This method handles application sync and
    * engine sync messages.
    *
    * The cometd implementation is free to have multiple threads invoke
    * onMessage for different incoming messages, so be aware of mutual
    * exclusion issues.
    */
  public boolean onMessage(ServerSession from, ServerChannel channel,
      ServerMessage.Mutable message) {

    Integer siteId = (Integer) from.getAttribute("siteid");

    String msgSessionId = (String) from.getAttribute("sessionid");
    if (!msgSessionId.equals(this.sessionId)) {
      log.severe("Received message not belonging to this session " +
          msgSessionId);
      return true; /* TODO why are we sending this message to clients, if
              the message doesn't belong to us? */
    }

    Map<String, Object> data = message.getDataAsMap();
    if (data.get("__alreadySent") == SessionHandler.sendOnceKey) {
      /* The message came from onMessage manually publishing a sync. We
       * simply forward it to clients. */
      data.remove("__alreadySent");
      return true;
    }
   
    this.lateJoinHandler.clearCacheState();

    data.put("siteId", siteId);

    /* Some of the following code must acquire this.operationEngine's lock.
     * OperationEngine must only be accessed by one client at a time. */
    String channelName = message.getChannel();
    if (channelName.equals(this.syncAppChannel)) {
      // put total order on message
      data.put("order", this.order.getAndIncrement());

      if (this.operationEngine != null) {
        synchronized (this.operationEngine) {
          return this.doAppSync(from, channel, message, data);
        }
      }
    } else if (channelName.equals(this.syncEngineChannel)) {
      if (operationEngine != null) {
        synchronized (this.operationEngine) {
          this.operationEngine.engineSyncInbound(data);
        }
      }
    } else {
      log.warning("onMessage does not handle channel messages '" +
          channelName + "'\n    message = " + message);
    }

    return true;
  }


  private boolean doAppSync(ServerSession from, ServerChannel channel,
      ServerMessage.Mutable message, Map<String, Object> data) {
    Map<String, Object> syncEvent = this.operationEngine.syncInbound(data);
    if (syncEvent != null) {
      try {
        this.modSyncSem.acquire();
      } catch (InterruptedException ie) {
        log.warning("onMessage interrupted, won't deliver sync to" +
            " moderator or other listeners");
        return false;
      }
      this.sessionModerator.onSync(from.getId(), syncEvent);
      data.put("__alreadySent", SessionHandler.sendOnceKey);
      /* In order to satisfy ordering requirements, we must send the
       * remote client's sync to other clients, then send any syncs
       * generated by the moderator. Thus, we manually publish the sync
       * instead of returning true from onMesage(). This is the only way
       * we can possibly publish the remote client sync *first*. */
      channel.publish(from, message);
      /* Now, send any moderator syncs. */
      this.flushModeratorQueue();
      this.modSyncSem.release();
      return false;
    }
    return true;
  }

  /**
   * Handles bayeux messaegs on the channels below.
   *   <li> /session/roster/*
   *   <li> /service/session/join/*
   *   <li> /service/session/updater
   *   <li> /service/bot/**
   *   <li> /bot/**
   *
   * Specifically, onPublish handles clients sending private bot messages
   * and when updaters send full application state to the server.
   */
  public void onPublish(ServerSession remote, Message message) {
    String channel = message.getChannel();
    try {
      if (channel.startsWith("/service/bot")) {
        /* Moderator can always make service requests. */
        String svcName = ServiceHandler.getServiceNameFromMessage(
            message, false);
        Map<String, Object> botData = message.getDataAsMap();
        botData = (Map<String, Object>)botData.get("value");
        if (this.sessionModerator.getServerSession() == remote ||
            this.sessionModerator.canClientMakeServiceRequest(
              svcName, remote.getId(), botData)) {
          this.serviceHandler.forwardUserRequest(remote, message);
        } else {
          this.serviceHandler.userCannotPost(remote, message);
        }
      } else if (channel.equals("/service/session/updater")) {
        this.lateJoinHandler.onUpdaterSendState(remote, message);
      } else {
        log.warning("onPublish doesn't handle '" + channel +
            "' messages\n  message = " + message);
      }
    } catch (Exception e) {
      log.severe("error receiving publish message");
      log.severe(message.getJSON());
      e.printStackTrace();
    }
  }

  /**
   * Called anytime a client posts to /meta/subscribe. SessionHandler needs
   * to know when (1) a client attempts to join a session, (2) a client wants
   * to subscribe to a bot, and (3) when a client wishes to become an updater.
   * Other subscriptions are ignored (i.e. we don't wish to know about them).
   * @param serverSession Client wishing to subscribe to some channel.
   * @param message Contains channel the client wishes to subscribe to.
   */
  public void onSubscribe(ServerSession serverSession, Message message)
      throws IOException {
    String channel = (String) message.get(Message.SUBSCRIPTION_FIELD);
    if (channel.equals("/service/session/join/*")) {
      Map<String, Object> userDefined= (Map<String, Object>)
        message.getExt().get("userDefined");
      if (this.sessionModerator.canClientJoinSession(
            serverSession.getId(), userDefined)) {
        if (this.lateJoinHandler.onClientJoin(serverSession, message)) {
          this.sessionModerator.onSessionReady();
        }
      } else {
        this.sendError(serverSession, "join-disallowed");
      }
    } else if (channel.startsWith("/service/bot") ||
        channel.startsWith("/bot")) {
      /* Client wishes to subscribe to service messages (broadcasts and
       * private bot messages). Moderator can always make service
       * requests. */
      boolean pub = true;
      if (channel.startsWith("/service"))
          pub = false;
      String svcName = ServiceHandler.getServiceNameFromMessage(message,
          pub);

      if (this.sessionModerator.getServerSession() == serverSession ||
          this.sessionModerator.canClientSubscribeService(
            svcName, serverSession.getId())) {
        this.serviceHandler.subscribeUser(serverSession, message);
      } else {
        this.serviceHandler.userCannotSubscribe(serverSession, message);
      }

    } else if (channel.endsWith("/session/updater")) {
      /* Client lets the server know it is an updater. */
      log.info("client subscribes to /session/updater");
      this.attendees.add(serverSession);
      this.lateJoinHandler.onUpdaterSubscribe(serverSession, message);
      this.sessionModerator.onClientJoinSession(serverSession.getId());
    }
  }

  /**
   * Subscribes the moderator to service message broadcasts.
   * @param svcName The service name.
   */
  void subscribeModeratorToService(String svcName) {
    try {
      this.serviceHandler.subscribeModerator(this.sessionModerator,
          svcName);
    } catch (Exception e) {
      log.severe("error subscribing moderator to service " + svcName);
      e.printStackTrace();
    }
  }

  private void sendError(ServerSession client, String topic) {
    Map<String, Object> map = new HashMap<String, Object>();
    map.put("topic", topic);
    client.deliver(this.manager.getServerSession(), "/session/error",
        map, null);
  }

  /**
   * Called when a client unsubscribes to a channel (via posting to
   * /meta/unsubscribe). We need to know when a client wishes to stop
   * listening to bot messages.
   */
  public void onUnsubscribe(ServerSession serverSession, Message message)
      throws IOException {
    String channel = (String) message.get(Message.SUBSCRIPTION_FIELD);
    if (channel.startsWith("/service/bot") || channel.startsWith("/bot")) {
      /* Client wishes to unsubscribe to service requests. */
      this.serviceHandler.unSubscribeUser(serverSession, message);
    }
    return;
  }

  public void onPurgingClient(ServerSession client) {
    this.attendees.remove(client);
    this.sessionModerator.onClientLeaveSession(client.getId());
    boolean last = this.lateJoinHandler.onClientRemove(client);
    if (last) {
      this.endSession();
    }
  }

  public static String hashURI(String url) {
    String hash = null;
    try {
      String t = Long.toString(System.currentTimeMillis());
      url = url + t;
      byte[] bytes = url.getBytes("UTF-8");
      MessageDigest md = MessageDigest.getInstance("MD5");
      byte[] digest = md.digest(bytes);

      StringBuffer sb = new StringBuffer();
      for (int i = 0; i < digest.length; i++) {
        sb.append(Integer.toString((digest[i] & 0xff) + 0x100, 16)
            .substring(1));
      }

      hash = sb.toString();

    } catch (Exception e) {
      hash = url;
    }

    return hash;
  }

  /**
   * Remove a buggy or malicious client from a session.
   * @param client The client to be removed.
   */
  public void removeBadClient(ServerSession client) {
    /* TODO this is clearly a NOP... */
    return;
  }

  /**
   * Called when all remote clients leave a OCW session. This method notifies
   * the moderator, the late join handler, the service handler of the session
   * ending. The server removes itself as a listener to sync channels.
   */
  public void endSession() {
    log.info("SessionHandler::endSession ***********");

    ServerChannel sync = this.server.getChannel(this.syncAppChannel);
    sync.removeListener(this);

    sync = server.getChannel(this.syncEngineChannel);
    sync.removeListener(this);

    // The following ordering must be observed!
    this.sessionModerator.endSession();
    if (null != this.operationEngine)
      this.operationEngine.shutdown();
    this.lateJoinHandler.onEndSession();
    this.serviceHandler.shutdown();

    // just to make sure we are not pinning anything down.
    this.sessionModerator = null;
    this.operationEngine = null;
    this.lateJoinHandler = null;
    this.serviceHandler = null;

    this.manager.removeSessionHandler(this);
  }
 
  /**
   * Publishes a local op engine sync event to the /session/ID/sync/engine
   * bayeux channel.
   * @param sites context int array context vector for this site
   */
  public void postEngineSync(Integer[] sites) {
    ServerChannel sync = this.server.getChannel(this.syncEngineChannel);
   
    HashMap<String, Object> data = new HashMap<String, Object>();
    data.put("context", sites);
   
    // We publish *from* the LocalSession.
    sync.publish(this.sessionModerator.getLocalSession(), data, null);
  }

  /**
    * Retreives the four element Object engine state array and returns it.
    * @return engine state array
    */
  public Object[] getEngineState() {
    Object[] ret;
    synchronized (this.operationEngine) {
      ret = this.operationEngine.getEngineState();
    }
    return ret;
  }

  /**
   * Publish a mesage from the moderator to all listening clients. This method
   * doesn't actually send anything directly, but rather it delegates work
   * to OperationEngineHandler which sends the event and pushes the op to the
   * local operation engine.
   * @param name Collab topic (includes ^coweb.sync. and collabId$).
   * @param value Application value.
   * @param type One of {"insert", "delete", "update", null}.
   * @param position Position where sync event is to be applied.
   */
  public void publishModeratorSync(String name, Object value, String type,
      int position) {
    this.operationEngine.localSync(name, value, type, position);
  }

  /**
   * Sends message on the /session/ID/sync/app channel. This is called from
   * OperationEngineHandler to actually put the message on the wire.
   */
  public void sendModeratorSync(Object message) {
    /* We act as if the moderator is just another client participating in
     * the session. This method is called from the OperationEngineHandler,
     * which means the op engine pushed the operation through as a local op.
     *
     * When this message is sent to the "server," the op engine pushes
     * through the exact same operation as a remote op. This mimics the
     * behavior of browser clients pushing local ops through their engines,
     * then pushing the same exact op through as a remote op when the server
     * forwards the op back to the same browser client. The op engine knows
     * to disregard the op, since it has already been pushed through once as
     * a local op.
     *
     * SessionModerator.onSync() may call this method (indirectly), in which
     * case the modSyncSem semaphore will already be exhausted of permits.
     * Thus, we try to acquire the semaphore, and if we fail, we wait until
     * onSync() returns to flush the queue. */
    synchronized(this.modSyncQueue) {
      this.modSyncQueue.push(message);
    }
    /* TODO Memory fence? */
    if (this.modSyncSem.tryAcquire()) {
      this.flushModeratorQueue();
      this.modSyncSem.release();
    }
  }

  /* Protected by modSyncSem. */
  private void flushModeratorQueue() {
    ServerChannel channel = this.server.getChannel(this.syncAppChannel);
    synchronized(this.modSyncQueue) {
      for (Object message: this.modSyncQueue)
        channel.publish(this.sessionModerator.getLocalSession(),
            message, null);
      this.modSyncQueue.clear();
    }
  }

  /**
   * Find the service request channel.
   * @param svc The channel name.
   * @return The server request channel.
   */
  private ServerChannel getServiceRequestChannel(String svc) {
    String ch = String.format(this.botRequestChannel, svc);
    this.server.createIfAbsent(ch);
    return this.server.getChannel(ch);
  }

  /**
   * Post a bot service message. This actually sends the message on the wire.
   * @param service Bot service name.
   * @param topic Bots expect a unique token to know which request to reply to
   * @param params JSON encodable message.
   */
  public void postModeratorService(String service, String topic,
      Map<String, Object> params) {
    ServerChannel channel = this.getServiceRequestChannel(service);
    Map<String, Object> message = new HashMap<String, Object>();
    message.put("value", params);
    message.put("topic", topic);
    message.put("service", service);
    channel.publish(this.sessionModerator.getLocalSession(), message, null);
  }

}
TOP

Related Classes of org.coweb.SessionHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.