/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.aop;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
import org.jboss.cache.PropertyConfigurator;
import org.jboss.cache.TreeCache;
import org.jboss.cache.aop.test.Address;
import org.jboss.cache.aop.test.Person;
import org.jboss.cache.misc.TestingUtil;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.Set;
/**
 * Tests concurrency of the __JBossInternal__ region of the cache.
 * <p>
 * Several caches are started with the same cluster name and one worker thread
 * per cache repeatedly puts and removes <code>Person</code> objects that share
 * <code>Address</code> references. The test passes if no worker records an
 * exception.
 *
 * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
 * @version $Revision: 1838 $
 */
public class SharedRefConcurrentTest extends TestCase
{
   /** Number of caches (and worker threads) started by the test. */
   private static final int CACHE_COUNT = 2;
   /** Number of Person slots each worker toggles under its own subtree. */
   private static final int SUBTREE_SIZE = 4;
   /** Number of put/remove operations each worker performs. */
   private static final int MAX_OPS = 20;
   /** Exclusive upper bound, in ms, of the random pause between operations. */
   private static final int MAX_SLEEP = 200;
   /** Lower bound, in ms, of the pause between operations. */
   private static final int MIN_SLEEP = 5;

   private static final Log log = LogFactory.getLog(SharedRefConcurrentTest.class);

   /** Caches created through {@link #createCache}, keyed by cache ID; stopped in {@link #tearDown}. */
   private Map caches;

   /**
    * Runs the concurrent shared-reference test using the synchronous
    * replication configuration.
    * See <code>concurrentUseTest</code> for details.
    *
    * @throws Exception if the test cannot be set up or run
    */
   public void testConcurrentUseSync() throws Exception
   {
      concurrentUseTest(true);
   }

   /**
    * Runs the concurrent shared-reference test using the asynchronous
    * replication configuration.
    * See <code>concurrentUseTest</code> for details.
    * <p>
    * NOTE: this test is currently disabled by the unconditional early return
    * below; the reason is not recorded in this file.
    *
    * @throws Exception if the test cannot be set up or run
    */
   public void testConcurrentUseAsync() throws Exception
   {
      // "if (true)" keeps the compiler from rejecting the call below as unreachable
      if (true)
         return;
      concurrentUseTest(false);
   }

   /**
    * Starts <code>CACHE_COUNT</code> caches and has them concurrently do
    * puts/removes that involve shared references. Each cache operates only in
    * its own subtree, and all shared references are within that subtree.
    * Basically we are testing for concurrency problems with the shared
    * reference map.
    * <p>
    * A semaphore with <code>CACHE_COUNT</code> tickets is used as the start
    * and finish signal: the test thread holds every ticket while the workers
    * are created, releases them to start the workers, and then takes them all
    * back to learn that the workers are done.
    *
    * @param sync whether to use the REPL_SYNC (<code>true</code>) or
    *             REPL_ASYNC (<code>false</code>) configuration
    *
    * @throws Exception if a cache cannot be created or the test thread is
    *                   interrupted; propagated unchanged so that the original
    *                   stack trace is reported
    */
   private void concurrentUseTest(boolean sync) throws Exception
   {
      CacheUser[] cacheUsers = new CacheUser[CACHE_COUNT];
      try
      {
         // Create a semaphore and take all its tickets
         Semaphore semaphore = new Semaphore(CACHE_COUNT);
         for (int i = 0; i < CACHE_COUNT; i++)
         {
            semaphore.acquire();
         }

         // Create the worker threads; they will block on the semaphore
         TreeCache[] treeCaches = new TreeCache[CACHE_COUNT];
         for (int i = 0; i < CACHE_COUNT; i++)
         {
            cacheUsers[i] = new CacheUser(semaphore, String.valueOf(i), sync);
            treeCaches[i] = cacheUsers[i].getTreeCache();
         }

         // Make sure everyone is in sync
         TestingUtil.blockUntilViewsReceived(treeCaches, 60000);

         // Release the semaphore to allow the threads to start work
         semaphore.release(CACHE_COUNT);

         // Sleep to give the threads time to take all the semaphore tickets.
         // This is timing based: a worker that has not taken its ticket by
         // the end of the pause could lose it to the loop below.
         TestingUtil.sleepThread(1000);

         // Reacquire the semaphore tickets; when we have them all
         // we know the threads are done
         for (int i = 0; i < CACHE_COUNT; i++)
         {
            boolean acquired = semaphore.attempt(60000);
            if (!acquired)
               fail("failed to acquire semaphore " + i);
         }

         // Sleep to allow any async calls to clear
         if (!sync)
            TestingUtil.sleepThread(500);

         // Ensure none of the workers recorded an exception
         for (int i = 0; i < CACHE_COUNT; i++)
         {
            log.info("TEST: CacheUser " + i + " did " +
                     cacheUsers[i].getOpCount() + " operations");
            assertEquals("CacheUser " + i + " saw no exceptions",
                         null, cacheUsers[i].getException());
         }
      }
      finally
      {
         // A slot is still null if its CacheUser constructor threw; skipping
         // it keeps a cleanup NPE from hiding the real failure
         for (int i = 0; i < CACHE_COUNT; i++)
         {
            if (cacheUsers[i] != null)
               cacheUsers[i].cleanup();
         }
      }
   }

   /**
    * Creates, configures and starts a cache, and registers it under the given
    * ID so that {@link #tearDown} stops it.
    *
    * @param cacheID unique ID for the cache within this test
    * @param sync    <code>true</code> to load the replSync configuration file,
    *                <code>false</code> to load the replAsync one
    * @return the started cache
    *
    * @throws IllegalStateException if a cache with this ID was already created
    * @throws Exception if the cache cannot be configured or started
    */
   protected PojoCache createCache(String cacheID, boolean sync)
         throws Exception
   {
      if (caches.get(cacheID) != null)
         throw new IllegalStateException(cacheID + " already created");

      PojoCache tree = new PojoCache();
      PropertyConfigurator config = new PropertyConfigurator();
      String configFile = sync ? "META-INF/replSync-service.xml" : "META-INF/replAsync-service.xml";
      config.configure(tree, configFile); // read in the xml matching the requested mode
      tree.setDeadlockDetection(sync);
      tree.setClusterName("StateTransferTest");
      tree.createService();
      tree.startService();

      caches.put(cacheID, tree);
      return tree;
   }

   /**
    * Creates the empty registry of caches for the test about to run.
    *
    * @throws Exception if the superclass set-up fails
    */
   protected void setUp() throws Exception
   {
      super.setUp();
      caches = new HashMap();
   }

   /**
    * Stops every cache registered by {@link #createCache} and empties the
    * registry. The superclass tear-down runs even if stopping a cache fails.
    *
    * @throws Exception if the superclass tear-down fails
    */
   protected void tearDown() throws Exception
   {
      try
      {
         // caches is null if setUp did not complete
         if (caches != null)
         {
            Set keys = caches.keySet();
            String[] cacheIDs = (String[]) keys.toArray(new String[keys.size()]);
            for (int i = 0; i < cacheIDs.length; i++)
            {
               stopCache((PojoCache) caches.get(cacheIDs[i]));
            }
            caches.clear();
         }
      }
      finally
      {
         super.tearDown();
      }
   }

   /**
    * Stops and destroys the given cache. Failures are logged rather than
    * thrown so that the remaining caches can still be stopped.
    *
    * @param cache the cache to stop; <code>null</code> is ignored
    */
   protected void stopCache(PojoCache cache)
   {
      if (cache == null)
         return;

      try
      {
         cache.stopService();
         cache.destroyService();
      }
      catch (Exception e)
      {
         log.error("Exception stopping cache " + e.getMessage(), e);
      }
   }

   /**
    * Worker that owns one cache and, once it obtains a semaphore ticket,
    * alternately puts and removes <code>Person</code> objects under its own
    * subtree (<code>/&lt;name&gt;/&lt;index&gt;</code>). The thread is
    * started by the constructor.
    */
   private class CacheUser implements Runnable
   {
      private Semaphore semaphore;
      private PojoCache cache;
      private String name;
      /** Exception caught by {@link #run}, or null if none was caught. */
      private Exception exception;
      private Thread thread;
      /** The Person objects this worker puts and removes, one per slot. */
      private Person[] people;
      /** loaded[i] is true while people[i] is believed to be in the cache. */
      private boolean[] loaded;
      /** Index of the operation in progress; MAX_OPS once all have completed. */
      private int opCount;

      /**
       * Creates the cache and the test data, then starts the worker thread.
       *
       * @param semaphore semaphore the worker takes a ticket from before working
       * @param name      cache ID, also used as the root of this worker's subtree
       * @param sync      passed to <code>createCache</code> to select the
       *                  replication configuration
       *
       * @throws Exception if the cache cannot be created
       */
      CacheUser(Semaphore semaphore,
                String name,
                boolean sync)
            throws Exception
      {
         this.cache = createCache(name, sync);
         this.semaphore = semaphore;
         this.name = name;

         // Two addresses shared among all the people: even-indexed people
         // get addr1, odd-indexed people get addr2
         Address addr1 = new Address();
         addr1.setStreet("1 Test Street");
         addr1.setCity("TestOne, CA");

         Address addr2 = new Address();
         addr2.setStreet("2 Test Street");
         addr2.setCity("TestTwo, CA");

         people = new Person[SUBTREE_SIZE];
         loaded = new boolean[SUBTREE_SIZE];
         for (int j = 0; j < SUBTREE_SIZE; j++)
         {
            Person p = new Person();
            p.setName("Person " + j);
            p.setAge(j);
            p.setAddress((j % 2 == 0) ? addr1 : addr2);
            people[j] = p;
         }

         // Started last so that every field is assigned before run() executes
         thread = new Thread(this);
         thread.start();
      }

      /**
       * Waits up to 60 seconds for a semaphore ticket, exercises the cache,
       * and returns the ticket. Any exception is logged and saved for
       * {@link #getException}.
       */
      public void run()
      {
         boolean acquired = false;
         try
         {
            acquired = semaphore.attempt(60000);
            if (!acquired)
               throw new Exception(name + " cannot acquire semaphore");

            log.info("TEST: " + name + " acquired semaphore");
            useCache();
         }
         catch (Exception e)
         {
            log.error("TEST: " + name + ": (opCount " + opCount + ") " + e.getLocalizedMessage(), e);
            // Save it for the test to check
            exception = e;
         }
         finally
         {
            // Only give back a ticket that was actually taken
            if (acquired)
               semaphore.release();
         }
      }

      /**
       * Performs <code>MAX_OPS</code> operations. Each one picks a random
       * slot and puts its Person if the slot is not loaded, or removes it
       * otherwise, then sleeps for at least <code>MIN_SLEEP</code> ms.
       *
       * @throws Exception if a cache operation fails
       */
      void useCache() throws Exception
      {
         Random random = new Random(System.currentTimeMillis() + name.hashCode());

         for (opCount = 0; opCount < MAX_OPS; opCount++)
         {
            // One random number drives both the slot choice and the sleep time
            int factor = random.nextInt(MAX_SLEEP);
            int index = factor % SUBTREE_SIZE;
            String fqn = "/" + name + "/" + String.valueOf(index);

            if (!loaded[index])
            {
               cache.putObject(fqn, people[index]);
               loaded[index] = true;
               log.info("TEST: " + name + " put Person at " + fqn);
            }
            else
            {
               cache.removeObject(fqn);
               loaded[index] = false;
               log.info("TEST: " + name + " removed Person at " + fqn);
            }

            int sleep = Math.max(MIN_SLEEP, factor);
            log.info("TEST: " + name + " op " + opCount + " complete; sleeping " + sleep + " ms");
            TestingUtil.sleepThread(sleep);
         }
      }

      /**
       * @return the exception caught by the worker thread, or
       *         <code>null</code> if it has not caught one
       */
      public Exception getException()
      {
         return exception;
      }

      /**
       * @return the cache this worker operates on
       */
      public PojoCache getTreeCache()
      {
         return cache;
      }

      /**
       * Looks up an object in this worker's cache.
       *
       * @param fqn the location to read
       * @return whatever <code>getObject</code> returns for that location
       *
       * @throws CacheException if the lookup fails
       */
      public Object getCacheValue(String fqn) throws CacheException
      {
         return cache.getObject(fqn);
      }

      /**
       * @return the current value of the operation counter
       */
      public int getOpCount()
      {
         return opCount;
      }

      /**
       * Interrupts the worker thread if it is still alive.
       */
      public void cleanup()
      {
         if (thread != null && thread.isAlive())
            thread.interrupt();
      }
   }
}