package org.jboss.cache.cluster;
import org.easymock.EasyMock;
import static org.easymock.EasyMock.*;
import org.jboss.cache.Cache;
import org.jboss.cache.RPCManager;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.util.TestingUtil;
import org.jboss.cache.util.internals.ReplicationQueueNotifier;
import org.jgroups.Address;
import static org.testng.AssertJUnit.assertNotNull;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import java.util.Collections;
import java.util.Vector;
import java.util.concurrent.CountDownLatch;
import org.jboss.cache.UnitTestCacheFactory;
@Test(groups = "functional", sequential = true)
public class ReplicationQueueTest
{
private static final int COUNT = 10;
Cache cache, cache2;
ReplicationQueue replQ;
ComponentRegistry registry;
RPCManager originalRpcManager;
@BeforeMethod
public void setUp() throws CloneNotSupportedException
{
Configuration c = new Configuration();
c.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
c.setUseReplQueue(true);
c.setReplQueueMaxElements(COUNT);
c.setReplQueueInterval(-1);
cache = new UnitTestCacheFactory<Object, Object>().createCache(c, false);
cache.start();
registry = TestingUtil.extractComponentRegistry(cache);
replQ = registry.getComponent(ReplicationQueue.class);
originalRpcManager = cache.getConfiguration().getRuntimeConfig().getRPCManager();
cache2 = new UnitTestCacheFactory<Object, Object>().createCache(cache.getConfiguration().clone());
TestingUtil.blockUntilViewsReceived(60000, cache, cache2);
}
@AfterMethod
public void tearDown()
{
// reset the original RPCManager
injectRpcManager(originalRpcManager);
TestingUtil.killCaches(cache, cache2);
cache = null;
cache2 = null;
}
private void injectRpcManager(RPCManager manager)
{
registry.registerComponent(manager, RPCManager.class);
}
public void testQueueHoldAndFlush() throws Exception
{
assert replQ != null;
// mock the RPCManager used in the cache
RPCManager mockRpcManager = EasyMock.createStrictMock(RPCManager.class);
injectRpcManager(mockRpcManager);
// expect basic cluster related calls
expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
replay(mockRpcManager);
// check that nothing on the RPCManager will be called until we hit the replication queue threshold.
for (int i = 0; i < COUNT - 1; i++) cache.put("/a/b/c/" + i, "k", "v");
assert replQ.elements.size() == COUNT - 1;
// verify that no calls have been made on the mockRpcManager
verify(mockRpcManager);
// reset the mock
reset(mockRpcManager);
// now try the last PUT which should result in the queue being flushed.
expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
expect(mockRpcManager.callRemoteMethods((Vector<Address>) anyObject(), (ReplicableCommand) anyObject(), anyBoolean(), anyLong(), anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
replay(mockRpcManager);
cache.put("/a/b/c/LAST", "k", "v");
assert replQ.elements.size() == 0;
// verify that the rpc call was only made once.
verify(mockRpcManager);
}
public void testFlushConcurrency() throws Exception
{
// will create multiple threads to constantly perform a cache update, and measure the number of expected invocations on the RPC manager.
final int numThreads = 25;
final int numLoopsPerThread = 1000;
int totalInvocations = numThreads * numLoopsPerThread;
assert totalInvocations % COUNT == 0 : "NumThreads and NumLoopsPerThread must multiply to be a multiple of COUNT";
final CountDownLatch latch = new CountDownLatch(1);
// mock the RPCManager used in the cache
RPCManager mockRpcManager = EasyMock.createStrictMock(RPCManager.class);
injectRpcManager(mockRpcManager);
// expect basic cluster related calls
expect(mockRpcManager.getMembers()).andReturn(originalRpcManager.getMembers()).anyTimes();
expect(mockRpcManager.callRemoteMethods((Vector<Address>) anyObject(), (ReplicableCommand) anyObject(), anyBoolean(), anyLong(), anyBoolean())).andReturn(Collections.emptyList()).anyTimes();
replay(mockRpcManager);
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++)
{
threads[i] = new Thread()
{
public void run()
{
try
{
latch.await();
}
catch (InterruptedException e)
{
// do nothing
}
for (int j = 0; j < numLoopsPerThread; j++)
{
cache.put("/a/b/c/" + getName() + "/" + j, "k", "v");
}
}
};
threads[i].start();
}
// start the threads
latch.countDown();
// wait for threads to join
for (Thread t : threads) t.join();
// now test results
verify(mockRpcManager);
ReplicationQueueNotifier notifier = new ReplicationQueueNotifier(cache);
notifier.waitUntillAllReplicated(250);
// TestingUtil.sleepThread(250); // make sure the queue flushes
assert replQ.elements.size() == 0;
}
public void testFailure() throws InterruptedException
{
for (int i = 0; i < COUNT; i++)
{
System.out.println("on put i = " + i);
cache.put("/a/b/c" + i, "key", "value");
assertNotNull(cache.get("/a/b/c" + i, "key"));
}
ReplicationQueueNotifier notifier = new ReplicationQueueNotifier(cache);
notifier.waitUntillAllReplicated(500);
// TestingUtil.sleepThread(500);
for (int i = 0; i < COUNT; i++) assertNotNull("on get i = " + i, cache2.get("/a/b/c" + i, "key"));
}
}