package org.jboss.cache.buddyreplication;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.notifications.annotation.CacheListener;
import org.jboss.cache.notifications.annotation.CacheStopped;
import org.jboss.cache.notifications.annotation.NodeVisited;
import org.jboss.cache.notifications.event.CacheStoppedEvent;
import org.jboss.cache.notifications.event.NodeVisitedEvent;
import org.jboss.cache.util.CachePrinter;
import org.jboss.cache.util.TestingUtil;
import org.testng.annotations.Test;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
 * Functional test for data gravitation when one of the backup holders is in the middle of
 * shutting down while another is slow to answer (the situation reported in JBCACHE-1528).
 */
@Test(testName = "buddyreplication.GravitationFromDyingNodeTest", groups = "functional")
public class GravitationFromDyingNodeTest extends BuddyReplicationTestsBase {
   static final Log log = LogFactory.getLog(GravitationFromDyingNodeTest.class);

   /** Upper bound, in milliseconds, on how long the test waits for each helper thread to finish. */
   private static final long HELPER_JOIN_TIMEOUT_MILLIS = 30000;

   /**
    * Scenario:
    * <ol>
    * <li>Four nodes are started, each with two backups.</li>
    * <li>The data owner (c1) is stopped.</li>
    * <li>The node holding no backup (c4) requests the state from the other two.</li>
    * <li>One of those two (c2) is held in the middle of shutting down and the other (c3) is
    * made slow to respond, which is the condition under which an illegal cache state
    * exception used to be propagated to the caller (JBCACHE-1528).</li>
    * </ol>
    * The test passes when the forced gravitation on c4 still returns the node.
    *
    * @throws Exception if cache creation or any cache operation fails, or if the test thread is
    *                   interrupted while waiting for the helper threads
    */
   public void testGravitationFromDyingNode() throws Exception {
      List<CacheSPI<Object, Object>> caches = createCaches(2, 4, false, true, false, true);
      final CacheSPI<Object, Object> c1 = caches.get(0);
      final CacheSPI<Object, Object> c2 = caches.get(1);
      final CacheSPI<Object, Object> c3 = caches.get(2);
      final CacheSPI<Object, Object> c4 = caches.get(3);

      c1.put("/a/b/c", "k", "v");

      System.out.println("BEFORE: " + CachePrinter.printCacheDetails(c1, c2, c3, c4));

      // kill c1
      c1.stop();
      log.error("** Stopped C1!");

      // c2 will be stopped next; its pre-stop listener blocks so that the shutdown takes time.
      final StopListener sl = new StopListener();
      c2.addCacheListener(sl);

      // c3 blocks in its pre-visit listener, making it slow to respond to reads.
      final GetNodeListener gnl = new GetNodeListener();
      c3.addCacheListener(gnl);

      Thread stopper = new Thread("Stopper") {
         @Override
         public void run() {
            log.error("** Stopping C2!");
            c2.stop();
            log.error("** Stopped C2!");
         }
      };

      Thread gateOpener = new Thread("GateOpener") {
         @Override
         public void run() {
            // Fixed delay before releasing the listeners; there is no explicit signal telling us
            // that both listeners have actually been reached, hence the sleep.
            TestingUtil.sleepThread(1000);
            log.error("** Opening gates!!");
            sl.openGate();
            gnl.openGate();
         }
      };

      stopper.start();
      gateOpener.start();

      try {
         log.error("** Starting gravitation!");
         c4.getInvocationContext().getOptionOverrides().setForceDataGravitation(true);
         assert null != c4.getNode("/a/b/c");
      } finally {
         // Release the listeners even when the gravitation fails, so that no thread stays parked
         // on a gate, then wait (bounded) for the helpers so that c2's shutdown does not overlap
         // with whatever runs after this test method. Opening an already open gate is harmless.
         sl.openGate();
         gnl.openGate();
         gateOpener.join(HELPER_JOIN_TIMEOUT_MILLIS);
         stopper.join(HELPER_JOIN_TIMEOUT_MILLIS);
         if (stopper.isAlive() || gateOpener.isAlive()) {
            log.warn("Helper threads did not finish within " + HELPER_JOIN_TIMEOUT_MILLIS + " ms");
         }
      }
   }

   /**
    * Base class for listeners that block inside a callback until the test opens a one-shot gate.
    */
   static abstract class GatedListener {
      final CountDownLatch gate = new CountDownLatch(1);

      /** Opens the gate, releasing every thread blocked in {@link #waitForGate()}. */
      public void openGate() {
         gate.countDown();
      }

      /**
       * Blocks the calling thread until the gate has been opened. If the thread is interrupted
       * while waiting, the wait is abandoned and the interrupt status is restored so that the
       * caller can still observe the interruption.
       */
      void waitForGate() {
         try {
            gate.await();
         } catch (InterruptedException e) {
            log.error("Interrupted while waiting for the gate to open", e);
            Thread.currentThread().interrupt();
         }
      }
   }

   /** Holds up the shutdown of the cache it is registered on until the gate is opened. */
   @CacheListener
   public static class StopListener extends GatedListener {
      /**
       * Blocks on the gate for the "pre" phase of the cache-stopped notification.
       *
       * @param e the cache-stopped event
       */
      @CacheStopped
      public void stop(CacheStoppedEvent e) {
         if (e.isPre()) {
            log.error("** Waiting on gate to stop C2!");
            waitForGate();
         }
      }
   }

   /** Holds up node visits on the cache it is registered on until the gate is opened. */
   @CacheListener
   public static class GetNodeListener extends GatedListener {
      /**
       * Blocks on the gate for the "pre" phase of the node-visited notification.
       *
       * @param e the node-visited event
       */
      @NodeVisited
      public void visit(NodeVisitedEvent e) {
         if (e.isPre()) {
            log.error("** Waiting on gate to read C3!");
            waitForGate();
         }
      }
   }
}