/* oqtane (OpenQueue server classes and daemon)
*
* Copyright (c) 2000 oqtane Development Team (See Credits file)
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Lesser General Public
* License as published by the Free Software Foundation; either
* version 2.1 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public
* License along with this library; if not, write to the Free Software
* Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
*/
package org.openqueue.oqtane;
import java.net.*;
import java.io.*;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Date;
import java.util.Random;
import java.util.Vector;
import java.util.Enumeration;
import java.util.StringTokenizer;
import java.util.Hashtable;
public class OQUserCmdHandler extends Thread {
static String OQprotocolVersion = "0.50";
public static Vector userCmdHandlers;
static Hashtable responseCodes;
private OQServer oqServer;
protected boolean shouldRun = true;
private long lConnectionID = 0;
private String aCmdTag = new String ();
protected Socket userCmdSocket;
private String authorizedUserName; // this is "" until they give a valid password.
private int authorizedUserID;
protected OQUser thisUser;
private int state = 0;
private StreamLock outputStreamLock = new StreamLock();
private String modeRequestString;
protected OQUpdater theUpdater = null;
protected boolean ReadyForUpdates = false;
private DataInputStream din;
private DataOutputStream dout;
private String sServerNonce = null;
private final String modesAvailable= new String("MODES=ALIVE");
static final int state_UNAUTHORIZED = 0;
static final int state_READY = 1; // set after password is accepted.
static final int state_COMMANDRECEIVED = 2; // looking for a blank line to end the command.
static final int state_SENDING = 3; // set to receive data for a message.
static final int rc_OK = 200;
static final int rc_BYE = 201;
static final int rc_READYFORSEND = 202;
static final int rc_RECEIVED = 203;
static final int rc_CLIENTERROR = 300;
static final int rc_UNKNOWNNAMEPASSWORD = 301;
static final int rc_UNKNOWNCOMMAND = 302;
// static final int rc_WRONGPASSWORD = 303;
static final int rc_ERRORLOGGINGOUT = 304;
static final int rc_UNKNOWNTOPICID = 305;
static final int rc_NOSENDPERMISSION = 306;
static final int rc_MODEUNAVAILABLE = 307;
static final int rc_SYNTAXERROR = 308;
static final int rc_NOREADPERMISSION = 309;
static final int rc_UNKNOWNITEMID = 310;
static final int rc_ALREADYLOGGEDIN = 311;
static final int rc_CANNOTSUBSCRIBE = 320;
static final int rc_CANNOTUNSUBSCRIBE = 321;
static final int rc_CANNOTSETCURSOR = 322;
static final int rc_SERVERERROR = 400;
static final int rc_SERVERMAXCONN = 505;
// init the class variables
static {
userCmdHandlers = new Vector ();
responseCodes = new Hashtable ();
responseCodes.put (Integer.toString(rc_OK), "OK.");
responseCodes.put (Integer.toString(rc_BYE), "BYE.");
responseCodes.put (Integer.toString(rc_CLIENTERROR), "Client Error.");
responseCodes.put (Integer.toString(rc_UNKNOWNNAMEPASSWORD), "User name or password not recognized.");
// responseCodes.put (Integer.toString(rc_WRONGPASSWORD), "Password not recognized.");
responseCodes.put (Integer.toString(rc_UNKNOWNCOMMAND), "Unknown command.");
responseCodes.put (Integer.toString(rc_ERRORLOGGINGOUT), "Error logging out.");
responseCodes.put (Integer.toString(rc_UNKNOWNTOPICID), "Topic ID not recognized.");
responseCodes.put (Integer.toString(rc_UNKNOWNITEMID), "Item ID not recognized.");
responseCodes.put (Integer.toString(rc_NOSENDPERMISSION), "You do not have permission to send to this topic.");
responseCodes.put (Integer.toString(rc_NOREADPERMISSION), "You do not have permission to read from this topic.");
responseCodes.put (Integer.toString(rc_MODEUNAVAILABLE), "No mode is available which matches your request.");
responseCodes.put (Integer.toString(rc_READYFORSEND), "Ready for message to be sent; end with '.' on its own line.");
responseCodes.put (Integer.toString(rc_SYNTAXERROR), "Syntax error.");
responseCodes.put (Integer.toString(rc_RECEIVED), "Received.");
responseCodes.put (Integer.toString(rc_ALREADYLOGGEDIN), "You are already logged in on a different connection.");
responseCodes.put (Integer.toString(rc_CANNOTSUBSCRIBE), "Cannot subscribe to the topic.");
responseCodes.put (Integer.toString(rc_CANNOTUNSUBSCRIBE), "Cannot unsubscribe from the topic.");
responseCodes.put (Integer.toString(rc_CANNOTSETCURSOR), "Cannot set cursor position.");
responseCodes.put (Integer.toString(rc_SERVERERROR), "Server error.");
responseCodes.put (Integer.toString(rc_SERVERMAXCONN), "Server too busy. Maximum connections in use.");
}
public OQUserCmdHandler(OQServer aServer, Socket aSocket, long lTotalConnections) {
oqServer = aServer;
lConnectionID = lTotalConnections;
userCmdSocket = aSocket;
authorizedUserName = new String("");
modeRequestString = new String("");
authorizedUserID = 0;
setName("BUCHndlr for connID" + getlConnectionID() +")" );
}
public boolean getReadyForUpdates() {
return ReadyForUpdates;
}
public int prepareToDie() {
shouldRun = false;
if (userCmdSocket != null) {
try {
userCmdSocket.close();
userCmdSocket = null;
} catch (Exception e) {
}
}
if (theUpdater != null) {
synchronized (theUpdater) {
theUpdater.prepareToDie();
try {
theUpdater.shouldRun = false;
theUpdater.start();
} catch (Exception ex) {
};
try {
theUpdater.notify();
} catch (Exception ex) {
};
try {
theUpdater.join(3000);
} catch (InterruptedException ex) {
OQServer.report(OQServer.repDebug, "Exception in joining updater to kill it: "+ex.getMessage()+" ");
}
if (theUpdater.isAlive()) {
try {
OQServer.report(OQServer.repDebug, "Updater still alive for " + theUpdater.getName() +". updater shouldRun =" + theUpdater.shouldRun + ", runstarted =" + theUpdater.isRunStarted() + " ");
try {
theUpdater.notify();
sleep(1);
theUpdater.join(3000);
if (theUpdater.isAlive()) {
OQServer.report(OQServer.repDebug, "Second try, Updater still alive for " + theUpdater.getName() +". updater shouldRun =" + theUpdater.shouldRun + ", runstarted =" + theUpdater.isRunStarted() + " ");
}
} catch (InterruptedException ex) {
OQServer.report(OQServer.repDebug, "Exception in joining updater to kill it: "+ex.getMessage()+" ");
}
} catch (Exception ex) {
OQServer.report(OQServer.repDebug, "Exception in checking updater to kill it: "+ex.getMessage()+" ");
}
}
} // synch
}
if (thisUser != null) {
// try to ensure we wrap up this user OK.
thisUser.putAddress(null);
thisUser.removeAllToDoItems();
thisUser.prepareToKill();
thisUser = null;
}
return 0; // 0 means no errors.
}
public String getAuthorizedUserName() {
return authorizedUserName;
}
public long getlConnectionID() {
return lConnectionID;
}
public int getUserType() {
if (thisUser != null) {
return thisUser.getUserType();
} else {
return 0;
}
}
public static OQUserCmdHandler getBylConnectionID(long aConnID) {
OQUserCmdHandler result = null;
synchronized (OQUserCmdHandler.userCmdHandlers) {
OQUserCmdHandler aBCH;
Enumeration enum = OQUserCmdHandler.userCmdHandlers.elements ();
while ((enum.hasMoreElements ()) && (result == null)) {
aBCH = (OQUserCmdHandler) enum.nextElement ();
if ( aBCH.getlConnectionID() == aConnID) { result = aBCH; }
}
}
return result;
}
/* Note: there is no setState method, because
* state is set through the handling of the OQ session.
*/
public int getState() {
return state;
}
public int getAuthorizedUserID() {
return authorizedUserID;
}
public void setAuthorizedUserName(String s) {
authorizedUserName = s;
}
public void setAuthorizedUserID(int id) {
authorizedUserID = id;
}
public void printResponseHeader (int responseCode) {
DataOutputStream d;
try {
String s = Integer.toString (responseCode);
d = grabOutputStream();
if (aCmdTag.equals("")) {
d.writeBytes (s + " - " + responseCodes.get (s) + "\r\n\r\n");
} else {
d.writeBytes (aCmdTag + " " + s + " - " + responseCodes.get (s) + "\r\n\r\n");
}
d.flush ();
releaseOutputStream();
} catch (IOException ex) {
// what to do?
}
}
public void printResponseHeaderWithoutGrabbing (DataOutputStream d, int responseCode) {
try {
String s = Integer.toString (responseCode);
if (aCmdTag.equals("")) {
d.writeBytes (s + " - " + responseCodes.get (s) + "\r\n");
} else {
d.writeBytes (aCmdTag + " " + s + " - " + responseCodes.get (s) + "\r\n");
}
d.flush ();
} catch (IOException ex) {
// what to do?
}
}
/* Check the "MODE=" parameter to see if we have at
* least one requested mode.
*/
private boolean IsAvailableMode(String s) {
if (s != null) {
if (s.toUpperCase().startsWith("MODE=")) {
s = s.substring(5);
}
if ((s.indexOf("ALIVE") > -1) || (s.indexOf(",ALIVE")) > -1) {
return true;
} else {
return false;
}
} else {
return false;
}
}
public DataOutputStream grabOutputStream() {
outputStreamLock.grabMe();
return dout;
}
public void releaseOutputStream() {
outputStreamLock.releaseMe();
}
/* returns number of bytes written out on this connection */
protected int dataWrittenSize() {
if (dout == null) {
return 0;
} else {
return dout.size();
}
}
private String userAuthenticates(Hashtable hashAuthParams, String theUserPassword) {
String sResponse = null; //null=failure, non-null=authenticated
try {
MD5 md5Hash = null;
if (theUserPassword == null) { theUserPassword = "";};
String aUser = (String) hashAuthParams.get("USER");
String aType = (String) hashAuthParams.get("AUTHTYPE");
String aPassword = (String) hashAuthParams.get("PASSWORD");
if (aType.equals("plaintext")) {
if (theUserPassword.equals(aPassword)) {
sResponse = ""; // will be treated as success, but no response will be printed.
}
} else {
if (aType.equals("simple-md5")) {
// value = md5( user:nonce:password )
String aValue = (String) hashAuthParams.get("HASH");
String aUserNonce = (String) hashAuthParams.get("USERNONCE");
if (aUserNonce.equals("") || aValue.equals("")) {
// Do not accept user if they haven't included a nonce and value.
// This protects them from stupidity.
} else {
md5Hash = new MD5(sServerNonce+":"+aUser+":"+aUserNonce+":"+theUserPassword);
String sMD5asHex = md5Hash.asHex();
if (aValue.equals(sMD5asHex)) {
md5Hash = new MD5(sServerNonce+":"+aUserNonce+":"+theUserPassword);
sResponse = md5Hash.asHex();
}
}
} // end of simple-md5
}
} catch (Exception e) {
}
return sResponse;
}
public void run() {
int anID;
int aTopicID = 0;
int anItemID;
int anAckID; // acknowledgment for the server-generated transaction number
OQTopic tempTopic;
OQItem tempItem;
userCmdHandlers.addElement (this);
String lineRead;
StringTokenizer sTokenizer = null;
OQTopic theTopic = null;
String aCommand = new String ();
String aName = new String ();
String aTopic = new String ();
String aPassword = new String ();
String messageData = new String ();
String theHeaders = new String ();
boolean sendEndsWithDot = true;
boolean sentItemFullyReceived = false;
boolean sentFirstLineOfCommand = false;
String receivedSubject = new String ("");
boolean seenBlankLine = false; // true when all headers for this message are done.
int declaredContentLength = -1; // XXXXXXXXX this limitation must be removed!
DataOutputStream d;
String sAuthenticationMethod = "";
String aNameValue; // helper variables to split name=value pairs
String sNVname ="";
String sNVvalue="";
StringTokenizer sNVTokenizer = null;
Hashtable hashAuthParams = new Hashtable ();
int theUserID = 0;
int theUserType = 0;
String theUserPassword = ""; // password read in from database
String sql = "";
thisUser = null;
hashAuthParams.put("MODE", "ALIVE"); // default
hashAuthParams.put("AUTHTYPE", "plaintext"); // default
try {
try {
din = new DataInputStream (new BufferedInputStream (userCmdSocket.getInputStream ()));
dout = new DataOutputStream (new BufferedOutputStream (userCmdSocket.getOutputStream ()));
Random rServerNonce = new Random();
sServerNonce = Long.toHexString(rServerNonce.nextLong());
sServerNonce = sServerNonce + Long.toHexString(System.currentTimeMillis());
sServerNonce = sServerNonce.substring(0,7) + sServerNonce.substring(sServerNonce.length()-8);
dout.writeBytes ("OPENQUEUE=" + OQprotocolVersion + " " + "SERVER=" + OQServer.getServerShortDesc() + " " + modesAvailable + " NONCE=" + sServerNonce +"\r\n\r\n");
dout.flush ();
while ((shouldRun == true) && ((lineRead = din.readLine ()) != null)) {
if (state != state_SENDING) {
if (sentFirstLineOfCommand == false) {
sTokenizer = new StringTokenizer (lineRead, " ");
try {
aCommand = "";
aCmdTag = "";
if (state == state_READY) {
/* About to get cmdTag */
if (sTokenizer.hasMoreTokens()) aCmdTag = sTokenizer.nextToken();
}
/* About to parse out aCommand */
if (sTokenizer.hasMoreTokens()) aCommand = sTokenizer.nextToken();
} catch (Exception ex) {
OQServer.report(OQServer.repDebug, "Error tokenizing command line. " + ex);
}
}
}
/* process the command, if supported */
switch (state) {
case state_UNAUTHORIZED :
if (sentFirstLineOfCommand ==false){
if (aCommand.toUpperCase().equals ("LOGIN")) {
if (sTokenizer.hasMoreTokens()) aName = sTokenizer.nextToken();
if (sTokenizer.hasMoreTokens()) aPassword= sTokenizer.nextToken();
if (sTokenizer.hasMoreTokens()) modeRequestString = sTokenizer.nextToken();
if (modeRequestString.toUpperCase().startsWith("MODE=")) {
hashAuthParams.put("MODE", modeRequestString.substring(5));
} else {
hashAuthParams.put("MODE", modeRequestString);
}
// sAuthenticationMethod = "" still
hashAuthParams.put("USER", aName);
hashAuthParams.put("PASSWORD", aPassword);
}
if (aCmdTag.startsWith("!")) {
if (sTokenizer.hasMoreTokens()) aCommand = sTokenizer.nextToken();
}
if (aCommand.toUpperCase().equals ("AUTH")) {
if (sTokenizer.hasMoreTokens()) {
sAuthenticationMethod = sTokenizer.nextToken();
} else {
// hmmm... no authentication method specified?
}
}
sentFirstLineOfCommand = true;
}
if (lineRead.equals("")) {
aName = (String) hashAuthParams.get("USER");
aPassword = (String) hashAuthParams.get("PASSWORD");
// System.out.println("hashAuthParams = " + hashAuthParams.toString());
if (IsAvailableMode(hashAuthParams.get("MODE").toString().toUpperCase() )) {
// yyy start
if (hashAuthParams.get("USER").toString().toUpperCase().equals("GUEST")) {
// setThisUserAsGuest(hashAuthParams, d)
// public boolean setThisUserAsGuest(){
thisUser = new OQUser(oqServer, "guest", "password", 0, true, 0);
setAuthorizedUserName ("guest");
/* user id remains 0. */
d = grabOutputStream();
printResponseHeaderWithoutGrabbing (d, rc_OK);
theUpdater = new OQUpdater(this);
thisUser.handleLogin (this, userCmdSocket, d, OQUser.login_LASTITEM);
d.writeBytes("\r\n");
d.flush();
theUpdater.start();
state = state_READY;
releaseOutputStream();
// }
// ********************
} else {
thisUser = OQUser.getByName(aName);
// bbb start
/* if not already in memory, look it up in database. */
if (thisUser == null) {
OQDBAccess aDB = oqServer.GetDB();
synchronized(aDB) {
try {
sql = OQDBAccess.getSQLCommand(OQDBAccess.sql_GETUSERIDANDPASSWORDFROMNAME, aName);
ResultSet rs = aDB.getStatement().executeQuery( sql );
OQUser bUser = null;
while(rs.next()) {
theUserPassword = rs.getString(1);
theUserID = (int) rs.getLong(2);
theUserType = (int) rs.getLong(3);
/* Discovered user in database...
* MUST now create a user object and set the password,
* look up subscriptions, etc.
*/
bUser = new OQUser (oqServer, aName, aPassword, theUserID, false, theUserType);
}
rs.close();
if (bUser != null) {
// was getting subs here
thisUser = bUser;
} else {
/* No match from user database */
}
} catch (Exception ex) {
OQServer.report(OQServer.repError, "Exception trying to find user in database. " + ex.getMessage() );
}
} // synchronized
}
// bbb end
}
if (thisUser == null) {
aName = "";
aPassword = "";
printResponseHeader (rc_UNKNOWNNAMEPASSWORD); // user name or password not recognized.
} else {
// if (thisUser.password.equals(aPassword)) {
String sAuthResponse = userAuthenticates(hashAuthParams, theUserPassword);
if (sAuthResponse != null) {
/* Are we already logged in? Only one connection per user allowed for now. */
if ( thisUser.isLoggedIn() ) {
printResponseHeader (rc_ALREADYLOGGEDIN);
} else {
/* Mark user as logged in, and send OK response. */
setAuthorizedUserName (aName);
OQDBAccess aDB = oqServer.GetDB();
synchronized (aDB) {
sql = OQDBAccess.getSQLCommand(OQDBAccess.sql_USERIDFROMNAME, aName);
long lng = aDB.Get1KnownLong( sql );
setAuthorizedUserID( (int) lng );
}
state = state_READY;
d = grabOutputStream();
printResponseHeaderWithoutGrabbing (d, rc_OK);
if (sAuthResponse.equals("") == false) {
d.writeBytes("Response: " + sAuthResponse + "\r\n");
}
/* Don't send final d.writeBytes("\r\n"); until
* response is done from handleLogin.
*/
theUpdater = new OQUpdater(this);
sql = OQDBAccess.getSQLCommand(OQDBAccess.sql_LOADUSERSUBS, String.valueOf(theUserID) );
ResultSet rs2 = aDB.getStatement().executeQuery( sql );
while(rs2.next()) {
int iTmpTopicID;
int iTmpLastSeen;
int iTmpFlags;
int iTmpMaxDefault;
iTmpTopicID = (int) rs2.getInt(1);
iTmpLastSeen = (int) rs2.getLong(2);
iTmpFlags = (short) rs2.getInt(3);
iTmpMaxDefault = (int) rs2.getLong(4);
OQSubscription aSubscription = new OQSubscription( iTmpTopicID, iTmpLastSeen, iTmpFlags );
aSubscription.setMaxQueued( iTmpMaxDefault );
aSubscription.lookupTopicPointer();
thisUser.subscriptions.addElement (aSubscription);
}
rs2.close();
thisUser.handleLogin (this, userCmdSocket, d, OQUser.login_LASTITEM);
d.writeBytes("\r\n");
d.flush();
theUpdater.start();
releaseOutputStream();
}
} else {
printResponseHeader (rc_UNKNOWNNAMEPASSWORD); // logged in OK.
}
}
// yyy end
} else {
printResponseHeader (rc_MODEUNAVAILABLE);
}
sentFirstLineOfCommand = false;
} else {
// Record additional AUTH parameters if we're beyond first line.
if ((sentFirstLineOfCommand == true) && (lineRead.startsWith("!")==false) ) {
int iNameValueBoundary = lineRead.indexOf(58);
if (iNameValueBoundary > 0) {
try {
sNVname = lineRead.substring(0,iNameValueBoundary);
sNVvalue = lineRead.substring(iNameValueBoundary+1);
} catch (Exception e) {
}
if (sNVname != null) { sNVname = sNVname.trim().toUpperCase(); }
if (sNVvalue != null) { sNVvalue = sNVvalue.trim(); }
hashAuthParams.put(sNVname, sNVvalue);
} else {
// unexpected. We'll ignore for now.
}
} // end of test for authentication method
}
break;
case state_SENDING :
if (sendEndsWithDot && lineRead.equals(".")) { // end of message. Post it.
sentItemFullyReceived = true;
} else { // contains message data or header line.
if (seenBlankLine) {
if (messageData.equals("")) {
messageData = lineRead;
} else {
messageData = messageData + "\r\n" + lineRead;
}
} else { /* process header line. */
if (lineRead.equals("")) {
seenBlankLine = true; // everything after this is message data.
if (sendEndsWithDot == false) {
/* receive all message content now. */
byte b[] = new byte[declaredContentLength];
try {
din.readFully(b, 0, declaredContentLength);
} catch (Exception ex) {
/* ? what to do ? */
}
StringBuffer sb = new StringBuffer(declaredContentLength);
int i;
for (i=0; i < declaredContentLength; i++) {
sb.append( (char) b[i] ); // Character(c));
}
messageData = sb.toString();
sentItemFullyReceived = true;
}
} else { /* process real header line. */
if (theHeaders.equals("")) {
theHeaders = lineRead;
} else {
theHeaders = theHeaders + "\r\n" + lineRead;
}
String headerType = lineRead.toUpperCase();
if (headerType.startsWith ("SUBJECT:")) {
receivedSubject = lineRead.substring(9, lineRead.length());
}
if (headerType.startsWith ("CONTENT-LENGTH:")) {
try {
declaredContentLength = Integer.parseInt(lineRead.substring(16, lineRead.length()));
sendEndsWithDot = false;
} catch (RuntimeException ex) {
OQServer.report(OQServer.repDebug, "Invalid content length declaration. " + ex);
/* ??? return error to client? */
}
}
}
}
}
if (sentItemFullyReceived) {
/* Now let updater write out stuff if it wants;
* We're finished with focusing on user input.
*/
// ??? shouldn't this be after we print response header?
releaseOutputStream();
/* go back to accepting commands. */
state = state_READY;
/* Save message, then tell sender we got the whole message. */
try {
OQItem theItem;
if (receivedSubject.equals("")) {
/* stick the Subject at the front of theHeaders, if headers exist. */
if (theHeaders.equals("")) {
theHeaders = "Subject: (No Subject)";
} else {
theHeaders = "Subject: (No Subject)" + "\r\n" + theHeaders;
}
}
/* Add the time the server processed this SEND command. */
Date rightNow = new Date();
theHeaders = theHeaders + "\r\n" + "Posted: " + rightNow.toGMTString();
if (receivedSubject.equals("")) {
theItem = theTopic.addItem("(No Subject)", theHeaders, messageData);
} else {
theItem = theTopic.addItem(receivedSubject, theHeaders, messageData);
}
if (theItem != null) {
printResponseHeader (rc_RECEIVED);
oqServer.incrementPerfCounter(OQServer.PERFCOUNTER_SEND_MESSAGES, 1);
} else {
printResponseHeader (rc_SERVERERROR);
}
} catch (Exception ex) {
printResponseHeader (rc_SERVERERROR);
}
messageData = "";
theHeaders = "";
sendEndsWithDot = true;
seenBlankLine = false;
receivedSubject = "";
declaredContentLength = -1;
receivedSubject = "";
sentItemFullyReceived = false;
}
break;
case state_READY:
sentFirstLineOfCommand = true;
if ( lineRead.equals("") ) {
/* we've now seen a blank line, and can continue. */
state = state_COMMANDRECEIVED;
/* Note that we don't "break" here, so we can handle
* the command in the state_COMMANDRECEIVED block.
*/
} else {
break;
}
case state_COMMANDRECEIVED :
sentFirstLineOfCommand = false;
if (aCommand.equals ("LOGOUT")) {
thisUser.putAddress (null);
printResponseHeader (rc_BYE); /* logged out OK. */
try {
userCmdSocket.close ();
} catch (Exception ex) {
}
} else {
if (aCommand.equals ("GET")) {
d = grabOutputStream();
if (sTokenizer.hasMoreTokens()) {
/* Form is "GET #1234 55" where 1234 is topic ID and 55 is item ID.
* Or, form is "GET /foo/bar 55".
*/
String aTopicString = new String(sTokenizer.nextToken());
tempTopic = OQTopic.getByReference(aTopicString);
aTopicID = tempTopic.topicID;
if ((aTopicID > -1) && (sTokenizer.hasMoreTokens())) {
if (tempTopic != null) {
try {
anItemID = Integer.parseInt(sTokenizer.nextToken());
} catch (Exception ex) {
anItemID = -1; // error
}
if (tempTopic.userHasReadPermission(thisUser) == true) {
tempItem = tempTopic.getItemByID(anItemID);
if (tempItem != null) {
/* Asked for valid message, which follows. */
printResponseHeaderWithoutGrabbing (d, rc_OK);
d.writeBytes("Subject: " + tempItem.getSubject() + "\r\n");
if (tempItem.getMessageLength() > 0) {
d.writeBytes("Content-Length: " + tempItem.getMessageLength() + "\r\n\r\n");
d.writeBytes(tempItem.getMessage() + "\r\n");
} else {
d.writeBytes("Content-Length: 0\r\n");
}
} else {
printResponseHeaderWithoutGrabbing (d, rc_UNKNOWNITEMID);
OQServer.report(OQServer.repDebugMinor, "In handler run, Unknown item ID.");
}
} else {
printResponseHeaderWithoutGrabbing (d, rc_NOREADPERMISSION); // User not authorized to do SEND on this topic.
OQServer.report(OQServer.repDebugMinor, "In handler run, no permission to read.");
}
} else {
printResponseHeaderWithoutGrabbing (d, rc_UNKNOWNTOPICID);
OQServer.report(OQServer.repDebugMinor, "In handler run, unknown topic ID.");
}
} else {
printResponseHeaderWithoutGrabbing (d, rc_SYNTAXERROR); // we have a topic, but no item number.
}
} else {
printResponseHeaderWithoutGrabbing (d, rc_SYNTAXERROR);
}
d.writeBytes("\r\n");
d.flush();
releaseOutputStream();
} else {
if (aCommand.equals ("SEND")) {
/* Cmd should be of form "SEND #1234" or "SEND /foo/bar". */
anID = 0;
if (sTokenizer.hasMoreTokens()) {
String s = sTokenizer.nextToken();
theTopic = OQTopic.getByReference(s);
if (theTopic != null) {
anID = theTopic.topicID;
}
}
if (anID > 0) {
if (theTopic != null) {
if (((getAuthorizedUserID() == theTopic.owner) || (theTopic.owner == 0 )) || (thisUser.isAdmin()==true) ) {
state = state_SENDING;
/* Now grab output stream to keep our blabbing from confusing client. */
d = grabOutputStream();
printResponseHeaderWithoutGrabbing (d, rc_READYFORSEND);
d.writeBytes("\r\n");
d.flush();
/* User can start SEND now.
* Note: we don't release the output stream here, because
* we want to keep things quiet after announcing that it's
* OK to start sending a message.
* The output stream is released above,
* when the sent message is complete.
*/
} else {
printResponseHeader (rc_NOSENDPERMISSION); // User not authorized to do SEND on this topic.
OQServer.report(OQServer.repDebugMinor, "No permission to send, authorizedUserID = " + getAuthorizedUserID() );
}
} else {
printResponseHeader (rc_UNKNOWNTOPICID);
OQServer.report(OQServer.repDebugMinor, "Unknown topic ID.");
}
} else {
printResponseHeader (rc_UNKNOWNTOPICID);
OQServer.report(OQServer.repDebugMinor, "Topic is zero.");
} /* end of SEND command. */
} else {
if (aCommand.equals ("ACK")) {
if (sTokenizer.hasMoreTokens()) {
String s = sTokenizer.nextToken();
try {
anAckID = Integer.parseInt(s);
} catch (Exception ex) {
anAckID = -1;
}
if (theUpdater.transactionID == anAckID) {
/* updater should be waiting for us,
* so notify updater thread and set the acknowledgedID.
*/
printResponseHeader(rc_OK);
theUpdater.notifyWithAcknowledgement(anAckID);
try {
synchronized (theUpdater) {
oqServer.incrementPerfCounter(OQServer.PERFCOUNTER_GET_MESSAGES, theUpdater.itemsToSend);
theUpdater.isHandlerReadyForNextUpdate = true;
theUpdater.notify();
}
} catch (Exception e) {
}
} else {
printResponseHeader(rc_CLIENTERROR);
OQServer.report(OQServer.repDebug, "Bad ACK ID. Expected " + theUpdater.transactionID + " but client sent " + anAckID + ".");
}
}
} else {
if (aCommand.equals ("SUBSCRIBE")) { // subscribe to a topic.
aTopic = "";
if (sTokenizer.hasMoreTokens()) aTopic = sTokenizer.nextToken();
if (aTopic.equals("")) {
printResponseHeader(rc_SYNTAXERROR);
} else {
// A topic number or path was found, so subscribe.
OQTopic tempSubscribeTopic = OQTopic.getByReference(aTopic);
if (tempSubscribeTopic != null) {
aTopicID = tempSubscribeTopic.topicID;
if (thisUser.userSubscribeToTopic(aTopicID)) {
printResponseHeader(rc_OK);
} else {
printResponseHeader(rc_CANNOTSUBSCRIBE);
}
} else {
printResponseHeader(rc_UNKNOWNTOPICID);
}
}
} else {
if (aCommand.equals ("UNSUBSCRIBE")) { // subscribe to a topic.
aTopic = "";
if (sTokenizer.hasMoreTokens()) aTopic = sTokenizer.nextToken();
if (aTopic.equals("")) {
printResponseHeader(rc_SYNTAXERROR);
} else {
OQTopic tempUnsubscribeTopic = OQTopic.getByReference(aTopic);
if (tempUnsubscribeTopic != null) {
aTopicID = tempUnsubscribeTopic.topicID;
if (thisUser.userUnsubscribeFromTopic(aTopicID)) {
printResponseHeader(rc_OK);
} else {
printResponseHeader(rc_CANNOTUNSUBSCRIBE);
}
} else {
printResponseHeader(rc_UNKNOWNTOPICID);
}
}
} else {
if (aCommand.equals ("GETSUBS")) { // Get list of the user's subscriptions.
d = grabOutputStream();
printResponseHeaderWithoutGrabbing (d, rc_OK);
d.writeBytes(thisUser.subsAsString());
d.writeBytes("\r\n");
d.flush();
releaseOutputStream();
} else {
if (aCommand.equals ("UPDATES")) { // Turn updates on or off.
boolean updatesFlag = false; // don't send updates
if (sTokenizer.hasMoreTokens()) {
String ufString = sTokenizer.nextToken();
if (ufString.toUpperCase().equals("ON")) {
updatesFlag = true;
printResponseHeader (rc_OK);
ReadyForUpdates = updatesFlag;
theUpdater.isHandlerReadyForNextUpdate = true;
try {
synchronized (theUpdater) {
theUpdater.notify();
}
} catch (Exception e) {
}
} else {
if (ufString.toUpperCase().equals("OFF")) {
updatesFlag = false;
printResponseHeader (rc_OK);
ReadyForUpdates = updatesFlag;
} else { // neither ON nor OFF
printResponseHeader (rc_SYNTAXERROR);
}
}
}
} else {
if (aCommand.equals ("NAMES")) {
/* NAMES #1001 /foo/bar #540 returns three titles,
* or "?" for missing title or "??" for unknown topic
*/
int tempNameTopicID;
String tempTopicStr;
OQTopic tempNamedTopic;
d = grabOutputStream();
while (sTokenizer.hasMoreTokens()){
tempTopicStr = sTokenizer.nextToken();
tempNamedTopic = OQTopic.getByReference(tempTopicStr);
if (tempNamedTopic != null) {
if (tempNamedTopic.topicName.equals("")) {
d.writeBytes("?\r\n");
} else {
d.writeBytes(tempNamedTopic.topicName + "\r\n");
}
} else { // path names not supported yet.
d.writeBytes("??\r\n");
}
}
d.writeBytes("\r\n");
d.flush();
releaseOutputStream();
} else {
/* THREADS is not part of the protocol.
* This is just for debugging purposes, and
* will be removed at some time.
*/
if (aCommand.equals("THREADS")) {
d = grabOutputStream();
printResponseHeaderWithoutGrabbing (d, rc_OK);
d.writeBytes ("activeCount = " + currentThread().getThreadGroup().activeCount());
Thread allThreads[];
try{
allThreads = new Thread[Thread.activeCount()];
Thread.enumerate (allThreads);
for (int x=0;x<allThreads.length;x++) {
d.writeBytes(" Thread:"+allThreads[x].getName()+":Pri="+allThreads[x].getPriority()+":Alive="+allThreads[x].isAlive());
if (x == 40) {
d.writeBytes(" ... etc. etc. total of " + allThreads.length + " threads. ");
x = allThreads.length;
}
}
} catch (Exception e){
OQServer.report(OQServer.repDebug, "thisThread was interrupted");
}
d.writeBytes("\r\n");
d.writeBytes("\r\n");
d.flush();
releaseOutputStream();
} else {
if (aCommand.equals ("SETCURSOR")) { // lets us skip messages we don't want to receive, etc.
/* ***********************
*
* This is totally half-baked, and will give unreliable results.
* Please implement this fully and cleanly by rewriting setSubscriptionCursor.
*
***************************/
aTopic = "";
if (sTokenizer.hasMoreTokens()) aTopic = sTokenizer.nextToken();
if (aTopic.equals("")) {
printResponseHeader(rc_SYNTAXERROR);
} else {
OQTopic setcursTopic = OQTopic.getByReference(aTopic);
if (setcursTopic != null) {
aTopicID = setcursTopic.topicID;
String sCursPosition = "";
int iCursPosition = 0;
if (sTokenizer.hasMoreTokens()) sCursPosition = sTokenizer.nextToken();
try {
iCursPosition = Integer.parseInt(sCursPosition);
// okay, set the user's cursor position.
setSubscriptionCursor(setcursTopic, iCursPosition);
printResponseHeader(rc_OK);
} catch (Exception ex) {
printResponseHeader(rc_CANNOTSETCURSOR);
}
} else {
printResponseHeader(rc_UNKNOWNTOPICID);
}
}
} else {
if (aCommand.equals("")) {
/* Blank line where expecting a command. No biggie. */
} else {
printResponseHeader (rc_UNKNOWNCOMMAND); // unknown command.
}
}
}
}
}
}
}
}
}
}
}
if (state == state_COMMANDRECEIVED) {
/* this won't be called if, e.g., we've
* entered state_SENDING.
*/
state = state_READY; /* go back to accepting commands. */
}
}
} /* end of switch (state) */
} /* end of while */
} catch (IOException ex) {
/* usually it's just the client closing the socket. */
}
} catch (Exception ex) {
OQServer.report(OQServer.repError, this.getName()+": "+ "Exception at far outside of BUCH run().");
ex.printStackTrace();
}
try {
oqServer.incrementPerfCounter(OQServer.PERFCOUNTER_CURRENT_CONNECTIONS, -1);
userCmdHandlers.removeElement(this);
prepareToDie();
} catch (Exception ex) {
OQServer.report(OQServer.repError, "Exception during cleanup of run() for BUCH. " + ex.getMessage() );
}
}
private void setSubscriptionCursor(OQTopic aTopic, int newCursorPosition){
/* Need to:
* - set position in OQUser.
* - have OQUser update database
* - remove any older/newer messages in to-do items.
* !!!!!!!!!!!!!!!!! this needs to be finished.
*/
thisUser.markItemAsHeardOf(aTopic.topicID, newCursorPosition);
thisUser.recordItemAsHeardOf(aTopic.topicID, newCursorPosition );
}
public static void main(String args[]) throws IOException {
}
}