/*
* Created on 17-Feb-2005
*
*
*
*/
package org.jboss.cache.optimistic;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.loader.SamplePojo;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.misc.TestingUtil;
import org.jboss.cache.transaction.DummyTransactionManager;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.OptimisticTransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
import org.jgroups.Address;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import javax.transaction.RollbackException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.List;
/**
 * Exercises the prepare / commit / rollback handling of an optimistically locked cache,
 * both for transactions driven locally through a transaction manager and for calls that
 * are pushed into the cache through its remote delegate as if they came from another
 * cluster member.
 * <p/>
 * Every test installs a mock interceptor via <code>setAlteredInterceptorChain</code> and
 * then asserts on the ids the mock reports from <code>getAllCalledIds()</code> as well as
 * on the number of entries left in the cache's transaction table.
 *
 * @author xenephon
 */
@SuppressWarnings("unchecked")
@Test(groups = "functional")
public class OptimisticReplicationInterceptorTest extends AbstractOptimisticTestCase
{
   /**
    * Cache under test; re-created before and killed after every test method.
    */
   private CacheSPI cache;

   /**
    * Creates a fresh cache for the test about to run.
    */
   @BeforeMethod(alwaysRun = true)
   public void setUp() throws Exception
   {
      cache = createCache();
   }

   /**
    * Kills the cache used by the test that just ran.
    */
   @AfterMethod(alwaysRun = true)
   public void tearDown()
   {
      TestingUtil.killCaches(cache);
   }

   /**
    * A locally committed transaction must reach the mock as a prepare followed by a commit
    * and must leave the transaction table empty.
    */
   public void testLocalTransaction() throws Exception
   {
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);
      DummyTransactionManager mgr = DummyTransactionManager.getInstance();

      assertNull(mgr.getTransaction());
      mgr.begin();
      assertTxTableCounts(cache, 0);

      SamplePojo pojo = new SamplePojo(21, "test");
      cache.put("/one/two", "key1", pojo);
      mgr.commit();

      assertNull(mgr.getTransaction());
      assertTxTableCounts(cache, 0);

      // make sure all calls were done in the right order
      List calls = dummy.getAllCalledIds();
      assertEquals(MethodDeclarations.optimisticPrepareMethod_id, calls.get(0));
      assertEquals(MethodDeclarations.commitMethod_id, calls.get(1));
   }

   /**
    * A locally rolled back transaction must reach the mock as exactly one rollback call
    * and must leave the transaction table empty.
    */
   public void testRollbackTransaction() throws Exception
   {
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);
      DummyTransactionManager mgr = DummyTransactionManager.getInstance();

      assertNull(mgr.getTransaction());
      assertTxTableCounts(cache, 0);

      SamplePojo pojo = new SamplePojo(21, "test");
      mgr.begin();
      cache.put("/one/two", "key1", pojo);
      mgr.rollback();

      assertNull(mgr.getTransaction());
      assertTxTableCounts(cache, 0);

      // a rollback without a preceding prepare is the only call expected
      List calls = dummy.getAllCalledIds();
      assertEquals(1, calls.size());
      assertEquals(MethodDeclarations.rollbackMethod_id, calls.get(0));
   }

   /**
    * A prepare replicated in for a remote gtx must register that gtx in the transaction
    * table and show up as the third call seen by the mock.
    */
   public void testRemotePrepareTransaction() throws Exception
   {
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);
      DummyTransactionManager mgr = DummyTransactionManager.getInstance();

      RemoteTxFixture fixture = commitLocalPutAndRetarget(mgr);
      replicatePrepare(fixture);

      assertRemotePrepareApplied(mgr, dummy, fixture);
   }

   /**
    * A rollback replicated in after a remote prepare must be passed to the mock as the
    * fourth call and must clear the remote gtx from the transaction table.
    */
   public void testRemoteRollbackTransaction() throws Exception
   {
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);
      DummyTransactionManager mgr = DummyTransactionManager.getInstance();

      RemoteTxFixture fixture = commitLocalPutAndRetarget(mgr);
      replicatePrepare(fixture);
      assertRemotePrepareApplied(mgr, dummy, fixture);

      MethodCall rollbackMethod = MethodCallFactory.create(MethodDeclarations.rollbackMethod_id, fixture.remoteGtx);
      replicateExpectingSuccess(rollbackMethod);

      // we should have the rollback as well now
      assertNull(mgr.getTransaction());
      List calls = dummy.getAllCalledIds();
      assertEquals(MethodDeclarations.rollbackMethod_id, calls.get(3));
      assertTxTableCounts(cache, 0);
   }

   /**
    * A commit replicated in for a gtx that was never prepared must be rejected with a
    * {@link RuntimeException} and must not reach the mock.
    */
   public void testRemoteCommitNoPrepareTransaction() throws Exception
   {
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);
      DummyTransactionManager mgr = DummyTransactionManager.getInstance();

      RemoteTxFixture fixture = commitLocalPutAndRetarget(mgr);

      // only the local prepare and commit have been seen so far
      List calls = dummy.getAllCalledIds();
      assertEquals(2, calls.size());
      assertTxTableCounts(cache, 0);

      MethodCall commitMethod = MethodCallFactory.create(MethodDeclarations.commitMethod_id, fixture.remoteGtx);

      // Capture the throwable instead of calling fail() inside the try block: fail()
      // throws an Error which a catch (Throwable) would swallow and misreport.
      Throwable thrown = null;
      try
      {
         TestingUtil.getRemoteDelegate(cache)._replicate(commitMethod);
      }
      catch (Throwable t)
      {
         thrown = t;
      }
      assertNotNull("Replicating a commit without a prior prepare should have thrown", thrown);
      assertTrue("Expected a RuntimeException but got " + thrown, thrown instanceof RuntimeException);

      // the rejected commit must not have been passed on or left anything behind
      assertNull(mgr.getTransaction());
      List callsAfter = dummy.getAllCalledIds();
      assertEquals(2, callsAfter.size());
      assertTxTableCounts(cache, 0);
   }

   /**
    * A rollback replicated in for a gtx that was never prepared must be handled without
    * throwing and must not reach the mock.
    */
   public void testRemoteRollbackNoPrepareTransaction() throws Throwable
   {
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);
      DummyTransactionManager mgr = DummyTransactionManager.getInstance();

      RemoteTxFixture fixture = commitLocalPutAndRetarget(mgr);

      // only the local prepare and commit have been seen so far
      List calls = dummy.getAllCalledIds();
      assertEquals(2, calls.size());
      assertTxTableCounts(cache, 0);

      // Should be handled on the remote end without throwing, in the event of a rollback
      // without a prepare; any Throwable raised here propagates and fails the test.
      MethodCall rollbackMethod = MethodCallFactory.create(MethodDeclarations.rollbackMethod_id, fixture.remoteGtx);
      TestingUtil.getRemoteDelegate(cache)._replicate(rollbackMethod);

      // nothing new must have been recorded or registered
      assertNull(mgr.getTransaction());
      List callsAfter = dummy.getAllCalledIds();
      assertEquals(2, callsAfter.size());
      assertTxTableCounts(cache, 0);
   }

   /**
    * A commit replicated in after a remote prepare must be passed to the mock as the
    * fourth call and must clear the remote gtx from the transaction table.
    */
   public void testRemoteCommitTransaction() throws Exception
   {
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);
      DummyTransactionManager mgr = DummyTransactionManager.getInstance();

      RemoteTxFixture fixture = commitLocalPutAndRetarget(mgr);
      replicatePrepare(fixture);
      assertRemotePrepareApplied(mgr, dummy, fixture);

      MethodCall commitMethod = MethodCallFactory.create(MethodDeclarations.commitMethod_id, fixture.remoteGtx);
      replicateExpectingSuccess(commitMethod);

      // we should have the commit as well now
      assertNull(mgr.getTransaction());
      List calls = dummy.getAllCalledIds();
      assertEquals(MethodDeclarations.commitMethod_id, calls.get(3));
      assertTxTableCounts(cache, 0);
   }

   /**
    * With two REPL_SYNC caches, a transaction committed on the first must be seen as
    * prepare + commit by the mocks of both caches and leave both transaction tables empty.
    */
   public void testTwoWayRemoteCacheBroadcast() throws Exception
   {
      destroyCache(cache);
      cache = createReplicatedCache(Configuration.CacheMode.REPL_SYNC);
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);

      CacheSPI cache2 = createReplicatedCache(Configuration.CacheMode.REPL_SYNC);
      try
      {
         MockInterceptor dummy2 = new MockInterceptor();
         setAlteredInterceptorChain(dummy2, cache2);

         TransactionManager mgr = cache.getTransactionManager();

         // start local transaction
         mgr.begin();
         Transaction tx = mgr.getTransaction();
         // look up the gtx for this tx up front (second argument is true, as in the other tests)
         cache.getCurrentTransaction(tx, true);

         SamplePojo pojo = new SamplePojo(21, "test");
         cache.put("/one/two", "key1", pojo);
         assertNotNull(mgr.getTransaction());
         mgr.commit();
         assertNull(mgr.getTransaction());

         // both caches must have cleaned up their transaction tables
         assertTxTableCounts(cache, 0);
         assertTxTableCounts(cache2, 0);

         List calls = dummy.getAllCalledIds();
         assertEquals(MethodDeclarations.optimisticPrepareMethod_id, calls.get(0));
         assertEquals(MethodDeclarations.commitMethod_id, calls.get(1));

         List calls2 = dummy2.getAllCalledIds();
         assertEquals(MethodDeclarations.optimisticPrepareMethod_id, calls2.get(0));
         assertEquals(MethodDeclarations.commitMethod_id, calls2.get(1));
      }
      finally
      {
         // tearDown() only kills 'cache', so the second cache is released here even on failure
         destroyCache(cache2);
      }
   }

   /**
    * When the mock on the second (remote) cache is told to fail the prepare, committing on
    * the first cache must raise a {@link RollbackException}; the first cache must see
    * prepare + rollback and the second cache must see a rollback as its first call.
    */
   public void testFailurePrepareRemoteCacheBroadcast() throws Exception
   {
      destroyCache(cache);
      cache = createReplicatedCache(Configuration.CacheMode.REPL_SYNC);
      MockInterceptor dummy = new MockInterceptor();
      setAlteredInterceptorChain(dummy, cache);

      CacheSPI cache2 = createReplicatedCache(Configuration.CacheMode.REPL_SYNC);
      try
      {
         MockFailureInterceptor dummy2 = new MockFailureInterceptor();
         List failures = new ArrayList();
         failures.add(MethodDeclarations.optimisticPrepareMethod);
         dummy2.setFailurelist(failures);
         setAlteredInterceptorChain(dummy2, cache2);

         DummyTransactionManager mgr = DummyTransactionManager.getInstance();

         // start local transaction
         mgr.begin();
         Transaction tx = mgr.getTransaction();
         // look up the gtx for this tx up front (second argument is true, as in the other tests)
         cache.getCurrentTransaction(tx, true);

         SamplePojo pojo = new SamplePojo(21, "test");
         cache.put("/one/two", "key1", pojo);
         assertNotNull(mgr.getTransaction());

         try
         {
            mgr.commit();
            fail("commit() should have thrown a RollbackException because the remote prepare fails");
         }
         catch (RollbackException expected)
         {
            // expected: the failed prepare turns the commit into a rollback
         }
         assertNull(mgr.getTransaction());

         // both caches must have cleaned up their transaction tables
         assertTxTableCounts(cache, 0);
         assertTxTableCounts(cache2, 0);

         List calls = dummy.getAllCalledIds();
         assertEquals(MethodDeclarations.optimisticPrepareMethod_id, calls.get(0));
         assertEquals(MethodDeclarations.rollbackMethod_id, calls.get(1));

         // the remote side records no prepare - as it failed - but it does record the rollback
         List calls2 = dummy2.getAllCalledIds();
         assertEquals(MethodDeclarations.rollbackMethod_id, calls2.get(0));
      }
      finally
      {
         // tearDown() only kills 'cache', so the second cache is released here even on failure
         destroyCache(cache2);
      }
   }

   /**
    * When the mock on the first (local) cache is told to fail the prepare, committing must
    * raise a {@link RollbackException}; the first cache must see a rollback as its first
    * recorded call and the mock on the second cache must not have seen any call at all.
    */
   public void testFailurePrepareLocalCacheBroadcast() throws Exception
   {
      destroyCache(cache);
      cache = createReplicatedCache(Configuration.CacheMode.REPL_SYNC);
      MockFailureInterceptor dummy = new MockFailureInterceptor();
      setAlteredInterceptorChain(dummy, cache);

      CacheSPI cache2 = createReplicatedCache(Configuration.CacheMode.REPL_SYNC);
      try
      {
         MockInterceptor dummy2 = new MockInterceptor();
         // Install the second mock on the second cache. This used to re-install 'dummy' on
         // 'cache', which left dummy2 detached and made the calls2 assertion below vacuous.
         setAlteredInterceptorChain(dummy2, cache2);

         List failures = new ArrayList();
         failures.add(MethodDeclarations.optimisticPrepareMethod);
         dummy.setFailurelist(failures);

         DummyTransactionManager mgr = DummyTransactionManager.getInstance();

         // start local transaction
         mgr.begin();
         Transaction tx = mgr.getTransaction();
         // look up the gtx for this tx up front (second argument is true, as in the other tests)
         cache.getCurrentTransaction(tx, true);

         SamplePojo pojo = new SamplePojo(21, "test");
         cache.put("/one/two", "key1", pojo);
         assertNotNull(mgr.getTransaction());

         try
         {
            mgr.commit();
            fail("commit() should have thrown a RollbackException because the local prepare fails");
         }
         catch (RollbackException expected)
         {
            // expected: the failed prepare turns the commit into a rollback
         }
         assertNull(mgr.getTransaction());

         // both caches must have cleaned up their transaction tables
         assertTxTableCounts(cache, 0);
         assertTxTableCounts(cache2, 0);

         // the failed prepare is not recorded locally, so the rollback is the first call
         List calls = dummy.getAllCalledIds();
         assertEquals(MethodDeclarations.rollbackMethod_id, calls.get(0));

         // nothing at all is expected to have reached the second cache
         List calls2 = dummy2.getAllCalledIds();
         assertEquals(0, calls2.size());
      }
      finally
      {
         // tearDown() only kills 'cache', so the second cache is released here even on failure
         destroyCache(cache2);
      }
   }

   /**
    * Runs and commits a local transaction that puts one pojo under <code>/one/two</code>,
    * then rewrites the first recorded modification so that its first argument is a freshly
    * created gtx carrying a {@link TestAddress}.
    *
    * @param mgr transaction manager used to begin and commit the local transaction
    * @return the transaction entry of the committed local transaction together with the
    *         newly created remote gtx
    */
   private RemoteTxFixture commitLocalPutAndRetarget(DummyTransactionManager mgr) throws Exception
   {
      // start local transaction
      mgr.begin();
      Transaction tx = mgr.getTransaction();
      // look up the gtx for this tx before the put
      // NOTE(review): the 'true' flag presumably means "create if absent" - confirm
      cache.getCurrentTransaction(tx, true);

      SamplePojo pojo = new SamplePojo(21, "test");
      cache.put("/one/two", "key1", pojo);

      GlobalTransaction gtx = cache.getCurrentTransaction(tx, true);
      TransactionTable table = cache.getTransactionTable();
      OptimisticTransactionEntry entry = (OptimisticTransactionEntry) table.get(gtx);
      assertNotNull(mgr.getTransaction());

      // grab the modification while the transaction is still open, then commit
      MethodCall modification = entry.getModifications().get(0);
      mgr.commit();

      GlobalTransaction remoteGtx = new GlobalTransaction();
      remoteGtx.setAddress(new TestAddress());
      // hack the method call to make it have the remote gtx
      modification.getArgs()[0] = remoteGtx;

      return new RemoteTxFixture(entry, remoteGtx);
   }

   /**
    * Builds an optimistic prepare call for the fixture's remote gtx from the modifications
    * of the local entry and pushes it through the cache's remote delegate, failing the test
    * if that throws.
    *
    * @param fixture result of {@link #commitLocalPutAndRetarget(DummyTransactionManager)}
    */
   private void replicatePrepare(RemoteTxFixture fixture) throws Exception
   {
      MethodCall prepareMethod = MethodCallFactory.create(MethodDeclarations.optimisticPrepareMethod_id, fixture.remoteGtx, injectDataVersion(fixture.localEntry.getModifications()), null, fixture.remoteGtx.getAddress(), false);
      replicateExpectingSuccess(prepareMethod);
   }

   /**
    * Pushes the given call through the cache's remote delegate and fails the test, naming
    * the call and the throwable, if the delegate throws anything.
    *
    * @param call the method call to replicate
    */
   private void replicateExpectingSuccess(MethodCall call)
   {
      try
      {
         TestingUtil.getRemoteDelegate(cache)._replicate(call);
      }
      catch (Throwable t)
      {
         fail("Replicating " + call + " should have succeeded but threw " + t);
      }
   }

   /**
    * Asserts the state expected right after a remote prepare: no transaction on the calling
    * thread, the remote gtx registered in the transaction table, three nodes in the
    * workspace of the local entry, the prepare recorded as the mock's third call, and one
    * global / one local transaction in the table.
    *
    * @param mgr     transaction manager whose current transaction must be null
    * @param dummy   mock that recorded the calls
    * @param fixture result of {@link #commitLocalPutAndRetarget(DummyTransactionManager)}
    */
   private void assertRemotePrepareApplied(DummyTransactionManager mgr, MockInterceptor dummy, RemoteTxFixture fixture) throws Exception
   {
      // the calling thread must not be associated with a transaction
      assertNull(mgr.getTransaction());

      // there should be a registration for the remote gtx
      TransactionTable table = cache.getTransactionTable();
      assertNotNull(table.get(fixture.remoteGtx));
      assertNotNull(table.getLocalTransaction(fixture.remoteGtx));

      // NOTE(review): this inspects the workspace of the entry taken from the original
      // local transaction; the 3 nodes are presumably the root, /one and /one/two - confirm
      assertEquals(3, fixture.localEntry.getTransactionWorkSpace().getNodes().size());

      // indices 0 and 1 are the local prepare and commit, index 2 is the replicated prepare
      List calls = dummy.getAllCalledIds();
      assertEquals(MethodDeclarations.optimisticPrepareMethod_id, calls.get(2));
      assertTxTableCounts(cache, 1);
   }

   /**
    * Asserts that the transaction table of the given cache reports the expected count for
    * both global and local transactions.
    *
    * @param c        cache whose transaction table is inspected
    * @param expected count expected for both kinds of transactions
    */
   private static void assertTxTableCounts(CacheSPI c, int expected)
   {
      assertEquals(expected, c.getTransactionTable().getNumGlobalTransactions());
      assertEquals(expected, c.getTransactionTable().getNumLocalTransactions());
   }

   /**
    * Pairs the transaction entry of a committed local transaction with the fake remote gtx
    * that its first modification has been re-targeted at.
    */
   private static final class RemoteTxFixture
   {
      final OptimisticTransactionEntry localEntry;
      final GlobalTransaction remoteGtx;

      RemoteTxFixture(OptimisticTransactionEntry localEntry, GlobalTransaction remoteGtx)
      {
         this.localEntry = localEntry;
         this.remoteGtx = remoteGtx;
      }
   }

   /**
    * Minimal {@link Address} stub used to give a gtx an address that is not the local
    * cache's. All (de)serialisation methods are no-ops, the size is 0, it is never a
    * multicast address and it compares as equal to everything.
    */
   static class TestAddress implements Address
   {
      private static final long serialVersionUID = -8525272532201600656L;

      public boolean isMulticastAddress()
      {
         return false;
      }

      public void readExternal(ObjectInput arg0)
      {
         // nothing to read - the stub carries no state
      }

      public int size()
      {
         return 0;
      }

      public void writeExternal(ObjectOutput arg0)
      {
         // nothing to write - the stub carries no state
      }

      public void writeTo(DataOutputStream arg0)
      {
         // nothing to write - the stub carries no state
      }

      public void readFrom(DataInputStream arg0)
      {
         // nothing to read - the stub carries no state
      }

      public int compareTo(Object arg0)
      {
         return 0;
      }
   }
}