/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.PropertyConfigurator;
import org.jboss.cache.TreeCache;
import org.jboss.cache.TreeCacheMBean;
import org.jboss.cache.eviction.Region;
import org.jboss.cache.eviction.RegionManager;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.misc.TestingUtil;
import EDU.oswego.cs.dl.util.concurrent.Semaphore;
/**
* Abstract superclass of "StateTransferVersion"-specific tests
* of TreeCache's state transfer capability.
*
* TODO add tests with classloader regions
*
* @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
* @version $Id$
*/
public abstract class VersionedTestBase extends StateTransferTestBase
{
private static final int SUBTREE_SIZE = 10;
public void testInitialStateTransfer() throws Exception
{
TreeCacheMBean cache1 = createCache("cache1", false, false, false);
cache1.put("/a/b", "name", JOE);
cache1.put("/a/b", "age", TWENTY);
cache1.put("/a/c", "name", BOB);
cache1.put("/a/c", "age", FORTY);
TreeCacheMBean cache2 = createCache("cache2", false, false, false);
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new TreeCacheMBean[] { cache1, cache2 }, 60000);
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache2.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache2.get("/a/c", "age"));
}
public void testInitialStateTferWithLoader() throws Exception
{
initialStateTferWithLoaderTest(false);
}
public void testInitialStateTferWithAsyncLoader() throws Exception
{
initialStateTferWithLoaderTest(true);
}
private void initialStateTferWithLoaderTest(boolean asyncLoader) throws Exception
{
TreeCacheMBean cache1 = createCache("cache1", false, false, true);
cache1.put("/a/b", "name", JOE);
cache1.put("/a/b", "age", TWENTY);
cache1.put("/a/c", "name", BOB);
cache1.put("/a/c", "age", FORTY);
TreeCacheMBean cache2 = createCache("cache2", false, false, true, asyncLoader, false);
cache2.startService();
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new TreeCacheMBean[] { cache1, cache2 }, 60000);
if (asyncLoader)
TestingUtil.sleepThread((long)100);
CacheLoader loader = cache2.getCacheLoader();
Map ab = loader.get(A_B);
assertNotNull("Loader transferred /a/b", ab);
assertEquals("Incorrect loader name for /a/b", JOE, ab.get("name"));
assertEquals("Incorrect loader age for /a/b", TWENTY, ab.get("age"));
Map ac = loader.get(A_C);
assertNotNull("Loader transferred /a/c", ac);
assertEquals("Incorrect loader name for /a/c", BOB, ac.get("name"));
assertEquals("Incorrect loader age for /a/c", FORTY, ac.get("age"));
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache2.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache2.get("/a/c", "age"));
}
public void testPartialStateTransfer() throws Exception
{
TreeCacheMBean cache1 = createCache("cache1", false, true, false);
cache1.activateRegion("/a");
cache1.put("/a/b", "name", JOE);
cache1.put("/a/b", "age", TWENTY);
cache1.put("/a/c", "name", BOB);
cache1.put("/a/c", "age", FORTY);
TreeCacheMBean cache2 = createCache("cache2", false, true, false);
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new TreeCacheMBean[] { cache1, cache2 }, 60000);
assertNull("/a/b name transferred against policy", cache2.get("/a/b", "name"));
assertNull("/a/b age transferred against policy", cache2.get("/a/b", "age"));
assertNull("/a/c name transferred against policy", cache2.get("/a/c", "name"));
assertNull("/a/c age transferred against policy", cache2.get("/a/c", "age"));
cache2.activateRegion("/a/b");
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertNull("/a/c name transferred against policy", cache2.get("/a/c", "name"));
assertNull("/a/c age transferred against policy", cache2.get("/a/c", "age"));
cache1.put("/a/d", "name", JANE);
assertNull("/a/d name transferred against policy", cache2.get("/a/d", "name"));
cache2.activateRegion("/a/c");
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache2.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache2.get("/a/c", "age"));
assertNull("/a/d name transferred against policy", cache2.get("/a/d", "name"));
cache2.activateRegion("/a/d");
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache2.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache2.get("/a/c", "age"));
assertEquals("Incorrect name for /a/d", JANE, cache2.get("/a/d", "name"));
cache1.inactivateRegion("/a");
cache1.activateRegion("/a/b");
cache1.activateRegion("/a/c");
cache1.activateRegion("/a/d");
assertEquals("Incorrect name for /a/b", JOE, cache1.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache1.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache1.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache1.get("/a/c", "age"));
assertEquals("Incorrect name for /a/d", JANE, cache1.get("/a/d", "name"));
}
public void testPartialStateTferWithLoader() throws Exception
{
TreeCacheMBean cache1 = createCache("cache1", false, true, true);
cache1.activateRegion("/a");
cache1.put("/a/b", "name", JOE);
cache1.put("/a/b", "age", TWENTY);
cache1.put("/a/c", "name", BOB);
cache1.put("/a/c", "age", FORTY);
TreeCacheMBean cache2 = createCache("cache2", false, true, true);
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new TreeCacheMBean[] { cache1, cache2 }, 60000);
CacheLoader loader = cache2.getCacheLoader();
assertNull("/a/b transferred to loader against policy", loader.get(A_B));
assertNull("/a/b name transferred against policy", cache2.get("/a/b", "name"));
assertNull("/a/b age transferred against policy", cache2.get("/a/b", "age"));
assertNull("/a/c name transferred against policy", cache2.get("/a/c", "name"));
assertNull("/a/c age transferred against policy", cache2.get("/a/c", "age"));
cache2.activateRegion("/a/b");
Map ab = loader.get(A_B);
assertNotNull("Loader transferred /a/b", ab);
assertEquals("Incorrect name from loader for /a/b", JOE, ab.get("name"));
assertEquals("Incorrect age from loader for /a/b", TWENTY, ab.get("age"));
assertNull("/a/c transferred to loader against policy", loader.get(A_C));
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertNull("/a/c name transferred against policy", cache2.get("/a/c", "name"));
assertNull("/a/c age transferred against policy", cache2.get("/a/c", "age"));
cache1.put("/a/d", "name", JANE);
assertNull("/a/d name transferred against policy", cache2.get("/a/d", "name"));
cache2.activateRegion("/a/c");
ab = loader.get(A_B);
assertNotNull("Loader still has /a/b", ab);
assertEquals("Incorrect name from loader for /a/b", JOE, ab.get("name"));
assertEquals("Incorrect age from loader for /a/b", TWENTY, ab.get("age"));
Map ac = loader.get(A_C);
assertNotNull("Loader transferred /a/c", ac);
assertEquals("Incorrect name from loader for /a/c", BOB, ac.get("name"));
assertEquals("Incorrect age from loader for /a/c", FORTY, ac.get("age"));
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache2.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache2.get("/a/c", "age"));
assertNull("/a/d name transferred against policy", cache2.get("/a/d", "name"));
cache2.activateRegion("/a/d");
ab = loader.get(A_B);
assertNotNull("Loader still has /a/b", ab);
assertEquals("Incorrect name from loader for /a/b", JOE, ab.get("name"));
assertEquals("Incorrect age from loader for /a/b", TWENTY, ab.get("age"));
ac = loader.get(A_C);
assertNotNull("Loader transferred /a/c", ac);
assertEquals("Incorrect name from loader for /a/c", BOB, ac.get("name"));
assertEquals("Incorrect age from loader for /a/c", FORTY, ac.get("age"));
Map ad = loader.get(A_D);
assertNotNull("Loader transferred /a/d", ad);
assertEquals("Incorrect name from loader for /a/d", JANE, ad.get("name"));
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache2.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache2.get("/a/c", "age"));
assertEquals("Incorrect name for /a/d", JANE, cache2.get("/a/d", "name"));
cache1.inactivateRegion("/a");
cache1.activateRegion("/a/b");
cache1.activateRegion("/a/c");
cache1.activateRegion("/a/d");
loader = cache1.getCacheLoader();
ab = loader.get(A_B);
assertNotNull("Loader still has /a/b", ab);
assertEquals("Incorrect name from loader for /a/b", JOE, ab.get("name"));
assertEquals("Incorrect age from loader for /a/b", TWENTY, ab.get("age"));
ac = loader.get(A_C);
assertNotNull("Loader transferred /a/c", ac);
assertEquals("Incorrect name from loader for /a/c", BOB, ac.get("name"));
assertEquals("Incorrect age from loader for /a/c", FORTY, ac.get("age"));
ad = loader.get(A_D);
assertNotNull("Loader transferred /a/d", ad);
assertEquals("Incorrect name from loader for /a/d", JANE, ad.get("name"));
assertEquals("Incorrect name for /a/b", JOE, cache1.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache1.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache1.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache1.get("/a/c", "age"));
assertEquals("Incorrect name for /a/d", JANE, cache1.get("/a/d", "name"));
}
public void testPartialStateTferWithClassLoader() throws Exception
{
// FIXME: This test is meaningless because MarshalledValueInputStream
// will find the classes w/ their own loader if TCL can't. Need
// to find a way to test!
// But, at least it tests JBCACHE-305 by registering a classloader
// both before and after startService()
// Set the TCL to a classloader that can't see Person/Address
Thread.currentThread().setContextClassLoader(getNotFoundClassLoader());
TreeCacheMBean cache1 = createCache("cache1",
false, // async
true, // use marshaller
true, // use cacheloader
false, false); // don't start
ClassLoader cl1 = getClassLoader();
cache1.registerClassLoader("/a", cl1);
startCache(cache1);
cache1.activateRegion("/a");
Object ben = createBen(cl1);
cache1.put("/a/b", "person", ben);
// For cache 2 we won't register loader until later
TreeCacheMBean cache2 = createCache("cache2",
false, // async
true, // use marshalling
true, // use cacheloader
false, true); // start
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new TreeCacheMBean[] { cache1, cache2 }, 60000);
CacheLoader loader = cache2.getCacheLoader();
assertNull("/a/b not transferred to loader", loader.get(A_B));
assertNull("/a/b not transferred to cache", cache2.get("/a/b", "person"));
ClassLoader cl2 = getClassLoader();
// cache2.registerClassLoader("/a", cl2);
cache2.activateRegion("/a");
assertEquals("Correct state from loader for /a/b", ben.toString(), loader.get(A_B).get("person").toString());
assertEquals("Correct state from cache for /a/b", ben.toString(), cache2.get("/a/b", "person").toString());
}
public void testLoadEntireStateAfterStart() throws Exception
{
TreeCacheMBean cache1 = createCache("cache1", false, true, true);
cache1.activateRegion("/");
cache1.put("/a/b", "name", JOE);
cache1.put("/a/b", "age", TWENTY);
cache1.put("/a/c", "name", BOB);
cache1.put("/a/c", "age", FORTY);
TreeCacheMBean cache2 = createCache("cache2", false, true, true);
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(new TreeCacheMBean[] { cache1, cache2 }, 60000);
CacheLoader loader = cache2.getCacheLoader();
assertNull("/a/b transferred to loader against policy", loader.get(A_B));
assertNull("/a/b name transferred against policy", cache2.get("/a/b", "name"));
assertNull("/a/b age transferred against policy", cache2.get("/a/b", "age"));
assertNull("/a/c name transferred against policy", cache2.get("/a/c", "name"));
assertNull("/a/c age transferred against policy", cache2.get("/a/c", "age"));
cache2.activateRegion("/");
assertEquals("Incorrect name from loader for /a/b", JOE, loader.get(A_B).get("name"));
assertEquals("Incorrect age from loader for /a/b", TWENTY, loader.get(A_B).get("age"));
assertEquals("Incorrect name from loader for /a/c", BOB, loader.get(A_C).get("name"));
assertEquals("Incorrect age from loader for /a/c", FORTY, loader.get(A_C).get("age"));
assertEquals("Incorrect name for /a/b", JOE, cache2.get("/a/b", "name"));
assertEquals("Incorrect age for /a/b", TWENTY, cache2.get("/a/b", "age"));
assertEquals("Incorrect name for /a/c", BOB, cache2.get("/a/c", "name"));
assertEquals("Incorrect age for /a/c", FORTY, cache2.get("/a/c", "age"));
}
/**
* Tests concurrent activation of the same subtree by multiple nodes in a
* REPL_SYNC environment. The idea is to see what would happen with a
* farmed deployment. See <code>concurrentActivationTest</code> for details.
*
* @throws Exception
*/
public void testConcurrentActivationSync() throws Exception
{
concurrentActivationTest(true);
}
/**
* Tests concurrent activation of the same subtree by multiple nodes in a
* REPL_ASYNC environment. The idea is to see what would happen with a
* farmed deployment. See <code>concurrentActivationTest</code> for details.
*
* @throws Exception
*/
public void testConcurrentActivationAsync() throws Exception
{
concurrentActivationTest(false);
}
/**
* Starts 5 caches and then concurrently activates the same region under
* all 5, causing each to attempt a partial state transfer from the others.
* As soon as each cache has activated its region, it does a put to a node
* in the region, thus complicating the lives of the other caches trying
* to get partial state.
* <p>
* Failure condition is if any node sees an exception or if the final state
* of all caches is not consistent.
*
* @param sync use REPL_SYNC or REPL_ASYNC
* @throws Exception
*/
private void concurrentActivationTest(boolean sync) throws Exception
{
String[] names = { "A", "B", "C", "D", "E" };
int count = names.length;
CacheActivator[] activators = new CacheActivator[count];
try {
// Create a semaphore and take all its tickets
Semaphore semaphore = new Semaphore(count);
for (int i = 0; i < count; i++) {
semaphore.acquire();
}
// Create activation threads that will block on the semaphore
TreeCacheMBean[] caches = new TreeCacheMBean[count];
for (int i = 0; i < count; i++) {
activators[i] = new CacheActivator(semaphore, names[i], sync);
caches[i] = activators[i].getTreeCache();
activators[i].start();
}
// Make sure everyone is in sync
TestingUtil.blockUntilViewsReceived(caches, 60000);
// Release the semaphore to allow the threads to start work
semaphore.release(count);
// Sleep to ensure the threads get all the semaphore tickets
TestingUtil.sleepThread((long)1000);
// Reacquire the semaphore tickets; when we have them all
// we know the threads are done
for (int i = 0; i < count; i++) {
boolean acquired = semaphore.attempt(60000);
if (!acquired)
fail("failed to acquire semaphore " + i);
}
// Sleep to allow any async calls to clear
if (!sync)
TestingUtil.sleepThread((long)500);
// Ensure the caches held by the activators see all the values
for (int i = 0; i < count; i++)
{
assertNull("Activator " + names[i] + " caught an exception",
activators[i].getException());
for (int j = 0; j < count; j++)
{
String fqn = "/a/b/" + names[j];
assertEquals("Incorrect value for " + fqn + " on activator " + names[i],
"VALUE", activators[i].getCacheValue(fqn));
// System.out.println(names[i] + ":" + fqn + " = " + activators[i].getCacheValue(fqn));
}
}
}
catch (Exception ex) {
fail(ex.getLocalizedMessage());
}
finally {
for (int i = 0; i < count; i++)
activators[i].cleanup();
}
}
/**
* Tests partial state transfer under heavy concurrent load and REPL_SYNC.
* See <code>concurrentUseTest</code> for details.
*
* @throws Exception
*/
public void testConcurrentUseSync() throws Exception
{
concurrentUseTest(true);
}
/**
* Tests partial state transfer under heavy concurrent load and REPL_ASYNC.
* See <code>concurrentUseTest</code> for details.
*
* @throws Exception
*/
public void testConcurrentUseAsync() throws Exception
{
concurrentUseTest(false);
}
/**
* Initiates 5 caches, 4 with active trees and one with an inactive tree.
* Each of the active caches begins rapidly generating puts against nodes
* in a subtree for which it is responsible. The 5th cache activates
* each subtree, and at the end confirms no node saw any exceptions and
* that each node has consistent state.
*
* @param sync whether to use REPL_SYNC or REPL_ASYNCE
*
* @throws Exception
*/
private void concurrentUseTest(boolean sync) throws Exception
{
String[] names = { "B", "C", "D", "E" };
int count = names.length;
CacheStressor[] stressors = new CacheStressor[count];
try {
// The first cache we create is inactivated.
TreeCacheMBean cacheA = createCache("cacheA", sync, true, false);
TreeCacheMBean[] caches = new TreeCacheMBean[count + 1];
caches[0] = cacheA;
// Create a semaphore and take all its tickets
Semaphore semaphore = new Semaphore(count);
for (int i = 0; i < count; i++) {
semaphore.acquire();
}
// Create stressor threads that will block on the semaphore
for (int i = 0; i < count; i++)
{
stressors[i] = new CacheStressor(semaphore, names[i], sync);
caches[i + 1] = stressors[i].getTreeCache();
stressors[i].start();
}
// Make sure everyone's views are in sync
TestingUtil.blockUntilViewsReceived(caches, 60000);
// Repeat the basic test four times
//for (int x = 0; x < 4; x++)
for (int x = 0; x < 1; x++)
{
if (x > 0)
{
// Reset things by inactivating the region
// and enabling the stressors
for (int i = 0; i < count; i++)
{
cacheA.inactivateRegion("/" + names[i]);
System.out.println("Run " + x + "-- /" + names[i] + " inactivated on A");
stressors[i].startPuts();
}
}
// Release the semaphore to allow the threads to start work
semaphore.release(count);
// Sleep to ensure the threads get all the semaphore tickets
// and to ensure puts are actively in progress
TestingUtil.sleepThread((long)300);
// Activate cacheA
for (int i = 0; i < count; i++)
{
// System.out.println("Activating /" + names[i] + " on A");
cacheA.activateRegion("/" + names[i]);
// Stop the stressor so we don't pollute cacheA's state
// with too many messages sent after activation -- we want
// to compare transferred state with the sender
stressors[i].stopPuts();
System.out.println("Run " + x + "-- /" + names[i] + " activated on A");
// Reacquire one semaphore ticket
boolean acquired = semaphore.attempt(60000);
if (!acquired)
fail("failed to acquire semaphore " + i);
// Pause to allow other work to proceed
TestingUtil.sleepThread((long)100);
}
// Sleep to allow any in transit msgs to clear
// if (!sync)
TestingUtil.sleepThread((long)1000);
// Ensure the stressors saw no exceptions
for (int i = 0; i < count; i++)
{
assertNull("Stressor " + names[i] + " caught an exception",
stressors[i].getException());
}
// Compare cache contents
for (int i = 0; i < count; i++)
{
for (int j = 0; j < SUBTREE_SIZE; j++)
{
String fqn = "/" + names[i] +"/" + j;
assertEquals("/A/" + j + " matches " + fqn,
cacheA.get(fqn, "KEY"),
stressors[i].getTreeCache().get(fqn, "KEY"));
}
}
}
for (int i = 0; i < count; i++)
stressors[i].stopThread();
}
finally {
for (int i = 0; i < count; i++)
{
if (stressors[i] != null)
stressors[i].cleanup();
}
}
}
/**
* Test for JBCACHE-913
*
* @throws Exception
*/
public void testEvictionSeesStateTransfer() throws Exception
{
TreeCache cache1 = new TreeCache();
PropertyConfigurator config = new PropertyConfigurator();
config.configure(cache1, "META-INF/replSync-eviction-service.xml");
caches.put("cache1", cache1);
TreeCache cache2 = new TreeCache();
config.configure(cache2, "META-INF/replSync-eviction-service.xml");
caches.put("cache2", cache2);
cache1.startService();
cache1.put("/a/b/c", "key", "value");
cache2.startService();
RegionManager erm = cache2.getEvictionRegionManager();
Region region = erm.getRegion(Fqn.ROOT);
// We expect events for /a, /a/b and /a/b/c
assertEquals("Saw the expected number of node events", 3, region.nodeEventQueueSize());
}
/**
* Further test for JBCACHE-913
*
* @throws Exception
*/
public void testEvictionAfterStateTransfer() throws Exception
{
TreeCache cache1 = new TreeCache();
PropertyConfigurator config = new PropertyConfigurator();
config.configure(cache1, "META-INF/replSync-eviction-service.xml");
caches.put("cache1", cache1);
final TreeCache cache2 = new TreeCache();
config.configure(cache2, "META-INF/replSync-eviction-service.xml");
caches.put("cache2", cache2);
cache1.startService();
// Put in a bunch of data, both in the default region
// and in the region that's configured for 5 nodes max w/ a 4 sec max age
for (int i = 0; i < 25000; i++)
{
cache1.put("/base/" + i, "key", "base" + i);
if (i < 5)
cache1.put("/org/jboss/test/data/" + i, "key", "data" + i);
}
cache2.startService();
Set children = cache2.getChildrenNames("/base");
assertNotNull("Base children transferred", children);
assertTrue("Minimum number of base children transferred", children.size() >= 5000);
// Sleep 2.5 secs so the nodes we are about to create in data won't
// exceed the 4 sec TTL when eviction thread runs
TestingUtil.sleepThread(2500);
// Thread that puts data in the cache
class Putter extends Thread
{
TreeCache cache = null;
boolean stopped = false;
Exception ex = null;
public void run()
{
int i = 25000;
while (!stopped)
{
try
{
cache.put("/base/" + i, "key", "base" + i);
cache.put("/org/jboss/test/data/" + i, "key", "data"+i);
i++;
}
catch (Exception e)
{
ex = e;
}
}
}
}
// Start putting data in the caches
Putter p1= new Putter();
p1.cache = cache1;
p1.start();
Putter p2= new Putter();
p2.cache = cache2;
p2.start();
// Pause a while to let puts occur. Make the pause
// random to guard against some pattern invalidating the result
Random rnd = new Random();
TestingUtil.sleepThread(rnd.nextInt(200) + 25);
// Continuously check cache2 to confirm that at some point
// nodes are evicted from both regions
int maxCountBase = 0;
int maxCountData = 0;
boolean sawBaseDecrease = false;
boolean sawDataDecrease = false;
long start = System.currentTimeMillis();
while ((System.currentTimeMillis() - start) < 10000)
{
children = cache2.getChildrenNames("/org/jboss/test/data");
if (children != null)
{
int dataCount = children.size();
if (dataCount < maxCountData)
{
System.out.println("data " + dataCount + " < " + maxCountData + " elapsed = " + (System.currentTimeMillis() - start));
sawDataDecrease = true;
}
else
{
maxCountData = dataCount;
}
}
children = cache2.getChildrenNames("/base");
if (children != null)
{
int baseCount = children.size();
if (baseCount < maxCountBase)
{
System.out.println("base " + baseCount + " < " + maxCountBase+ " elapsed = " + (System.currentTimeMillis() - start));
sawBaseDecrease = true;
}
else
{
maxCountBase = baseCount;
}
}
if (sawDataDecrease && sawBaseDecrease)
{
break;
}
TestingUtil.sleepThread(50);
}
p1.stopped = true;
p2.stopped = true;
p1.join(1000);
p2.join(1000);
assertTrue("Saw data decrease", sawDataDecrease);
assertTrue("Saw base decrease", sawBaseDecrease);
assertNull("No exceptions in p1", p1.ex);
assertNull("No exceptions in p2", p2.ex);
// Sleep 5.1 secs so we are sure the eviction thread ran again
TestingUtil.sleepThread(5100);
children = cache2.getChildrenNames("/base");
if (children != null)
{
System.out.println(children.size());
assertTrue("Excess children evicted", children.size() <= 5000);
}
children = cache2.getChildrenNames("/org/jboss/test/data");
if (children != null)
{
System.out.println(children.size());
assertTrue("Excess children evicted", children.size() <= 5);
}
if (children != null && children.size() > 0)
{
// Sleep more to let the eviction thread run again,
// which will evict all data nodes due to their ttl of 4 secs
TestingUtil.sleepThread(5100);
children = cache2.getChildrenNames("/org/jboss/test/data");
if (children != null)
assertEquals("All data children evicted", 0, children.size());
}
}
private Object createBen(ClassLoader loader) throws Exception
{
Class addrClazz = loader.loadClass("org.jboss.cache.marshall.Address");
Method setCity = addrClazz.getMethod("setCity", new Class[] { String.class});
Method setStreet = addrClazz.getMethod("setStreet", new Class[] { String.class});
Method setZip = addrClazz.getMethod("setZip", new Class[] { int.class});
Object addr = addrClazz.newInstance();
setCity.invoke(addr, new Object[] { "San Jose" });
setStreet.invoke(addr, new Object[] { "1007 Home" } );
setZip.invoke(addr, new Object[] { new Integer(90210) } );
Class benClazz = loader.loadClass("org.jboss.cache.marshall.Person");
Method setName = benClazz.getMethod("setName", new Class[] { String.class});
Method setAddress = benClazz.getMethod("setAddress", new Class[] { addrClazz });
Object ben = benClazz.newInstance();
setName.invoke(ben, new Object[] { "Ben" } );
setAddress.invoke(ben, new Object[] { addr } );
return ben;
}
private class CacheActivator extends CacheUser
{
CacheActivator(Semaphore semaphore,
String name,
boolean sync)
throws Exception
{
super(semaphore, name, sync, false);
}
void useCache() throws Exception
{
cache.activateRegion("/a/b");
// System.out.println(name + " activated region" + " " + System.currentTimeMillis());
String childFqn = "/a/b/" + name;
cache.put(childFqn, "KEY", "VALUE");
// System.out.println(name + " put fqn " + childFqn + " " + System.currentTimeMillis());
}
public Object getCacheValue(String fqn) throws CacheException
{
return cache.get(fqn, "KEY");
}
}
private class CacheStressor extends CacheUser
{
private Random random = new Random(System.currentTimeMillis());
private boolean putsStopped = false;
private boolean stopped = false;
CacheStressor(Semaphore semaphore,
String name,
boolean sync)
throws Exception
{
super(semaphore, name, sync, true);
}
void useCache() throws Exception
{
// Do continuous puts into the cache. Use our own nodes,
// as we're not testing conflicts between writer nodes,
// just whether activation causes problems
int factor = 0;
int i = 0;
String fqn = null;
boolean acquired = false;
while (!stopped)
{
if (i > 0)
{
acquired = semaphore.attempt(60000);
if (!acquired)
throw new Exception(name + " cannot acquire semaphore");
}
while(!putsStopped)
{
factor = random.nextInt(50);
fqn = "/" + name +"/" + String.valueOf(factor % SUBTREE_SIZE);
Integer value = new Integer(factor / SUBTREE_SIZE);
cache.put(fqn, "KEY", value);
TestingUtil.sleepThread((long)factor);
i++;
}
System.out.println(name + ": last put [#"+ i + "] -- " + fqn + " = " + (factor / SUBTREE_SIZE));
semaphore.release();
// Go to sleep until directed otherwise
while (!stopped && putsStopped)
TestingUtil.sleepThread((long)100);
}
}
public void stopPuts()
{
putsStopped = true;
}
public void startPuts()
{
putsStopped = false;
}
public void stopThread()
{
stopped = true;
if (thread.isAlive())
thread.interrupt();
}
}
private abstract class CacheUser implements Runnable
{
protected Semaphore semaphore;
protected TreeCacheMBean cache;
protected String name;
protected Exception exception;
protected Thread thread;
CacheUser(Semaphore semaphore,
String name,
boolean sync,
boolean activateRoot)
throws Exception
{
this.cache = createCache(name, sync, true, false);
this.semaphore = semaphore;
this.name = name;
if (activateRoot)
cache.activateRegion("/");
}
public void run()
{
boolean acquired = false;
try
{
acquired = semaphore.attempt(60000);
if (!acquired)
throw new Exception(name + " cannot acquire semaphore");
//System.out.println(name + " acquired semaphore");
useCache();
}
catch (Exception e)
{
System.out.println(name + ": " + e.getLocalizedMessage());
e.printStackTrace(System.out);
// Save it for the test to check
exception = e;
}
finally
{
if (acquired)
semaphore.release();
}
}
abstract void useCache() throws Exception;
public Exception getException()
{
return exception;
}
public TreeCacheMBean getTreeCache()
{
return cache;
}
public void start()
{
thread = new Thread(this);
thread.start();
}
public void cleanup()
{
if (thread != null && thread.isAlive())
thread.interrupt();
}
}
}