Package org.openqueue.oqtane

Source Code of org.openqueue.oqtane.OQUserCmdHandler

/* oqtane  (OpenQueue server classes and daemon)
*
* Copyright (c) 2000 oqtane Development Team (See Credits file)
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307 USA
*/

package org.openqueue.oqtane;

import java.net.*;
import java.io.*;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Date;
import java.util.Random;
import java.util.Vector;
import java.util.Enumeration;
import java.util.StringTokenizer;
import java.util.Hashtable;


public class OQUserCmdHandler extends Thread {
  static String OQprotocolVersion = "0.50";
  public static Vector userCmdHandlers;
  static Hashtable responseCodes;

  private OQServer oqServer;
  protected boolean shouldRun = true;
  private long  lConnectionID = 0;
  private String aCmdTag = new String ();
  protected Socket userCmdSocket;
  private String authorizedUserName;   // this is "" until they give a valid password.
  private int authorizedUserID;
  protected OQUser thisUser;
  private int state = 0;
  private StreamLock outputStreamLock = new StreamLock();
  private String modeRequestString;
  protected OQUpdater theUpdater = null;
  protected boolean ReadyForUpdates = false;
  private DataInputStream din;
  private DataOutputStream dout;
  private String sServerNonce = null;
  private final String modesAvailable= new String("MODES=ALIVE");

  static final int state_UNAUTHORIZED = 0;
  static final int state_READY = 1// set after password is accepted.
  static final int state_COMMANDRECEIVED = 2; // looking for a blank line to end the command.
  static final int state_SENDING = 3;   // set to receive data for a message.

  static final int rc_OK = 200;
  static final int rc_BYE = 201;
  static final int rc_READYFORSEND = 202;
  static final int rc_RECEIVED = 203;
  static final int rc_CLIENTERROR = 300;
  static final int rc_UNKNOWNNAMEPASSWORD = 301;
  static final int rc_UNKNOWNCOMMAND = 302;
  // static final int rc_WRONGPASSWORD = 303;
  static final int rc_ERRORLOGGINGOUT = 304;
  static final int rc_UNKNOWNTOPICID = 305;
  static final int rc_NOSENDPERMISSION = 306;
  static final int rc_MODEUNAVAILABLE = 307;
  static final int rc_SYNTAXERROR = 308;
  static final int rc_NOREADPERMISSION = 309;
  static final int rc_UNKNOWNITEMID = 310;
  static final int rc_ALREADYLOGGEDIN = 311;
  static final int rc_CANNOTSUBSCRIBE = 320;
  static final int rc_CANNOTUNSUBSCRIBE = 321;
  static final int rc_CANNOTSETCURSOR = 322;
  static final int rc_SERVERERROR = 400;
  static final int rc_SERVERMAXCONN = 505;


  // init the class variables
  static {
    userCmdHandlers = new Vector ();
    responseCodes = new Hashtable ();

    responseCodes.put (Integer.toString(rc_OK), "OK.");
    responseCodes.put (Integer.toString(rc_BYE), "BYE.");
    responseCodes.put (Integer.toString(rc_CLIENTERROR), "Client Error.");
    responseCodes.put (Integer.toString(rc_UNKNOWNNAMEPASSWORD), "User name or password not recognized.");
    // responseCodes.put (Integer.toString(rc_WRONGPASSWORD), "Password not recognized.");
    responseCodes.put (Integer.toString(rc_UNKNOWNCOMMAND), "Unknown command.");
    responseCodes.put (Integer.toString(rc_ERRORLOGGINGOUT), "Error logging out.");
    responseCodes.put (Integer.toString(rc_UNKNOWNTOPICID), "Topic ID not recognized.");
    responseCodes.put (Integer.toString(rc_UNKNOWNITEMID), "Item ID not recognized.");
    responseCodes.put (Integer.toString(rc_NOSENDPERMISSION), "You do not have permission to send to this topic.");
    responseCodes.put (Integer.toString(rc_NOREADPERMISSION), "You do not have permission to read from this topic.");
    responseCodes.put (Integer.toString(rc_MODEUNAVAILABLE), "No mode is available which matches your request.");
    responseCodes.put (Integer.toString(rc_READYFORSEND), "Ready for message to be sent; end with '.' on its own line.");
    responseCodes.put (Integer.toString(rc_SYNTAXERROR), "Syntax error.");
    responseCodes.put (Integer.toString(rc_RECEIVED), "Received.");
    responseCodes.put (Integer.toString(rc_ALREADYLOGGEDIN), "You are already logged in on a different connection.");
    responseCodes.put (Integer.toString(rc_CANNOTSUBSCRIBE), "Cannot subscribe to the topic.");
    responseCodes.put (Integer.toString(rc_CANNOTUNSUBSCRIBE), "Cannot unsubscribe from the topic.");
    responseCodes.put (Integer.toString(rc_CANNOTSETCURSOR), "Cannot set cursor position.");
    responseCodes.put (Integer.toString(rc_SERVERERROR), "Server error.");
    responseCodes.put (Integer.toString(rc_SERVERMAXCONN), "Server too busy. Maximum connections in use.");

  }

  public OQUserCmdHandler(OQServer aServer, Socket aSocket, long lTotalConnections) {
    oqServer = aServer;
    lConnectionID = lTotalConnections;
    userCmdSocket = aSocket;
    authorizedUserName = new String("");
    modeRequestString = new String("");
    authorizedUserID = 0;
    setName("BUCHndlr for connID" + getlConnectionID() +")" );
  }

  public boolean getReadyForUpdates() {
    return ReadyForUpdates;
  }

  public int prepareToDie() {
    shouldRun = false;

    if (userCmdSocket != null) {
      try {
        userCmdSocket.close();
        userCmdSocket = null;
      } catch (Exception e) {
      }
    }

    if (theUpdater != null) {
      synchronized (theUpdater) {
        theUpdater.prepareToDie();
        try {
          theUpdater.shouldRun = false;
          theUpdater.start();
        } catch (Exception ex) {
        };
        try {
          theUpdater.notify();
        } catch (Exception ex) {
        };

        try {
          theUpdater.join(3000);
        } catch (InterruptedException ex) {
          OQServer.report(OQServer.repDebug, "Exception in joining updater to kill it: "+ex.getMessage()+"    ");
        }
        if (theUpdater.isAlive()) {
          try {
            OQServer.report(OQServer.repDebug, "Updater still alive for " + theUpdater.getName() +". updater shouldRun =" + theUpdater.shouldRun + ", runstarted =" + theUpdater.isRunStarted() + " ");

            try {
              theUpdater.notify();
              sleep(1);
              theUpdater.join(3000);
              if (theUpdater.isAlive()) {
                OQServer.report(OQServer.repDebug, "Second try, Updater still alive for " + theUpdater.getName() +". updater shouldRun =" + theUpdater.shouldRun + ", runstarted =" + theUpdater.isRunStarted() + " ");
              }
            } catch (InterruptedException ex) {
              OQServer.report(OQServer.repDebug, "Exception in joining updater to kill it: "+ex.getMessage()+"    ");
            }

          } catch (Exception ex) {
            OQServer.report(OQServer.repDebug, "Exception in checking updater to kill it: "+ex.getMessage()+"    ");
          }
        }

      } // synch
    }

    if (thisUser != null) {
      // try to ensure we wrap up this user OK.
      thisUser.putAddress(null);
      thisUser.removeAllToDoItems();
      thisUser.prepareToKill();

      thisUser = null;
    }

    return 0; // 0 means no errors.

  }

  public String getAuthorizedUserName() {
    return authorizedUserName;
  }

  public long getlConnectionID() {
    return lConnectionID;
  }

  public int getUserType() {
    if (thisUser != null) {
      return thisUser.getUserType();
    } else {
      return 0;
    }
  }

  public static OQUserCmdHandler getBylConnectionID(long aConnID) {
    OQUserCmdHandler result = null;
    synchronized (OQUserCmdHandler.userCmdHandlers) {
      OQUserCmdHandler aBCH;

      Enumeration enum = OQUserCmdHandler.userCmdHandlers.elements ();
      while ((enum.hasMoreElements ()) && (result == null)) {
        aBCH = (OQUserCmdHandler) enum.nextElement ();
        if ( aBCH.getlConnectionID() == aConnID) { result = aBCH; }
      }
    }
    return result;
  }

  /* Note: there is no setState method, because
   * state is set through the handling of the OQ session.
   */
  public int getState() {
    return state;
  }

  public int getAuthorizedUserID() {
    return authorizedUserID;
  }

  public void setAuthorizedUserName(String s) {
    authorizedUserName = s;
  }

  public void setAuthorizedUserID(int id) {
    authorizedUserID = id;
  }

  public void printResponseHeader (int responseCode) {
    DataOutputStream d;
    try {
    String s = Integer.toString (responseCode);
    d = grabOutputStream();

    if (aCmdTag.equals("")) {
      d.writeBytes (s + " - " + responseCodes.get (s) + "\r\n\r\n");
    } else {
      d.writeBytes (aCmdTag + " " + s + " - " + responseCodes.get (s) + "\r\n\r\n");
    }

    d.flush ();
    releaseOutputStream();
    } catch (IOException ex) {
      // what to do?
    }
  }

  public void printResponseHeaderWithoutGrabbing (DataOutputStream d, int responseCode) {
    try {
    String s = Integer.toString (responseCode);
    if (aCmdTag.equals("")) {
      d.writeBytes (s + " - " + responseCodes.get (s) + "\r\n");
    } else {
      d.writeBytes (aCmdTag + " " + s + " - " + responseCodes.get (s) + "\r\n");
    }
    d.flush ();
    } catch (IOException ex) {
      // what to do?
    }
  }

  /* Check the "MODE=" parameter to see if we have at
   * least one requested mode.
   */
  private boolean IsAvailableMode(String s) {
    if (s != null) {
      if (s.toUpperCase().startsWith("MODE=")) {
        s = s.substring(5);
      }

      if ((s.indexOf("ALIVE") > -1) || (s.indexOf(",ALIVE"))  > -1) {
        return true;
      } else {
        return false;
      }
    } else {
      return false;
    }
  }

  public   DataOutputStream grabOutputStream() {
    outputStreamLock.grabMe();
    return dout;
  }

  public   void releaseOutputStream() {
    outputStreamLock.releaseMe();
  }

  /* returns number of bytes written out on this connection */
  protected int dataWrittenSize() {
    if (dout == null) {
      return 0;
    } else {
      return dout.size();
    }
  }

  private String userAuthenticates(Hashtable hashAuthParams, String theUserPassword)  {
    String sResponse = null; //null=failure, non-null=authenticated
    try {
      MD5 md5Hash = null;
      if (theUserPassword == null) { theUserPassword = "";};
      String aUser     = (String) hashAuthParams.get("USER");
      String aType     = (String) hashAuthParams.get("AUTHTYPE");
      String aPassword = (String) hashAuthParams.get("PASSWORD");

      if (aType.equals("plaintext")) {
        if (theUserPassword.equals(aPassword)) {
          sResponse = ""; // will be treated as success, but no response will be printed.
        }
      } else {

        if (aType.equals("simple-md5")) {
          // value = md5( user:nonce:password )
          String aValue = (String) hashAuthParams.get("HASH");
          String aUserNonce = (String) hashAuthParams.get("USERNONCE");
          if (aUserNonce.equals("") || aValue.equals("")) {
            // Do not accept user if they haven't included a nonce and value.
            // This protects them from stupidity.
          } else {
            md5Hash = new MD5(sServerNonce+":"+aUser+":"+aUserNonce+":"+theUserPassword);
            String sMD5asHex = md5Hash.asHex();
            if (aValue.equals(sMD5asHex)) {
              md5Hash = new MD5(sServerNonce+":"+aUserNonce+":"+theUserPassword);
              sResponse = md5Hash.asHex();
            }
          }
        }  // end of simple-md5

      }

    } catch (Exception e) {
    }
    return sResponse;
  }

  public void run()  {
    int anID;
    int aTopicID = 0;
    int anItemID;
    int anAckID; // acknowledgment for the server-generated transaction number
    OQTopic tempTopic;
    OQItem tempItem;


    userCmdHandlers.addElement (this);
    String lineRead;
    StringTokenizer  sTokenizer = null;
    OQTopic theTopic = null;
    String aCommand = new String ();
    String aName = new String ();
    String aTopic = new String ();
    String aPassword = new String ();
    String messageData = new String ();
    String theHeaders = new String ();
    boolean sendEndsWithDot = true;
    boolean sentItemFullyReceived = false;
    boolean sentFirstLineOfCommand = false;
    String receivedSubject = new String ("");
    boolean seenBlankLine = false// true when all headers for this message are done.
    int declaredContentLength = -1;    // XXXXXXXXX this limitation must be removed!
    DataOutputStream d;
    String sAuthenticationMethod = "";
    String aNameValue; // helper variables to split name=value pairs
    String sNVname ="";
    String sNVvalue="";
    StringTokenizer  sNVTokenizer = null;
    Hashtable hashAuthParams = new Hashtable ();
    int theUserID = 0;
    int theUserType = 0;
    String theUserPassword = ""// password read in from database
    String sql = "";

    thisUser = null;
    hashAuthParams.put("MODE", "ALIVE")// default
    hashAuthParams.put("AUTHTYPE", "plaintext")// default
    try {
      try {
        din = new DataInputStream (new BufferedInputStream (userCmdSocket.getInputStream ()));
        dout = new DataOutputStream (new BufferedOutputStream (userCmdSocket.getOutputStream ()));

        Random rServerNonce = new Random();
        sServerNonce = Long.toHexString(rServerNonce.nextLong());
        sServerNonce = sServerNonce + Long.toHexString(System.currentTimeMillis());
        sServerNonce = sServerNonce.substring(0,7) + sServerNonce.substring(sServerNonce.length()-8);

        dout.writeBytes ("OPENQUEUE=" + OQprotocolVersion + " " + "SERVER=" + OQServer.getServerShortDesc() + " " + modesAvailable + " NONCE=" + sServerNonce +"\r\n\r\n");
        dout.flush ();

        while ((shouldRun == true) && ((lineRead = din.readLine ()) != null)) {

            if (state != state_SENDING) {
              if (sentFirstLineOfCommand == false) {
                sTokenizer = new StringTokenizer (lineRead, " ");
                try {
                  aCommand = "";
                  aCmdTag = "";
                  if (state == state_READY) {

                    /* About to get cmdTag */
                    if (sTokenizer.hasMoreTokens()) aCmdTag = sTokenizer.nextToken();
                  }

                  /* About to parse out aCommand */
                  if (sTokenizer.hasMoreTokens()) aCommand = sTokenizer.nextToken();
                } catch (Exception ex) {
                  OQServer.report(OQServer.repDebug, "Error tokenizing command line. " + ex);
                }
              }
            }

            /* process the command, if supported */
            switch (state) {

              case state_UNAUTHORIZED :
                if (sentFirstLineOfCommand ==false){
                  if (aCommand.toUpperCase().equals ("LOGIN")) {
                    if (sTokenizer.hasMoreTokens()) aName = sTokenizer.nextToken();
                    if (sTokenizer.hasMoreTokens()) aPassword= sTokenizer.nextToken();
                    if (sTokenizer.hasMoreTokens()) modeRequestString = sTokenizer.nextToken();

                    if (modeRequestString.toUpperCase().startsWith("MODE=")) {
                      hashAuthParams.put("MODE", modeRequestString.substring(5));
                    } else {
                      hashAuthParams.put("MODE", modeRequestString);
                    }
                    // sAuthenticationMethod = "" still
                    hashAuthParams.put("USER", aName);
                    hashAuthParams.put("PASSWORD", aPassword);
                  }
                  if (aCmdTag.startsWith("!")) {
                    if (sTokenizer.hasMoreTokens()) aCommand = sTokenizer.nextToken();
                  }
                  if (aCommand.toUpperCase().equals ("AUTH")) {
                    if (sTokenizer.hasMoreTokens()) {
                      sAuthenticationMethod = sTokenizer.nextToken();
                    } else {
                      // hmmm... no authentication method specified?
                    }
                  }
                  sentFirstLineOfCommand = true;
                }

                if (lineRead.equals("")) {

                  aName     = (String) hashAuthParams.get("USER");
                  aPassword = (String) hashAuthParams.get("PASSWORD");

                  // System.out.println("hashAuthParams = " + hashAuthParams.toString());
                  if (IsAvailableMode(hashAuthParams.get("MODE").toString().toUpperCase() )) {
// yyy start
                    if (hashAuthParams.get("USER").toString().toUpperCase().equals("GUEST")) {
                      // setThisUserAsGuest(hashAuthParams, d)
                      // public boolean setThisUserAsGuest(){
                        thisUser = new OQUser(oqServer, "guest", "password", 0, true, 0);
                        setAuthorizedUserName ("guest");
                        /* user id remains 0. */
                        d = grabOutputStream();
                        printResponseHeaderWithoutGrabbing  (d, rc_OK);
                        theUpdater = new OQUpdater(this);
                        thisUser.handleLogin (this, userCmdSocket, d, OQUser.login_LASTITEM);
                        d.writeBytes("\r\n");
                        d.flush();
                        theUpdater.start();
                        state = state_READY;
                        releaseOutputStream();
                      // }
                      // ********************
                    } else {
                      thisUser = OQUser.getByName(aName);

// bbb start
                      /* if not already in memory, look it up in database. */
                      if (thisUser == null) {
                        OQDBAccess aDB = oqServer.GetDB();
                        synchronized(aDB) {
                          try {
                            sql = OQDBAccess.getSQLCommand(OQDBAccess.sql_GETUSERIDANDPASSWORDFROMNAME, aName);
                            ResultSet rs = aDB.getStatement().executeQuery( sql );
                            OQUser bUser = null;

                            while(rs.next()) {
                              theUserPassword =  rs.getString(1);
                              theUserID = (int) rs.getLong(2);
                              theUserType = (int) rs.getLong(3);

                              /* Discovered user in database...
                               * MUST now create a user object and set the password,
                               * look up subscriptions, etc.
                               */
                              bUser = new OQUser (oqServer, aName, aPassword, theUserID, false, theUserType);
                            }
                            rs.close();
                            if (bUser != null) {
// was getting subs here
                              thisUser = bUser;
                            else {
                              /* No match from user database */
                            }
                          }  catch    (Exception ex) {
                            OQServer.report(OQServer.repError, "Exception trying to find user in database. " + ex.getMessage() );
                          }
                        } // synchronized
                      }
// bbb end
                    }


                    if (thisUser == null) {
                      aName = "";
                      aPassword = "";
                      printResponseHeader (rc_UNKNOWNNAMEPASSWORD); //  user name or password not recognized.
                    } else {
                        // if (thisUser.password.equals(aPassword)) {
                        String sAuthResponse = userAuthenticates(hashAuthParams, theUserPassword);
                        if (sAuthResponse != null) {

                          /* Are we already logged in? Only one connection per user allowed for now. */
                          if ( thisUser.isLoggedIn() ) {
                            printResponseHeader (rc_ALREADYLOGGEDIN);
                          } else {

                            /* Mark user as logged in, and send OK response. */
                            setAuthorizedUserName (aName);
                            OQDBAccess aDB = oqServer.GetDB();
                            synchronized (aDB) {
                              sql = OQDBAccess.getSQLCommand(OQDBAccess.sql_USERIDFROMNAME, aName);
                              long lng = aDB.Get1KnownLong( sql );
                              setAuthorizedUserID( (int) lng );
                            }
                            state = state_READY;

                            d = grabOutputStream();
                            printResponseHeaderWithoutGrabbing  (d, rc_OK);
                            if (sAuthResponse.equals("") == false) {
                              d.writeBytes("Response: " + sAuthResponse + "\r\n");
                            }
                            /* Don't send final d.writeBytes("\r\n"); until
                             * response is done from handleLogin.
                             */
                            theUpdater = new OQUpdater(this);

                            sql = OQDBAccess.getSQLCommand(OQDBAccess.sql_LOADUSERSUBS, String.valueOf(theUserID) );
                            ResultSet rs2 = aDB.getStatement().executeQuery( sql );
                            while(rs2.next()) {
                              int iTmpTopicID;
                              int iTmpLastSeen;
                              int iTmpFlags;
                              int iTmpMaxDefault;

                              iTmpTopicID = (int) rs2.getInt(1);
                              iTmpLastSeen =  (int) rs2.getLong(2);
                              iTmpFlags =  (short) rs2.getInt(3);
                              iTmpMaxDefault =  (int) rs2.getLong(4);

                              OQSubscription aSubscription = new OQSubscription( iTmpTopicID, iTmpLastSeen, iTmpFlags );
                              aSubscription.setMaxQueued( iTmpMaxDefault );
                              aSubscription.lookupTopicPointer();
                              thisUser.subscriptions.addElement (aSubscription);
                            }
                            rs2.close();

                            thisUser.handleLogin (this, userCmdSocket, d, OQUser.login_LASTITEM);
                            d.writeBytes("\r\n");
                            d.flush();
                            theUpdater.start();
                            releaseOutputStream();
                          }
                        } else {
                          printResponseHeader (rc_UNKNOWNNAMEPASSWORD); //  logged in OK.
                        }

                    }
// yyy end
                  } else {
                    printResponseHeader (rc_MODEUNAVAILABLE);
                  }
                  sentFirstLineOfCommand = false;
                } else {

                  // Record additional AUTH parameters if we're beyond first line.
                  if ((sentFirstLineOfCommand == true) && (lineRead.startsWith("!")==false) ) {
                    int iNameValueBoundary = lineRead.indexOf(58);
                    if (iNameValueBoundary > 0) {
                      try {
                        sNVname = lineRead.substring(0,iNameValueBoundary);
                        sNVvalue = lineRead.substring(iNameValueBoundary+1);
                      } catch (Exception e) {
                      }
                      if (sNVname != null) { sNVname = sNVname.trim().toUpperCase(); }
                      if (sNVvalue != null) { sNVvalue = sNVvalue.trim(); }
                      hashAuthParams.put(sNVname, sNVvalue);

                    } else {
                      // unexpected. We'll ignore for now.
                    }
                  }  // end of test for authentication method


                }

              break;

              case state_SENDING :
                if (sendEndsWithDot && lineRead.equals(".")) {    // end of message. Post it.
                  sentItemFullyReceived = true;
                } else {   // contains message data or header line.
                  if (seenBlankLine) {
                    if (messageData.equals("")) {
                            messageData = lineRead;
                    } else {
                          messageData = messageData + "\r\n" + lineRead;
                    }
                  } else /* process header line.  */
                    if (lineRead.equals("")) {
                      seenBlankLine = true;     // everything after this is message data.
                      if (sendEndsWithDot == false) {
                        /* receive all message content now. */
                        byte b[] = new byte[declaredContentLength];
                        try {
                          din.readFully(b, 0, declaredContentLength);
                        } catch (Exception ex) {
                          /* ? what to do ? */
                        }
                        StringBuffer sb = new StringBuffer(declaredContentLength);
                        int i;
                        for (i=0; i < declaredContentLength; i++) {
                          sb.append( (char) b[i] );   // Character(c));
                        }
                        messageData = sb.toString();
                        sentItemFullyReceived = true;
                      }
                    } else /* process real header line. */

                                                                                  if (theHeaders.equals("")) {
                          theHeaders = lineRead;
                      } else {
                          theHeaders = theHeaders + "\r\n" + lineRead;
                      }

                      String headerType = lineRead.toUpperCase();

                      if (headerType.startsWith ("SUBJECT:")) {
                        receivedSubject = lineRead.substring(9, lineRead.length());
                      }

                      if (headerType.startsWith ("CONTENT-LENGTH:")) {
                        try {
                          declaredContentLength = Integer.parseInt(lineRead.substring(16, lineRead.length()));
                          sendEndsWithDot = false;
                        } catch (RuntimeException ex) {
                          OQServer.report(OQServer.repDebug, "Invalid content length declaration. " + ex);
                          /* ??? return error to client? */
                        }
                      }

                    }
                  }
                }

                if (sentItemFullyReceived) {
               
                  /* Now let updater write out stuff if it wants;
                   * We're finished with focusing on user input.
                   */

                   // ??? shouldn't this be after we print response header?
                  releaseOutputStream();

                  /* go back to accepting commands. */
                  state = state_READY;

                  /* Save message, then tell sender we got the whole message. */
                  try {
                    OQItem theItem;
                    if (receivedSubject.equals("")) {

                      /* stick the Subject at the front of theHeaders, if headers exist. */
                      if (theHeaders.equals("")) {
                              theHeaders = "Subject: (No Subject)";
                      } else {
                              theHeaders = "Subject: (No Subject)" + "\r\n" + theHeaders;
                      }
                                                                    }
                    /* Add the time the server processed this SEND command. */
                    Date rightNow = new Date();
                    theHeaders = theHeaders + "\r\n" + "Posted: " + rightNow.toGMTString();
                    if (receivedSubject.equals("")) {
                      theItem = theTopic.addItem("(No Subject)", theHeaders, messageData);
                    } else {
                      theItem = theTopic.addItem(receivedSubject, theHeaders, messageData);
                    }
                    if (theItem != null) {
                      printResponseHeader (rc_RECEIVED);
                      oqServer.incrementPerfCounter(OQServer.PERFCOUNTER_SEND_MESSAGES, 1);
                    } else {
                      printResponseHeader (rc_SERVERERROR);
                    }
                  } catch (Exception ex) {
                    printResponseHeader (rc_SERVERERROR);
                  }
                 
                  messageData = "";
                  theHeaders = "";
                  sendEndsWithDot = true;
                  seenBlankLine = false;
                  receivedSubject = "";
                  declaredContentLength = -1;
                  receivedSubject = "";
                  sentItemFullyReceived = false;
                }
                break;


              case state_READY:
                sentFirstLineOfCommand = true;
                if ( lineRead.equals("") ) {

                /* we've now seen a blank line, and can continue. */
                state = state_COMMANDRECEIVED;

                /* Note that we don't "break" here, so we can handle
                 * the command in the state_COMMANDRECEIVED block.
                 */

              } else {
                break;
              }

              case state_COMMANDRECEIVED :
                                                          sentFirstLineOfCommand = false;

                if (aCommand.equals ("LOGOUT")) {
                  thisUser.putAddress (null);
                  printResponseHeader (rc_BYE); /* logged out OK. */
                  try {
                    userCmdSocket.close ();
                  } catch (Exception ex) {
                  }
                } else {

                  if (aCommand.equals ("GET")) {
                    d = grabOutputStream();
                    if (sTokenizer.hasMoreTokens()) {

                      /* Form is "GET #1234 55" where 1234 is topic ID and 55 is item ID.
                       * Or, form is "GET /foo/bar 55".
                       */

                      String aTopicString = new String(sTokenizer.nextToken());
                      tempTopic = OQTopic.getByReference(aTopicString);
                      aTopicID = tempTopic.topicID;

                      if ((aTopicID > -1) && (sTokenizer.hasMoreTokens())) {
                        if (tempTopic != null) {

                          try {
                            anItemID = Integer.parseInt(sTokenizer.nextToken());
                          } catch (Exception ex) {
                            anItemID = -1; // error
                          }

                          if (tempTopic.userHasReadPermission(thisUser) == true) {
                            tempItem = tempTopic.getItemByID(anItemID);
                            if (tempItem != null) {

                              /* Asked for valid message, which follows. */
                              printResponseHeaderWithoutGrabbing  (d, rc_OK);

                              d.writeBytes("Subject: " + tempItem.getSubject() + "\r\n");
                              if (tempItem.getMessageLength() > 0) {
                                d.writeBytes("Content-Length: " + tempItem.getMessageLength() + "\r\n\r\n");
                                d.writeBytes(tempItem.getMessage() + "\r\n");
                              } else {
                                d.writeBytes("Content-Length: 0\r\n");
                              }
                            } else {
                              printResponseHeaderWithoutGrabbing  (d, rc_UNKNOWNITEMID);
                              OQServer.report(OQServer.repDebugMinor, "In handler run, Unknown item ID.");
                            }
                          } else {
                            printResponseHeaderWithoutGrabbing  (d, rc_NOREADPERMISSION); //  User not authorized to do SEND on this topic.
                           OQServer.report(OQServer.repDebugMinor, "In handler run, no permission to read.");
                          }
                        } else {
                          printResponseHeaderWithoutGrabbing  (d, rc_UNKNOWNTOPICID);
                          OQServer.report(OQServer.repDebugMinor, "In handler run, unknown topic ID.");
                        }
                      } else {
                        printResponseHeaderWithoutGrabbing  (d, rc_SYNTAXERROR); // we have a topic, but no item number.
                      }

                    } else {
                      printResponseHeaderWithoutGrabbing  (d, rc_SYNTAXERROR);
                    }
                  d.writeBytes("\r\n");
                  d.flush();
                  releaseOutputStream();
                } else {
                  if (aCommand.equals ("SEND")) {

                    /*  Cmd should be of form  "SEND #1234" or "SEND /foo/bar". */
                    anID = 0;
                    if (sTokenizer.hasMoreTokens()) {
                      String s = sTokenizer.nextToken();
                      theTopic = OQTopic.getByReference(s);
                      if (theTopic != null) {
                        anID = theTopic.topicID;
                      }
                    }

                    if (anID > 0) {
                      if (theTopic != null) {
                        if (((getAuthorizedUserID() == theTopic.owner) || (theTopic.owner == 0 )) || (thisUser.isAdmin()==true) ) {
                          state = state_SENDING;
                          /* Now grab output stream to keep our blabbing from confusing client. */
                          d = grabOutputStream();
                          printResponseHeaderWithoutGrabbing  (d, rc_READYFORSEND);
                          d.writeBytes("\r\n");
                          d.flush();

                          /* User can start SEND now.
                           * Note: we don't release the output stream here, because
                           * we want to keep things quiet after announcing that it's
                           * OK to start sending a message.
                           * The output stream is released above,
                           * when the sent message is complete.
                           */

                        } else {
                          printResponseHeader (rc_NOSENDPERMISSION); //  User not authorized to do SEND on this topic.
                          OQServer.report(OQServer.repDebugMinor, "No permission to send, authorizedUserID = " + getAuthorizedUserID() );
                        }
                      } else {
                        printResponseHeader (rc_UNKNOWNTOPICID);
                        OQServer.report(OQServer.repDebugMinor, "Unknown topic ID.");
                      }
                    } else {
                      printResponseHeader (rc_UNKNOWNTOPICID);
                      OQServer.report(OQServer.repDebugMinor, "Topic is zero.");
                    /* end of SEND command. */

                  } else {
                    if (aCommand.equals ("ACK")) {
                      if (sTokenizer.hasMoreTokens()) {
                        String s = sTokenizer.nextToken();
                        try {
                          anAckID = Integer.parseInt(s);
                        } catch (Exception ex) {
                          anAckID = -1;
                        }
                        if (theUpdater.transactionID == anAckID) {

                          /* updater should be waiting for us,
                           * so notify updater thread and set the acknowledgedID.
                           */

                          printResponseHeader(rc_OK);

                          theUpdater.notifyWithAcknowledgement(anAckID);
                          try {
                            synchronized (theUpdater) {
                              oqServer.incrementPerfCounter(OQServer.PERFCOUNTER_GET_MESSAGES, theUpdater.itemsToSend);
                              theUpdater.isHandlerReadyForNextUpdate = true;
                              theUpdater.notify();
                            }
                          } catch (Exception e) {
                          }



                        } else {
                          printResponseHeader(rc_CLIENTERROR);
                          OQServer.report(OQServer.repDebug, "Bad ACK ID. Expected " + theUpdater.transactionID + " but client sent " + anAckID + ".");
                        }
                                                                                  }
                    } else {
                      if (aCommand.equals ("SUBSCRIBE")) { // subscribe to a topic.
                        aTopic = "";
                        if (sTokenizer.hasMoreTokens()) aTopic = sTokenizer.nextToken();
                        if (aTopic.equals("")) {
                          printResponseHeader(rc_SYNTAXERROR);
                        } else {

                          // A topic number or path was found, so subscribe.
                          OQTopic tempSubscribeTopic = OQTopic.getByReference(aTopic);
                          if (tempSubscribeTopic != null) {
                            aTopicID = tempSubscribeTopic.topicID;
                            if (thisUser.userSubscribeToTopic(aTopicID)) {
                              printResponseHeader(rc_OK);
                            } else {
                              printResponseHeader(rc_CANNOTSUBSCRIBE);
                            }
                          }  else {
                            printResponseHeader(rc_UNKNOWNTOPICID);
                          }

                        }
                      } else {
                        if (aCommand.equals ("UNSUBSCRIBE")) { // subscribe to a topic.
                          aTopic = "";
                          if (sTokenizer.hasMoreTokens()) aTopic = sTokenizer.nextToken();
                            if (aTopic.equals("")) {
                              printResponseHeader(rc_SYNTAXERROR);
                            } else {

                              OQTopic tempUnsubscribeTopic = OQTopic.getByReference(aTopic);
                              if (tempUnsubscribeTopic != null) {
                                aTopicID = tempUnsubscribeTopic.topicID;
                                if (thisUser.userUnsubscribeFromTopic(aTopicID)) {
                                  printResponseHeader(rc_OK);
                                } else {
                                  printResponseHeader(rc_CANNOTUNSUBSCRIBE);
                                }
                              }  else {
                                printResponseHeader(rc_UNKNOWNTOPICID);
                              }

                            }

                          } else {
                          if (aCommand.equals ("GETSUBS")) { // Get list of the user's subscriptions.
                            d = grabOutputStream();
                            printResponseHeaderWithoutGrabbing  (d, rc_OK);
                            d.writeBytes(thisUser.subsAsString());
                            d.writeBytes("\r\n");
                            d.flush();
                            releaseOutputStream();

                          } else {
                            if (aCommand.equals ("UPDATES")) { // Turn updates on or off.
                              boolean updatesFlag = false; // don't send updates
                              if (sTokenizer.hasMoreTokens()) {
                                  String ufString = sTokenizer.nextToken();
                                if (ufString.toUpperCase().equals("ON")) {
                                  updatesFlag = true;
                                  printResponseHeader (rc_OK);
                                  ReadyForUpdates = updatesFlag;
                                  theUpdater.isHandlerReadyForNextUpdate = true;
                                  try {
                                    synchronized (theUpdater) {
                                      theUpdater.notify();
                                    }
                                  } catch (Exception e) {
                                  }
                                } else {
                                  if (ufString.toUpperCase().equals("OFF")) {
                                    updatesFlag = false;
                                    printResponseHeader (rc_OK);
                                    ReadyForUpdates = updatesFlag;
                                  } else // neither ON nor OFF
                                    printResponseHeader (rc_SYNTAXERROR);
                                  }
                                }
                              }

                            } else {
                              if (aCommand.equals ("NAMES")) {

                                /* NAMES #1001 /foo/bar #540 returns three titles,
                                 * or "?" for missing title or "??" for unknown topic
                                 */
                                int tempNameTopicID;
                                String tempTopicStr;
                                OQTopic tempNamedTopic;
                               
                                d = grabOutputStream();
                                while (sTokenizer.hasMoreTokens()){
                                  tempTopicStr = sTokenizer.nextToken();
                                  tempNamedTopic = OQTopic.getByReference(tempTopicStr);

                                  if (tempNamedTopic != null) {
                                    if (tempNamedTopic.topicName.equals("")) {
                                      d.writeBytes("?\r\n");
                                    } else {
                                      d.writeBytes(tempNamedTopic.topicName + "\r\n");
                                    }
                                  } else // path names not supported yet.
                                          d.writeBytes("??\r\n");
                                  }
                                }
                                d.writeBytes("\r\n");
                                d.flush();
                                releaseOutputStream();

                                } else {

                                  /* THREADS is not part of the protocol.
                                   * This is just for debugging purposes, and
                                   * will be removed at some time.
                                   */
                                  if (aCommand.equals("THREADS")) {
                                    d = grabOutputStream();
                                    printResponseHeaderWithoutGrabbing  (d, rc_OK);
                                    d.writeBytes ("activeCount = " + currentThread().getThreadGroup().activeCount());

                                    Thread allThreads[];
                                    try{
                                      allThreads = new Thread[Thread.activeCount()];
                                      Thread.enumerate (allThreads);
                                      for (int x=0;x<allThreads.length;x++) {
                                        d.writeBytes("  Thread:"+allThreads[x].getName()+":Pri="+allThreads[x].getPriority()+":Alive="+allThreads[x].isAlive());
                                        if (x == 40) {
                                          d.writeBytes(" ... etc. etc. total of " + allThreads.length + " threads.  ");
                                          x = allThreads.length;
                                        }
                                      }
                                    }  catch (Exception e){
                                      OQServer.report(OQServer.repDebug, "thisThread was interrupted");
                                    }

                                    d.writeBytes("\r\n");
                                    d.writeBytes("\r\n");
                                    d.flush();
                                    releaseOutputStream();
                                  } else {

                                    if (aCommand.equals ("SETCURSOR")) { // lets us skip messages we don't want to receive, etc.
                                    /* ***********************
                                    *
                                    *  This is totally half-baked, and will give unreliable results.
                                    *  Please implement this fully and cleanly by rewriting setSubscriptionCursor.
                                    *
                                    ***************************/

                                      aTopic = "";
                                      if (sTokenizer.hasMoreTokens()) aTopic = sTokenizer.nextToken();
                                      if (aTopic.equals("")) {
                                        printResponseHeader(rc_SYNTAXERROR);
                                      } else {
                                        OQTopic setcursTopic = OQTopic.getByReference(aTopic);
                                        if (setcursTopic != null) {
                                          aTopicID = setcursTopic.topicID;
                                          String sCursPosition = "";
                                          int iCursPosition = 0;
                                          if (sTokenizer.hasMoreTokens()) sCursPosition = sTokenizer.nextToken();
                                          try {
                                            iCursPosition = Integer.parseInt(sCursPosition);
                                            // okay, set the user's cursor position.
                                            setSubscriptionCursor(setcursTopic, iCursPosition);
                                            printResponseHeader(rc_OK);
                                          } catch (Exception ex) {
                                            printResponseHeader(rc_CANNOTSETCURSOR);
                                          }
                                        } else {
                                          printResponseHeader(rc_UNKNOWNTOPICID);
                                        }
                                      }

                                    } else {
                                      if (aCommand.equals("")) {
                                        /* Blank line where expecting a command. No biggie. */
                                      } else {
                                        printResponseHeader (rc_UNKNOWNCOMMAND); //  unknown command.
                                      }
                                    }
                                  }


                                }
                              }
                            }
                          }
                      }
                    }
                  }
                }

                if (state == state_COMMANDRECEIVED) {

                  /* this won't be called if, e.g., we've
                   * entered state_SENDING.
                   */
                  state = state_READY;   /* go back to accepting commands. */
                }
              }

            }  /* end of switch (state)  */

        }     /* end of while */
      } catch (IOException ex) {
        /* usually it's just the client closing the socket. */
      }
    }   catch (Exception ex) {
      OQServer.report(OQServer.repError, this.getName()+": "+ "Exception at far outside of BUCH run().");
      ex.printStackTrace();
    }

    try {
      oqServer.incrementPerfCounter(OQServer.PERFCOUNTER_CURRENT_CONNECTIONS, -1);
      userCmdHandlers.removeElement(this);
      prepareToDie();
    } catch (Exception ex) {
      OQServer.report(OQServer.repError, "Exception during cleanup of run() for BUCH. " + ex.getMessage() );
    }
  }

  private void setSubscriptionCursor(OQTopic aTopic, int newCursorPosition){
    /* Need to:
     *   - set position in OQUser.
     *   - have OQUser update database
     *   - remove any older/newer messages in to-do items.
     * !!!!!!!!!!!!!!!!! this needs to be finished.
     */
     thisUser.markItemAsHeardOf(aTopic.topicID, newCursorPosition);
     thisUser.recordItemAsHeardOf(aTopic.topicID, newCursorPosition );
  }


  public static void main(String args[]) throws IOException {
  }

}
TOP

Related Classes of org.openqueue.oqtane.OQUserCmdHandler

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.