/*
*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache;
import org.jgroups.View;
import org.jboss.cache.data.Student;
import org.jboss.cache.data.Address;
import org.jboss.cache.data.Course;
import org.jboss.cache.data.RandomString;
import org.jboss.cache.transaction.DummyTransactionManager;
import org.jboss.cache.aop.PojoCache;
import java.io.BufferedReader;
import java.io.FileReader;
import java.util.*;
import EDU.oswego.cs.dl.util.concurrent.CountDown;
import javax.transaction.TransactionManager;
import javax.transaction.Transaction;
import javax.transaction.SystemException;
/**
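* A standalone dummy server that joins a replicated cache cluster and accepts data from the
* other replicated caches. Unless it is started with -receiver it also generates put load itself.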
*/
public class Server {
/** The cache under test; created by initCache() and handed to every Loader thread. */
static PojoCache cache_;
/** Settings read by parseConfig(); keys are stored lower-cased. */
Properties props_ = new Properties();
static int threads_; // how many threads to send the put.
static int loops_; // how many loops
static int sleepIntervalInMillis_; // sleep interval between request
/** When true, getSleepInterval() returns a random value below sleepIntervalInMillis_. */
static boolean randomSleep_ = false;
static int objectListSize_ = 10; // what is the list size in the Courses list
final static String ROOT = "/JSESSION/localhost/"; // emulates the http test
/**
 * Set to true by ViewListener.viewChange() once the cluster view has the expected size,
 * and polled by main(). Declared volatile so that a write made from another thread
 * (presumably the cache's view-change callback thread) is guaranteed to become visible
 * to the polling loop.
 */
static volatile boolean isReady_ = false;
/** The most recent exception recorded by a Loader thread, or null if none was recorded. */
static Throwable ex_;
/** Number of loop iterations that ended in an exception, summed over all Loader threads. */
static long exceptionCounts_ = 0;
/** Released once by each Loader when it finishes; main() acquires it to wait for all of them. */
static CountDown countdown_;
/** True when started with -receiver: the node does not run the load test. */
static boolean receiver_ = false;
/** When true, each Loader iteration runs inside its own transaction. */
static boolean transaction_ = false;
/** Source of the random sleep intervals; fixed seed. */
static Random random = new Random(10);
/**
 * When the pojo cache is used and this is not 1, a Loader only re-puts the whole object on
 * iterations that are a multiple of this value; the other iterations update a single field.
 */
static int updatePojoInterval_ = 0;
/** When true, Loaders use putObject/getObject instead of plain put/get. */
static boolean runPojoCache_ = false;
/** Wall-clock time (ms) taken just before the Loader threads are started. */
static long startTime_ = 0;
/** Wall-clock time (ms) taken once all Loader threads have finished. */
static long endTime_ = 0;
/** 1 selects "put then get"; any other value selects "remove, put, then get". */
static int operationType_ = 1;
/**
 * Creates the PojoCache instance and configures it from the XML file named by the
 * "cache_config" property, then selects the dummy transaction manager lookup class.
 *
 * @throws RuntimeException if no "cache_config" property has been read
 * @throws Exception if configuring the cache fails
 */
void initCache() throws Exception {
    cache_ = new PojoCache();
    PropertyConfigurator configurator = new PropertyConfigurator();

    String configFile = (String) props_.get("cache_config");
    if (configFile == null) {
        throw new RuntimeException("Cache config xml is not specified.");
    }

    // Apply the generic replication settings from the xml file to the new cache.
    configurator.configure(cache_, configFile);
    cache_.setTransactionManagerLookupClass("org.jboss.cache.DummyTransactionManagerLookup");
}
/**
 * Stops the cache service and drops the shared reference to it.
 *
 * @throws Exception if stopping the service fails; the reference is then left in place
 */
void destroyCache() throws Exception
{
    cache_.stopService();
    cache_ = null;
}
/**
 * @return the shared cache instance, or null if it has not been created (or was destroyed)
 */
TreeCache getCache()
{
    return cache_;
}
/**
 * Reads "key=value" settings from a text file into props_.
 *
 * Blank lines and lines whose first non-blank character is '#' are skipped. Each remaining
 * line is split at its first '='; key and value are trimmed and the key is lower-cased.
 *
 * @param fileName path of the configuration file
 * @throws IllegalArgumentException if a line has no '=', an empty key, or an empty value
 * @throws Exception if the file cannot be opened or read
 */
void parseConfig(String fileName) throws Exception {
    BufferedReader fileReader = new BufferedReader(new FileReader(fileName));
    try {
        String line;
        while ((line = fileReader.readLine()) != null) {
            // Trim first so that indented comments and whitespace-only lines are skipped too.
            line = line.trim();
            if (line.length() == 0 || line.startsWith("#")) {
                continue;
            }

            int separator = line.indexOf('=');
            String key = separator < 0 ? "" : line.substring(0, separator).trim();
            String val = separator < 0 ? "" : line.substring(separator + 1).trim();
            if (key.length() == 0 || val.length() == 0) {
                throw new IllegalArgumentException(
                        "Malformed line in " + fileName + " (expected key=value): " + line);
            }

            // Locale-independent lower-casing so that lookups by literal key always match.
            key = key.toLowerCase(Locale.ENGLISH);
            props_.put(key, val);
            System.out.println("Read in config key, value: "+ key + " "+ val);
        }
    } finally {
        // Release the file handle even when reading or parsing fails.
        fileReader.close();
    }
}
/**
 * Reads the load-test settings from props_, warms up the cache and starts one Loader
 * thread per configured thread. Returns as soon as the threads are started; completion is
 * signalled through countdown_.
 *
 * The "operation_type" setting is not read from the configuration; operationType_ keeps
 * its default.
 *
 * @throws RuntimeException if a required property is missing, or if the pojo cache is
 *         enabled with an update_pojo_interval of 0
 * @throws NumberFormatException if a numeric property is not a valid integer
 */
void startLoadTest() throws InterruptedException, CacheException {
    threads_ = Integer.parseInt(requiredProperty("threads", "threads"));
    loops_ = Integer.parseInt(requiredProperty("loops", "loops"));
    sleepIntervalInMillis_ =
            Integer.parseInt(requiredProperty("sleep_interval_millis", "sleepIntervalInMillis"));
    randomSleep_ =
            Boolean.valueOf(requiredProperty("random_sleep_interval", "random_sleep_interval")).booleanValue();
    objectListSize_ = Integer.parseInt(requiredProperty("object_list_size", "objectListSize"));
    transaction_ = Boolean.valueOf(requiredProperty("transaction", "transaction")).booleanValue();
    runPojoCache_ = Boolean.valueOf(requiredProperty("run_pojocache", "run_PojoCache")).booleanValue();
    updatePojoInterval_ =
            Integer.parseInt(requiredProperty("update_pojo_interval", "update_pojo_interval"));

    // The loaders compute (i % updatePojoInterval_) when the pojo cache is on; a zero interval
    // would make every single iteration fail, so reject it up front.
    if (runPojoCache_ && updatePojoInterval_ == 0) {
        throw new RuntimeException("update_pojo_interval must not be 0 when run_pojocache is true");
    }

    countdown_ = new CountDown(threads_);

    // Warm up the cache first to avoid any simultaneous write contention.
    if (cache_.getCoordinator().equals(cache_.getLocalAddress())) {
        System.out.println("I am the coordinator: " +cache_.getLocalAddress());
        cache_.put(ROOT +cache_.getLocalAddress().toString(), "test", "test");
    }
    sleep_(300);

    if (runPojoCache_) {
        // This is to pre-load the POJO class definition
        Object pojo = Loader.constructObject();
        cache_.putObject(ROOT + cache_.getLocalAddress().toString() +"/test", pojo);
    }

    startTime_ = System.currentTimeMillis();
    for (int i = 0; i < threads_; i++) {
        Loader loader = new Loader(cache_, i, cache_.getLocalAddress().toString());
        loader.start();
    }
}

/**
 * Looks up a mandatory setting in props_.
 *
 * @param key the lower-case property key to look up
 * @param label the name used for the property in the "Can't find ..." error message
 * @return the property value, never null
 * @throws RuntimeException if the property is absent
 */
private String requiredProperty(String key, String label) {
    String value = (String) props_.get(key);
    if (value == null) {
        throw new RuntimeException("Can't find " + label + " property");
    }
    return value;
}
/**
 * Computes how long a loader should pause before its next request.
 *
 * @return the configured interval, or, when random sleeping is enabled and the interval is
 *         non-zero, a random value from 0 (inclusive) up to the configured interval (exclusive)
 */
static int getSleepInterval() {
    boolean fixed = (sleepIntervalInMillis_ == 0) || !randomSleep_;
    return fixed ? sleepIntervalInMillis_ : random.nextInt(sleepIntervalInMillis_);
}
/**
 * Splits a comma-separated member list into its individual entries.
 *
 * Entries are returned exactly as written (no trimming); empty entries between
 * consecutive commas are not included.
 *
 * @param addrListStr comma-separated list of member addresses
 * @return a new list of String entries in their original order
 */
List getMembers(String addrListStr) {
    List members = new ArrayList();
    for (StringTokenizer entries = new StringTokenizer(addrListStr, ","); entries.hasMoreTokens();) {
        members.add(entries.nextToken());
    }
    return members;
}
/**
 * Pauses the calling thread for the given time. If the thread is interrupted while
 * sleeping, the stack trace is printed and the method returns early.
 *
 * @param msec how long to sleep, in milliseconds
 */
static void sleep_(long msec) {
    try {
        Thread.sleep(msec);
    }
    catch (InterruptedException interrupted) {
        interrupted.printStackTrace();
    }
}
/**
 * Prints the supported command-line options to standard output.
 */
void printUsage() {
    String usage = "Options: -config to specify config file (like bench.txt)\n"
            + " -receiver receiver only mode, e.g., no send\n";
    System.out.println(usage);
}
/**
 * Prints a short banner describing the test and its load pattern to standard output.
 */
public void printInfo() {
    StringBuffer description = new StringBuffer();
    description.append("\nThis is a simple performance test for JBossCache.\n");
    description.append("The load pattern is like those of http session repl, that is,\neach put ");
    description.append("is under a separate sub-tree.\n");
    description.append("As a result, there should not be write contention.\n");

    System.out.println("\n************************************");
    System.out.println(description.toString());
}
/**
 * Entry point. Parses the command line, starts the cache, waits until the cluster view has
 * as many members as the "members" property lists, then (unless -receiver was given) runs
 * the load test and prints the throughput. Afterwards the process idles until it is killed.
 *
 * @param args "-config &lt;file&gt;" to read settings, "-receiver" for receive-only mode
 * @throws RuntimeException if the "members" property is missing, or if any loader recorded
 *         an exception during the run
 * @throws Exception if the configuration cannot be read or the cache cannot be started
 */
public static void main(String[] args) throws Exception {
    Server server = new Server();
    if (args.length == 0) {
        server.printUsage();
        return;
    }

    for (int i = 0; i < args.length; i++) {
        if ("-config".equals(args[i])) {
            // -config needs a following file name; don't run off the end of the array.
            if (i + 1 >= args.length) {
                System.err.println("Missing file name after -config");
                server.printUsage();
                return;
            }
            String configFile = args[++i];
            System.out.println("Configuration file is: " + configFile);
            server.parseConfig(configFile);
        } else if ("-receiver".equals(args[i])) {
            receiver_ = true;
        } else {
            System.err.println("Unknown argument:" + args[i]);
            server.printUsage();
            return;
        }
    }

    server.printInfo();

    String addrListStr = (String) server.props_.get("members");
    if (addrListStr == null) {
        // Without the expected member list we could never decide when the cluster is complete.
        throw new RuntimeException("Can't find members property");
    }
    List addrList = server.getMembers(addrListStr);

    server.initCache();
    ViewListener listener = new ViewListener(addrList);
    cache_.addTreeCacheListener(listener); // register for view change
    cache_.startService();
    System.out.println("Cache started .. ");
    System.out.println("Waiting for the other nodes to start up...");
    while (!isReady_) {
        sleep_(100);
    }

    // Let's remove the tree cache listener since we are done. This is needed so we don't cloud
    // the perf number.
    cache_.removeTreeCacheListener(listener);
    sleep_(2000);

    if (!receiver_) {
        server.startLoadTest();
        System.out.println("Waiting for the test to finish...");
        // blocked until all done
        countdown_.acquire();
        endTime_ = System.currentTimeMillis();

        // Compute in long to avoid int overflow, and treat a sub-millisecond run as 1 ms so
        // the division cannot fail.
        long elapsedMillis = Math.max(1L, endTime_ - startTime_);
        long requestsPerSec = ((long) loops_ * threads_ * 1000L) / elapsedMillis;
        System.out.println("\nThroughtput for this node with:\n" + "threads = " + threads_
                +"\nloops = " +loops_
                + "\nsleep interval = " +sleepIntervalInMillis_ + "\nobject list size = "
                + objectListSize_ + "\ntranasaction? " +transaction_ +"\n"
                + "is: " + requestsPerSec + " requests/sec\n");
    } else {
        System.out.println("Receiver mode only. Won't send request...");
    }

    if (ex_ != null) {
        System.err.println("Exception counts: " +exceptionCounts_);
        // Keep the recorded exception as the cause so its stack trace is not lost.
        throw new RuntimeException("Exception occurred during the run: " +ex_, ex_);
    }

    System.out.println("Test is finished. hit ctrl-c to kill the process ...");
    // Stay alive so this node remains a cluster member for the other nodes' runs.
    while (true) {
        sleep_(1000);
    }
    // server.destroyCache();
}
/**
 * Cache listener that only reacts to cluster view changes: it flags the server as ready
 * once the view contains as many members as were configured. All node and lifecycle
 * callbacks are deliberately empty.
 */
public static class ViewListener implements TreeCacheListener {
    /** The configured member addresses; only the size of this list is used. */
    List addrList_;

    /**
     * @param addrList the member addresses the cluster is expected to contain
     */
    public ViewListener(List addrList) {
        addrList_ = addrList;
    }

    public void nodeCreated(Fqn fqn) { /* not of interest */ }

    public void nodeRemoved(Fqn fqn) { /* not of interest */ }

    public void nodeLoaded(Fqn fqn) { /* not of interest */ }

    public void nodeEvicted(Fqn fqn) { /* not of interest */ }

    public void nodeModified(Fqn fqn) { /* not of interest */ }

    public void nodeVisited(Fqn fqn) { /* not of interest */ }

    public void cacheStarted(TreeCache cache) { /* not of interest */ }

    public void cacheStopped(TreeCache cache) { /* not of interest */ }

    /**
     * Compares the size of the new view with the number of configured members.
     *
     * @param new_view the new cluster view (might be a MergeView after merging)
     * @throws RuntimeException if the view has more members than were configured
     */
    public void viewChange(View new_view) {
        Vector currentMembers = new_view.getMembers();
        int actual = currentMembers.size();
        int expected = addrList_.size();

        if (actual > expected) {
            // We have more members than is specified!
            throw new RuntimeException("You have more members in the cluster group than specified in the config."
                    + " size of members now: " +currentMembers.size());
        }

        // Members are not matched individually; an equal count is taken as "everyone is here".
        if (actual == expected) {
            isReady_ = true;
        }
    }
}
public static class Loader extends Thread
{
/** Index of this loader; used as the last element of the cache path it writes to. */
int threadId;
/** String form of the local cache address; part of the cache path and used as the key. */
String localAddress;
/** The cache this loader works on (shadows the static field of the enclosing class). */
PojoCache cache_;
/** Transaction manager used when transactions are enabled. */
TransactionManager tm_ = DummyTransactionManager.getInstance();

/**
 * @param cache the cache to run the load against
 * @param i the index of this loader thread
 * @param localAddress string form of the local cache address
 */
public Loader(PojoCache cache, int i, String localAddress) {
    this.cache_ = cache;
    this.threadId = i;
    this.localAddress = localAddress;
}
/**
 * Runs loops_ iterations of cache work under this loader's own sub-tree, optionally
 * sleeping before each iteration and wrapping each one in a transaction.
 *
 * A failing iteration is counted in exceptionCounts_, recorded in ex_, rolled back when a
 * transaction was started, and the loop then continues. countdown_ is always released when
 * the thread ends, even abnormally, so that the waiting main thread cannot hang.
 */
public void run() {
    try {
        Object obj = null;
        Object obj1 = constructObject();
        Object obj2 = constructObject();
        String fqn = ROOT + localAddress + "/" + threadId;
        long start_time = System.currentTimeMillis();
        boolean isEven = true;

        for (int i = 0; i < loops_; i++) {
            // Ask once: with random sleeping, two calls would return two different values.
            int pause = getSleepInterval();
            if (pause != 0) {
                sleep_(pause);
            }

            Transaction tx = null;
            try {
                if (transaction_) {
                    tm_.begin();
                    tx = tm_.getTransaction();
                }

                // With the pojo cache, only every updatePojoInterval_-th loop re-puts the whole
                // object. An interval of 0 is treated like 1 to avoid a modulo-by-zero failure.
                boolean update = true;
                if (runPojoCache_ && updatePojoInterval_ != 1 && updatePojoInterval_ != 0) {
                    update = (i % updatePojoInterval_) == 0;
                }

                /*
                 * Need this for PojoCache. Otherwise, it'd be cheating because another putObject
                 * of the same pojo is fast.
                 */
                if (update) {
                    obj = isEven ? obj1 : obj2;
                    isEven = !isEven;
                }

                doWork(fqn, localAddress, obj, update);

                if (transaction_) {
                    tx.commit();
                }
            } catch (Exception e) {
                // NOTE(review): exceptionCounts_ and ex_ are shared by all loader threads without
                // synchronization, so the count may be slightly off under contention.
                exceptionCounts_++;
                ex_ = e;
                e.printStackTrace();
                // tx is still null when begin()/getTransaction() itself failed.
                if (transaction_ && tx != null) {
                    try {
                        tx.rollback();
                    } catch (Exception e1) {
                        // A failed rollback (e.g. transaction already completed) must not
                        // terminate the loader; record it and carry on.
                        e1.printStackTrace();
                        ex_ = e1;
                    }
                }
            }

            if ((i % 50) == 0 && threadId == 0) {
                System.out.println("Processing at loop: " +i);
            }
        }

        long time = System.currentTimeMillis() - start_time;
        if (threadId == 0) {
            System.out.println("Total time spent (ms) on thread id 0: " +time + " for "+ loops_ +" loops");
        }
    } finally {
        countdown_.release();
    }
}
/**
 * Performs one unit of cache work, choosing the variant from the configuration.
 *
 * @param fqn the cache path to work on
 * @param key the key used by the plain-cache variant
 * @param pojo the object to store
 * @param update with the pojo cache: true to re-put the whole object, false to modify a
 *        field of the object already stored; ignored by the plain-cache variant
 * @throws CacheException if a cache operation fails
 */
void doWork(String fqn, Object key, Object pojo, boolean update) throws CacheException {
    if (!runPojoCache_) {
        doPlainCacheWork(fqn, key, pojo);
        return;
    }

    if (update) {
        doPojoCacheWork(fqn, pojo);
        return;
    }

    doPojoCacheFieldWork(fqn, pojo);
}
/**
 * Plain-cache variant of one unit of work: stores a Student under fqn/key, reads it back
 * and checks its name.
 *
 * With operation type 1 a freshly constructed object is stored and pojo is ignored;
 * otherwise the key is removed first and pojo itself is stored.
 *
 * @param fqn the cache path to write to
 * @param key the key within that path
 * @param pojo the object to store when the operation type is not 1
 * @throws CacheException if a cache operation fails
 * @throws RuntimeException if the value read back is not named "Joe"
 */
void doPlainCacheWork(String fqn, Object key, Object pojo) throws CacheException {
    Object toStore;
    if (operationType_ == 1) {
        toStore = constructObject();
    } else {
        cache_.remove(fqn, key);
        toStore = pojo;
    }
    cache_.put(fqn, key, toStore);

    Object readBack = cache_.get(fqn, key);
    boolean isJoe = ((Student) readBack).getName().equals("Joe");
    if (!isJoe) {
        throw new RuntimeException("Value returned not Joe");
    }
}
/**
 * Pojo-cache variant of one unit of work: stores pojo under fqn with putObject, reads it
 * back with getObject and checks its name. Unless the operation type is 1, the object
 * currently stored under fqn is removed first.
 *
 * @param fqn the cache path to write to
 * @param pojo the object to store
 * @throws CacheException if a cache operation fails
 * @throws RuntimeException if the object read back is not named "Joe"
 */
void doPojoCacheWork(String fqn, Object pojo) throws CacheException {
    if (operationType_ != 1) {
        cache_.removeObject(fqn);
    }
    cache_.putObject(fqn, pojo);

    Object readBack = cache_.getObject(fqn);
    boolean isJoe = ((Student) readBack).getName().equals("Joe");
    if (!isJoe) {
        throw new RuntimeException("Value returned not Joe");
    }
}
/**
 * Field-level variant of one unit of work: fetches the Student stored under fqn and
 * changes the instructor of its first course.
 *
 * @param fqn the cache path of the stored Student
 * @param pojo not used by this variant
 * @throws CacheException if the cache lookup fails
 */
void doPojoCacheFieldWork(String fqn, Object pojo) throws CacheException {
    Student stored = (Student) cache_.getObject(fqn);
    List courses = (List) stored.getCourses();
    Course first = (Course) courses.get(0);
    first.setInstructor("Ben Wang");
}
/**
 * Builds the test payload: a Student named "Joe" with a fixed address and
 * objectListSize_ courses whose instructor, title and room are random strings.
 *
 * @return a new Student instance
 */
static Object constructObject() {
    Student student = new Student();
    student.setName("Joe");

    Address home = new Address();
    home.setZip(94086);
    // NOTE(review): the trailing ')' in the city looks like a typo; kept as is.
    home.setCity("Sunnyvale)");
    home.setStreet("Albertson");
    student.setAddress(home);

    int remaining = objectListSize_;
    while (remaining-- > 0) {
        Course course = new Course();
        course.setInstructor(RandomString.randomstring(10,20));
        course.setTitle(RandomString.randomstring(10,20));
        course.setRoom(RandomString.randomstring(10,20));
        student.addCourse(course);
    }
    return student;
}
}
}