/*
* JBoss, Home of Professional Open Source
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.interceptors;
import org.jboss.cache.CacheImpl;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.Node;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.cache.lock.LockingException;
import org.jboss.cache.lock.NodeLock;
import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.marshall.MethodDeclarations;
import org.jboss.cache.marshall.MethodCallFactory;
import org.jboss.cache.marshall.MethodCall;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionEntry;
import org.jboss.cache.transaction.TransactionTable;
import org.jgroups.Address;
import javax.transaction.Transaction;
import java.util.*;
/*
 * TODO refactoring ideas:
 * - There are many places in the code that compute the lock owner: either the GlobalTransaction or the current
 *   thread. The lock owner could be abstracted as a LockOwner type, extended by a current-thread lock owner and a
 *   GlobalTransaction lock owner. This would make the code nicer.
 */
/**
* An interceptor that handles locking. When a TX is associated, we register
* for TX completion and unlock the locks acquired within the scope of the TX.
* When no TX is present, we keep track of the locks acquired during the
* current method and unlock when the method returns.
*
* @author Bela Ban
* @version $Id: PessimisticLockInterceptor.java 5394 2008-03-07 02:43:26Z mircea.markus $
*/
public class PessimisticLockInterceptor extends MethodDispacherInterceptor
{
private TransactionTable tx_table;
private CacheImpl cacheImpl;
private NodeSPI rootNode;
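// Historical note: locks used to be tracked in a lock table
// (Map<Thread, List<NodeLock>>: keys = threads, values = lists of locks held by that thread).
// Locks acquired by a non-transactional invocation are now read from the InvocationContext instead (see invoke()).
// private ThreadLocal<List<NodeLock>> lockTable;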
private long lock_acquisition_timeout;
/**
 * Creates the interceptor. The only work done here is the call to {@code initLogger()}; collaborators are
 * supplied later through {@link #injectDependencies(Configuration, CacheImpl, TransactionTable)}.
 */
public PessimisticLockInterceptor()
{
   initLogger();
}
/**
 * Receives the collaborators this interceptor works with.
 *
 * @param configuration read once for the default lock acquisition timeout
 * @param cacheImpl     cache implementation, used by this class for {@code realRemove} calls
 * @param txTable       transaction table holding the per-transaction entries
 */
@Inject
public void injectDependencies(Configuration configuration, CacheImpl cacheImpl, TransactionTable txTable)
{
   // the timeout is read first so that a failing configuration leaves the other fields untouched
   this.lock_acquisition_timeout = configuration.getLockAcquisitionTimeout();
   this.cacheImpl = cacheImpl;
   this.tx_table = txTable;
}
/**
 * Dispatches the invocation and, once it has completed (normally or exceptionally), releases the locks that were
 * recorded on the invocation context when the call is not running inside a valid transaction.
 * <p/>
 * Nothing is released when locking is suppressed through the option overrides, or when a valid transaction is
 * associated with the context.
 *
 * @param ctx the current invocation context
 * @return whatever the superclass dispatch returns
 * @throws Throwable anything thrown further up the chain
 */
@Override
public Object invoke(InvocationContext ctx) throws Throwable
{
   if (rootNode == null)
   {
      rootNode = cache.getRoot();
   }
   try
   {
      return super.invoke(ctx);
   }
   finally
   {
      // This is functionality that used to live in the UnlockInterceptor. The locks acquired by a non-tx call are
      // kept on the invocation context (they used to be in a separate lock table).
      // NOTE: deliberately no 'return' statements in this finally block - they would swallow exceptions.
      boolean lockingSuppressed = ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isSuppressLocking();
      if (!lockingSuppressed)
      {
         Transaction tx = ctx.getTransaction();
         boolean inValidTx = tx != null && isValid(tx);
         if (!inValidTx)
         {
            List<NodeLock> heldLocks = ctx.getInvocationLocksAcquired();
            if (trace)
            {
               log.trace("Attempting to release locks on current thread. Locks for the invocation is " + heldLocks);
            }
            if (heldLocks != null && !heldLocks.isEmpty())
            {
               Thread releasingThread = Thread.currentThread();
               try
               {
                  // walk backwards: locks must be released in the reverse order of acquisition
                  int idx = heldLocks.size();
                  while (--idx >= 0)
                  {
                     NodeLock nodeLock = heldLocks.get(idx);
                     if (trace) log.trace("releasing lock for " + nodeLock.getFqn() + ": " + nodeLock);
                     nodeLock.release(releasingThread);
                  }
               }
               finally
               {
                  // always forget the recorded locks, even if a release failed part-way through
                  ctx.clearInvocationLocksAcquired();
               }
            }
         }
      }
   }
}
/**
 * Put of a data map: delegates to {@link #handlePutMethod(InvocationContext, Fqn)}; only {@code ctx} and
 * {@code fqn} are used here.
 */
protected Object handlePutDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn fqn, Map data, boolean createUndoOps) throws Throwable
{
   return handlePutMethod(ctx, fqn);
}
/**
 * Put of a data map with the erase flag: delegates to {@link #handlePutMethod(InvocationContext, Fqn)}; only
 * {@code ctx} and {@code fqn} are used here.
 */
protected Object handlePutDataEraseMethod(InvocationContext ctx, GlobalTransaction gt, Fqn fqn, Map newData, boolean createUndoOps, boolean eraseContents) throws Throwable
{
   return handlePutMethod(ctx, fqn);
}
/**
 * Put of a single key/value pair: delegates to {@link #handlePutMethod(InvocationContext, Fqn)}; only
 * {@code ctx} and {@code fqn} are used here.
 */
protected Object handlePutKeyValueMethod(InvocationContext ctx, GlobalTransaction gtx, Fqn fqn, Object key, Object value, boolean createUndoOps) throws Throwable
{
   return handlePutMethod(ctx, fqn);
}
/**
 * Common implementation for all put variants.
 * <p/>
 * In the normal case a WRITE lock is acquired on {@code fqn}, creating missing nodes on the way. If locking is
 * suppressed via the option overrides, or the isolation level is NONE, no locks are taken but the path to
 * {@code fqn} is still created node by node, starting at the root.
 *
 * @param ctx the current invocation context
 * @param fqn the node being written to
 * @return the result of the next interceptor
 */
private Object handlePutMethod(InvocationContext ctx, Fqn fqn)
      throws Throwable
{
   boolean lockingSuppressed = ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isSuppressLocking();
   if (!lockingSuppressed && configuration.getIsolationLevel() != IsolationLevel.NONE)
   {
      acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.WRITE, true, false, false, true, null, false);
      return nextInterceptor(ctx);
   }

   if (trace) log.trace("Suppressing locking, creating nodes if necessary");
   final int depth = fqn.size();
   NodeSPI cursor = rootNode;
   for (int level = 0; level < depth; level++)
   {
      Object childName = fqn.get(level);
      Fqn childFqn = new Fqn(childName);
      NodeSPI child = cursor.getChildDirect(childFqn);
      if (child == null)
      {
         child = cursor.addChildDirect(childFqn);
      }
      // a put may need to "undelete" a node removed earlier in the same tx
      manageReverseRemove(ctx.getGlobalTransaction(), child, true, null);
      cursor = child;
   }
   return nextInterceptor(ctx);
}
/**
 * Reports whether the current call should be skipped: true only when locking is suppressed through the option
 * overrides AND the invoked method is not a put method.
 */
protected boolean skipMethodCall(InvocationContext ctx)
{
   if (ctx.getOptionOverrides() == null || !ctx.getOptionOverrides().isSuppressLocking())
   {
      return false;
   }
   return !MethodDeclarations.isPutMethod(ctx.getMethodCall().getMethodId());
}
/**
 * Explicit lock request: acquires a lock of the requested type on {@code fqn} (without creating nodes) and,
 * when {@code recursive} is set, on its children as well. The call is not passed further up the chain.
 *
 * @return always {@code null}
 */
protected Object handleLockMethod(InvocationContext ctx, Fqn fqn, NodeLock.LockType lockType, boolean recursive) throws Throwable
{
   acquireLocksWithTimeout(ctx, fqn, lockType, false, false, false, false, null, false);
   if (!recursive)
   {
      return null;
   }
   NodeSPI target = peekNode(ctx, fqn, false, false, false);
   acquireLocksOnChildren(target, lockType, ctx);
   return null;
}
/**
 * Prepare handling. A two-phase prepare is simply passed on. A one-phase prepare is treated like a commit: the
 * nodes removed in the transaction are really removed, the call is passed on and the transaction table's
 * {@code cleanup} is invoked for the context's global transaction.
 */
protected Object handlePrepareMethod(InvocationContext ctx, GlobalTransaction gtx, List modification, Address coordinator, boolean onePhaseCommit) throws Throwable
{
   if (onePhaseCommit)
   {
      // commit propagated up from the tx interceptor
      commit(ctx.getGlobalTransaction());
      final Object result = nextInterceptor(ctx);
      tx_table.cleanup(ctx.getGlobalTransaction());
      return result;
   }
   // 2-phase commit prepares are no-ops here.
   return nextInterceptor(ctx);
}
/**
 * Optimistic prepares are not supported by this interceptor.
 *
 * @throws UnsupportedOperationException always
 */
protected Object handleOptimisticPrepareMethod(InvocationContext ctx, GlobalTransaction gtx, List modifications, Map data, Address address, boolean onePhaseCommit) throws Throwable
{
   final String reason = "Optimistic prepare methods should never be received by the pessimistic lock interceptor!!";
   throw new UnsupportedOperationException(reason);
}
/**
 * Commit handling: really removes the nodes recorded as removed by the transaction, passes the call on, and
 * finally invokes the transaction table's {@code cleanup} for the transaction.
 *
 * @return the result of the next interceptor
 */
protected Object handleCommitMethod(InvocationContext ctx, GlobalTransaction globalTransaction) throws Throwable
{
   commit(globalTransaction);
   if (trace)
   {
      log.trace("bypassed locking as method commit() doesn't require locking");
   }
   final Object result = nextInterceptor(ctx);
   tx_table.cleanup(globalTransaction);
   return result;
}
/**
 * Rollback handling. If the transaction has an entry in the transaction table, the nodes it recorded as removed
 * are really removed and its undo operations are run against the cache; otherwise an error is logged. In both
 * cases the call is then passed on and the transaction table's {@code cleanup} is invoked.
 *
 * @return the result of the next interceptor
 */
protected Object handleRollbackMethod(InvocationContext ctx, GlobalTransaction globalTransaction) throws Throwable
{
   final TransactionEntry txEntry = tx_table.get(globalTransaction);
   if (trace) log.trace("called to rollback cache with GlobalTransaction=" + globalTransaction);

   if (txEntry != null)
   {
      for (Iterator it = txEntry.getRemovedNodes().iterator(); it.hasNext();)
      {
         Fqn removed = (Fqn) it.next();
         cacheImpl.realRemove(removed, false);
      }
      // Revert the modifications by running the undo-op list. This *cannot* throw any exceptions !
      txEntry.undoOperations(cache);
   }
   else
   {
      log.error("entry for transaction " + globalTransaction + " not found (transaction has possibly already been rolled back)");
   }

   if (trace) log.trace("bypassed locking as method rollback() doesn't require locking");
   final Object result = nextInterceptor(ctx);
   tx_table.cleanup(globalTransaction);
   return result;
}
/**
 * Move handling. Unless the isolation level is NONE:
 * <ul>
 * <li>a WRITE lock is requested on {@code from} (with the "write lock on parent" flag set), {@code from} is
 * recorded as a removed node when a global transaction is present, and WRITE locks are requested on the
 * children of {@code from};</li>
 * <li>a READ lock is requested on {@code to} and on its children.</li>
 * </ul>
 * After the call has been passed on, all locks held by the current thread on the node found at {@code from}
 * (if any) are released.
 *
 * @return the result of the next interceptor
 */
protected Object handleMoveMethod(InvocationContext ctx, Fqn from, Fqn to) throws Throwable
{
   final long timeout = ctx.getContextLockAcquisitionTimeout(lock_acquisition_timeout);

   if (trace) log.trace("Attempting to get WL on node to be moved [" + from + "]");
   if (from != null && configuration.getIsolationLevel() != IsolationLevel.NONE)
   {
      lock(ctx, from, NodeLock.LockType.WRITE, false, timeout, true, false, null, false);
      if (ctx.getGlobalTransaction() != null)
      {
         cache.getTransactionTable().get(ctx.getGlobalTransaction()).addRemovedNode(from);
      }
      NodeSPI source = peekNode(ctx, from, false, true, false);
      acquireLocksOnChildren(source, NodeLock.LockType.WRITE, ctx);
   }

   if (to != null && configuration.getIsolationLevel() != IsolationLevel.NONE)
   {
      // the new parent (and its children) only need read locks
      if (trace) log.trace("Attempting to get RL on new parent [" + to + "]");
      lock(ctx, to, NodeLock.LockType.READ, false, timeout, false, false, null, false);
      NodeSPI newParent = peekNode(ctx, to, false, true, false);
      acquireLocksOnChildren(newParent, NodeLock.LockType.READ, ctx);
   }

   final Object result = nextInterceptor(ctx);

   // release whatever the current thread still holds on the node found at the old location
   NodeSPI leftover = peekNode(ctx, from, false, true, false);
   if (leftover != null)
   {
      leftover.getLock().releaseAll(Thread.currentThread());
   }
   return result;
}
/**
 * Node removal handling.
 * <p/>
 * A WRITE lock is acquired on {@code fqn}, creating missing nodes along the path so that there is something to
 * lock. Nodes created that way are collected; inside a transaction they are recorded as removed and marked as
 * deleted, outside a transaction they are really removed again after the call has been passed on. When such
 * nodes exist, the last argument of the method call is forced to {@code Boolean.TRUE} (skip notification).
 *
 * @return {@code false} if the lock acquisition reported that a node had to be created (nothing was really
 *         deleted), otherwise the result of the next interceptor
 */
protected Object handleRemoveNodeMethod(InvocationContext ctx, GlobalTransaction tx, Fqn fqn, boolean createUndoOps) throws Throwable
{
   // every node created purely to form a lockable path to the target ends up in here
   final List<NodeSPI> pathNodesCreated = new LinkedList<NodeSPI>();
   final boolean created = acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.WRITE, true, false, true, true, pathNodesCreated, true);

   TransactionEntry txEntry = null;
   if (ctx.getGlobalTransaction() != null)
   {
      txEntry = tx_table.get(ctx.getGlobalTransaction());
      txEntry.addRemovedNode(fqn);
      for (NodeSPI createdNode : pathNodesCreated)
      {
         txEntry.addRemovedNode(createdNode.getFqn());
         createdNode.markAsDeleted(true);
      }
   }

   NodeSPI target = peekNode(ctx, fqn, false, false, false);
   acquireLocksOnChildren(target, NodeLock.LockType.WRITE, ctx, txEntry, true);

   if (!pathNodesCreated.isEmpty())
   {
      if (trace) log.trace("There were new nodes created, skipping notification on delete");
      Object[] callArgs = ctx.getMethodCall().getArgs();
      int skipNotificationIdx = callArgs.length - 1;
      if (trace)
      {
         log.trace("Changing 'skipNotification' for method '_remove' from " + callArgs[skipNotificationIdx] + " to true");
      }
      callArgs[skipNotificationIdx] = Boolean.TRUE;
   }

   final Object result = nextInterceptor(ctx);

   // without a tx nobody else will clean up the helper nodes, so do it right away
   if (ctx.getGlobalTransaction() == null)
   {
      for (NodeSPI createdNode : pathNodesCreated)
      {
         cacheImpl.realRemove(createdNode.getFqn(), true);
      }
      cacheImpl.realRemove(fqn, true);
      NodeSPI leftover = peekNode(ctx, fqn, false, true, false);
      if (leftover != null)
      {
         leftover.getLock().releaseAll(Thread.currentThread());
      }
   }

   // if we had to create the node, nothing was *really* deleted
   if (created)
   {
      return false;
   }
   return result;
}
/**
 * putForExternalRead handling: requests a READ lock on {@code fqn} with a zero lock timeout, creating missing
 * nodes and applying the reverse-remove check, then passes the call on.
 */
protected Object handlePutForExternalReadMethod(InvocationContext ctx, GlobalTransaction tx, Fqn fqn, Object key, Object value) throws Throwable
{
   acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.READ,
         true,   // createIfNotExists
         true,   // zeroLockTimeout
         false,  // acquireLockOnParent
         true,   // reverseRemoveCheck
         null,   // createdNodes
         false); // skipNotification
   return nextInterceptor(ctx);
}
/**
 * Key removal needs the same locking as data removal, so this simply delegates to
 * {@link #handleRemoveDataMethod(InvocationContext, GlobalTransaction, Fqn, boolean)}; {@code key} is not used.
 */
protected Object handleRemoveKeyMethod(InvocationContext ctx, GlobalTransaction tx, Fqn fqn, Object key, boolean createUndoOps) throws Throwable
{
   return handleRemoveDataMethod(ctx, tx, fqn, createUndoOps);
}
/**
 * Data removal handling: requests a WRITE lock on {@code fqn} (no node creation) and passes the call on.
 */
protected Object handleRemoveDataMethod(InvocationContext ctx, GlobalTransaction tx, Fqn fqn, boolean createUndoOps) throws Throwable
{
   final NodeLock.LockType required = NodeLock.LockType.WRITE;
   acquireLocksWithTimeout(ctx, fqn, required, false, false, false, false, null, false);
   return nextInterceptor(ctx);
}
/**
 * Add-child handling: requests a READ lock on the parent Fqn (no node creation) and passes the call on.
 */
protected Object handleAddChildMethod(InvocationContext ctx, GlobalTransaction tx, Fqn parentFqn, Object childName, Node cn, boolean createUndoOps) throws Throwable
{
   final NodeLock.LockType required = NodeLock.LockType.READ;
   acquireLocksWithTimeout(ctx, parentFqn, required, false, false, false, false, null, false);
   return nextInterceptor(ctx);
}
/**
 * Eviction handling: requests a WRITE lock on {@code fqn} with a zero lock timeout (no node creation) and passes
 * the call on.
 */
protected Object handleEvictMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
   acquireLocksWithTimeout(ctx, fqn, NodeLock.LockType.WRITE,
         false,  // createIfNotExists
         true,   // zeroLockTimeout
         false,  // acquireLockOnParent
         false,  // reverseRemoveCheck
         null,   // createdNodes
         false); // skipNotification
   return nextInterceptor(ctx);
}
/**
 * Get of a single key: requests a READ lock on {@code fqn} (no node creation) and passes the call on.
 */
protected Object handleGetKeyValueMethod(InvocationContext ctx, Fqn fqn, Object key, boolean sendNodeEvent) throws Throwable
{
   final NodeLock.LockType required = NodeLock.LockType.READ;
   acquireLocksWithTimeout(ctx, fqn, required, false, false, false, false, null, false);
   return nextInterceptor(ctx);
}
/**
 * Get of a node: requests a READ lock on {@code fqn} (no node creation) and passes the call on.
 */
protected Object handleGetNodeMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
   final NodeLock.LockType required = NodeLock.LockType.READ;
   acquireLocksWithTimeout(ctx, fqn, required, false, false, false, false, null, false);
   return nextInterceptor(ctx);
}
/**
 * Get of a node's keys: requests a READ lock on {@code fqn} (no node creation) and passes the call on.
 */
protected Object handleGetKeysMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
   final NodeLock.LockType required = NodeLock.LockType.READ;
   acquireLocksWithTimeout(ctx, fqn, required, false, false, false, false, null, false);
   return nextInterceptor(ctx);
}
/**
 * Get of a node's children names: requests a READ lock on {@code fqn} (no node creation) and passes the call on.
 */
protected Object handleGetChildrenNamesMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
   final NodeLock.LockType required = NodeLock.LockType.READ;
   acquireLocksWithTimeout(ctx, fqn, required, false, false, false, false, null, false);
   return nextInterceptor(ctx);
}
/**
 * Print handling: requests a READ lock on {@code fqn} (no node creation) and passes the call on.
 */
protected Object handlePrintMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
   final NodeLock.LockType required = NodeLock.LockType.READ;
   acquireLocksWithTimeout(ctx, fqn, required, false, false, false, false, null, false);
   return nextInterceptor(ctx);
}
/**
 * Release-all-locks handling: requests a READ lock on {@code fqn} (no node creation) before passing the call on.
 */
protected Object handleReleaseAllLocksMethod(InvocationContext ctx, Fqn fqn) throws Throwable
{
   final NodeLock.LockType required = NodeLock.LockType.READ;
   acquireLocksWithTimeout(ctx, fqn, required, false, false, false, false, null, false);
   return nextInterceptor(ctx);
}
/**
 * Acquires the locks needed for {@code fqn} through {@link #lock}, retrying while {@code createIfNotExists} is set
 * and the target node still cannot be found afterwards (this guards against a concurrent remove()).
 * <p/>
 * Returns {@code false} immediately, without locking, if {@code fqn} is null or the isolation level is NONE.
 *
 * @param zeroLockTimeout if true a timeout of 0 is used; otherwise the timeout comes from the invocation context,
 *                        with the configured lock acquisition timeout passed as the default
 * @return the value returned by the last {@link #lock} attempt
 * @throws TimeoutException if a retry is needed after the overall cut-off time has passed
 */
private boolean acquireLocksWithTimeout(InvocationContext ctx, Fqn fqn, NodeLock.LockType lockType,
                                        boolean createIfNotExists, boolean zeroLockTimeout,
                                        boolean acquireLockOnParent, boolean reverseRemoveCheck, List<NodeSPI> createdNodes, boolean skipNotification)
      throws InterruptedException
{
   if (fqn == null || configuration.getIsolationLevel() == IsolationLevel.NONE)
   {
      return false;
   }

   final long timeout = zeroLockTimeout ? 0 : ctx.getContextLockAcquisitionTimeout(lock_acquisition_timeout);
   // make sure we can bail out of the retry loop
   final long cutoffTime = System.currentTimeMillis() + timeout;

   boolean retrying = false;
   while (true)
   {
      // additional check, only on retries, to make sure we don't try for too long
      if (retrying && System.currentTimeMillis() > cutoffTime)
      {
         throw new TimeoutException("Unable to acquire lock on Fqn " + fqn + " after " + timeout + " millis");
      }
      boolean created = lock(ctx, fqn, lockType, createIfNotExists, timeout, acquireLockOnParent, reverseRemoveCheck, createdNodes, skipNotification);
      retrying = true;

      // done unless we were supposed to create the node and it has vanished again in the meantime
      if (!createIfNotExists || peekNode(ctx, fqn, false, false, false) != null)
      {
         return created;
      }
   }
}
/**
 * Acquires locks on the node identified by {@code fqn} and on its ancestors, walking down from the root.
 * Read locks are acquired by default; a write lock is acquired instead on a node when
 * <ol>
 * <li>any node has been created on the fly earlier during this call ({@code createIfNotExists} is true), or</li>
 * <li>{@link #writeLockNeeded} says so: a write lock was requested and the node is the target, the force-write-lock
 * option is set and the node is the target, or the node has
 * {@link org.jboss.cache.Node#isLockForChildInsertRemove()} set and either {@code acquireWriteLockOnParent} is true
 * and the node is the parent of the target, or its next child on the path is missing and will be created.</li>
 * </ol>
 * After each acquisition the node is looked up again; if a different instance (or none) is found, the lock just
 * obtained is released and the walk resumes from the parent, or from the root if the parent has gone as well.
 *
 * @param ctx                      the current invocation context; supplies the global transaction, if any, which
 *                                 then acts as lock owner (otherwise the current thread is the owner)
 * @param fqn                      the node to lock
 * @param lockType                 the lock type requested for the target node
 * @param createIfNotExists        if true, then missing nodes will be created on the fly. If false, the method returns
 *                                 when it reaches a node that does not exist
 * @param timeout                  timeout in millis passed to each individual lock acquisition; also bounds the
 *                                 "move one level up and retry" path
 * @param acquireWriteLockOnParent see rule 2 above
 * @param reverseRemoveCheck       see {@link #manageReverseRemove(org.jboss.cache.transaction.GlobalTransaction, org.jboss.cache.NodeSPI, boolean, java.util.List)}
 * @param createdNodes             a list to which any nodes created are added so that calling code is aware of which
 *                                 nodes have been newly created. May be null.
 * @param skipNotification         if true, nodes created on the fly are added with notification switched off
 * @return true if at least one node was created during this call, false otherwise (including the case where a node
 *         on the path does not exist and {@code createIfNotExists} is false)
 * @throws TimeoutException if the retry path is still being taken after {@code timeout} millis
 * @throws LockingException if a node on the path reports an Fqn that is not an ancestor of (or equal to) {@code fqn}
 */
private boolean lock(InvocationContext ctx, Fqn fqn, NodeLock.LockType lockType, boolean createIfNotExists, long timeout,
                     boolean acquireWriteLockOnParent, boolean reverseRemoveCheck, List<NodeSPI> createdNodes, boolean skipNotification)
      throws TimeoutException, LockingException, InterruptedException
{
   Thread currentThread = Thread.currentThread();
   GlobalTransaction gtx = ctx.getGlobalTransaction();
   boolean created = false;
   // if the tx associated with the current thread is rolling back, barf! JBCACHE-923
   if (gtx != null)
   {
      assertTransactionValid(ctx);
   }
   Object owner = (gtx != null) ? gtx : currentThread;
   NodeSPI currentNode;
   if (trace) log.trace("Attempting to lock node " + fqn + " for owner " + owner);
   long expiryTime = System.currentTimeMillis() + timeout;
   currentNode = rootNode;
   NodeSPI parent = null;
   Object childName = null;
   // index (within fqn) of the last element whose node has been locked; -1 means "still at the root"
   int currentIndex = -1;
   int targetFqnSize = fqn.size();
   do
   {
      if (currentNode == null)
      {
         if (createIfNotExists)
         {
            // if the new node is to be marked as deleted, do not notify!
            currentNode = parent.addChildDirect(childName, !skipNotification);
            created = true;
            if (trace) log.trace("Child node was null, so created child node " + childName);
            if (createdNodes != null) createdNodes.add(currentNode);
         }
         else
         {
            // currentNode is null in this branch, so report the parent whose child is missing
            if (trace)
               log.trace("failed to find or create child " + childName + " of node " + parent);
            return false;
         }
      }
      else
      {
         if (!currentNode.isValid() && createIfNotExists) currentNode.setValid(true, false);
      }
      NodeLock.LockType lockTypeRequired = NodeLock.LockType.READ;
      // note: 'created' stays true once set, so every node visited after a creation is write-locked
      if (created || writeLockNeeded(ctx, lockType, currentIndex, acquireWriteLockOnParent, createIfNotExists, fqn, currentNode))
      {
         lockTypeRequired = NodeLock.LockType.WRITE;
      }
      Fqn currentNodeFqn = currentNode.getFqn();
      // actually acquire the lock we need. This method blocks.
      acquireNodeLock(ctx, currentNode, owner, gtx, lockTypeRequired, timeout);
      manageReverseRemove(gtx, currentNode, reverseRemoveCheck, createdNodes);
      // make sure the lock we acquired isn't on a deleted node/is an orphan!!
      // look into invalidated nodes as well
      NodeSPI repeek = peekNode(ctx, currentNodeFqn, true, true, true);
      if (currentNode != repeek)
      {
         if (trace)
            log.trace("Was waiting for and obtained a lock on a node that doesn't exist anymore! Attempting lock acquisition again.");
         // we have an orphan!! Lose the unnecessary lock and re-acquire the lock (and potentially recreate the node).
         // check if the parent exists!!
         // look into invalidated nodes as well
         currentNode.getLock().releaseAll(owner);
         if (parent == null || peekNode(ctx, parent.getFqn(), true, true, true) == null)
         {
            // crap!
            if (trace) log.trace("Parent has been deleted again. Go through the lock method all over again.");
            currentNode = rootNode;
            currentIndex = -1;
            parent = null;
         }
         else
         {
            currentNode = parent;
            currentIndex--;
            parent = null;
            if (System.currentTimeMillis() > expiryTime)
            {
               throw new TimeoutException("Unable to acquire lock on child node " + new Fqn(currentNode.getFqn(), childName) + " after " + timeout + " millis.");
            }
            if (trace) log.trace("Moving one level up, current node is :" + currentNode);
         }
      }
      else
      {
         // we have succeeded in acquiring this lock. Increment the current index since we have gained one level of depth in the tree.
         currentIndex++;
         // now test if this is the final level and if we can quit the loop:
         if (currentIndex == targetFqnSize)
         {
            break;
         }
         if (!fqn.isChildOrEquals(currentNode.getFqn())) // Does this ever happen? Perhaps with a move(), I suppose? - MS
         {
            String message = "currentNode instance changed the FQN(" + currentNode.getFqn()
                  + ") and do not match the FQN on which we want to acquire lock(" + fqn + ")";
            if (trace) log.trace(message);
            throw new LockingException(message);
         }
         parent = currentNode;
         childName = fqn.get(currentIndex);
         currentNode = currentNode.getChildDirect(childName);
      }
   } while (true);
   return created;
}
/**
 * Convenience overload: acquires locks on the children of {@code parentNode} without a transaction entry and
 * without recording the children as removed nodes.
 */
private void acquireLocksOnChildren(NodeSPI parentNode, NodeLock.LockType lockType, InvocationContext ctx) throws InterruptedException
{
   final TransactionEntry noEntry = null;
   acquireLocksOnChildren(parentNode, lockType, ctx, noEntry, false);
}
/**
 * Acquires locks below the supplied node by calling {@code acquireAll} on that node's lock, and records the
 * locks obtained: in the transaction table when a global transaction is present, otherwise on the invocation
 * context. If the supplied parent node is null the method returns (no-op).
 * <p/>
 * NOTE(review): the earlier documentation stated that the lock on the parent node itself is not acquired; that
 * depends on {@code NodeLock.acquireAll} and should be confirmed there.
 *
 * @param parentNode               node whose lock's {@code acquireAll} is invoked; may be null
 * @param lockType                 lock type to request
 * @param ctx                      the current invocation context
 * @param entry                    transaction entry to which removed nodes are added; only used when a global
 *                                 transaction is present and {@code addChildrenToDeletedList} is true
 * @param addChildrenToDeletedList if true (and inside a transaction), the Fqn of every acquired lock is recorded
 *                                 on {@code entry} as a removed node
 */
private void acquireLocksOnChildren(NodeSPI parentNode, NodeLock.LockType lockType, InvocationContext ctx, TransactionEntry entry, boolean addChildrenToDeletedList)
      throws InterruptedException
{
   if (parentNode == null) return;

   final long timeout = ctx.getContextLockAcquisitionTimeout(lock_acquisition_timeout);
   final GlobalTransaction gtx = ctx.getGlobalTransaction();
   // the tx owns the locks if there is one, otherwise the calling thread does
   final Object owner = (gtx == null) ? Thread.currentThread() : gtx;

   final Set<NodeLock> acquiredLocks = parentNode.getLock().acquireAll(owner, timeout, lockType);
   if (acquiredLocks.isEmpty()) return;

   if (gtx == null)
   {
      ctx.addInvocationLocksAcquired(acquiredLocks);
      return;
   }

   cache.getTransactionTable().addLocks(gtx, acquiredLocks);
   if (!addChildrenToDeletedList) return;
   for (NodeLock childLock : acquiredLocks)
   {
      entry.addRemovedNode(childLock.getFqn());
   }
}
/**
 * Used by lock().
 * Determines whether the node currently being visited on the way to {@code targetFqn} needs a write lock.
 * <p/>
 * A write lock is needed when
 * <ul>
 * <li>the node is the target and the force-write-lock option is set on the context's option overrides;</li>
 * <li>the node has {@code isLockForChildInsertRemove()} set and either {@code acquireWriteLockOnParent} is true
 * and the node is the parent of the target, or the node is not the target, its next child on the path cannot be
 * found and {@code createIfNotExists} is true;</li>
 * <li>{@code lockType} is WRITE and the node is the target.</li>
 * </ul>
 *
 * @param currentNodeIndex index within {@code targetFqn} of the current node's last element; -1 for the root
 * @return true if a write lock should be acquired on {@code currentNode}
 */
private boolean writeLockNeeded(InvocationContext ctx, NodeLock.LockType lockType, int currentNodeIndex, boolean acquireWriteLockOnParent, boolean createIfNotExists, Fqn targetFqn, NodeSPI currentNode)
{
   int treeNodeSize = targetFqn.size();
   boolean isTargetNode = currentNodeIndex == (treeNodeSize - 1);
   // write lock forced!! Option overrides are null-checked here just like everywhere else in this class.
   if (isTargetNode && ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isForceWriteLock()) return true;
   //this can be injected, from the caller as a param named wlParent
   if (currentNode.isLockForChildInsertRemove())
   {
      if (acquireWriteLockOnParent && currentNodeIndex == treeNodeSize - 2)
      {
         return true;// we're doing a remove and we've reached the PARENT node of the target to be removed.
      }
      if (!isTargetNode && peekNode(ctx, targetFqn.getAncestor(currentNodeIndex + 2), false, false, false) == null)
      {
         return createIfNotExists;// we're at a node in the tree, not yet at the target node, and we need to create the next node. So we need a WL here.
      }
   }
   return lockType == NodeLock.LockType.WRITE && isTargetNode;//write lock explicitly requested and this is the target to be written to.
}
/**
 * Acquires the lock of a single node for the given owner and, if {@code acquire} reports success, records it so
 * that it can be released later: in the transaction table when {@code gtx} is non-null, otherwise on the
 * invocation context.
 *
 * @param owner       the lock owner handed to {@code NodeLock.acquire}
 * @param lockTimeout timeout handed to {@code NodeLock.acquire}
 */
private void acquireNodeLock(InvocationContext ctx, NodeSPI node, Object owner, GlobalTransaction gtx, NodeLock.LockType lockType, long lockTimeout) throws LockingException, TimeoutException, InterruptedException
{
   final NodeLock nodeLock = node.getLock();
   if (!nodeLock.acquire(owner, lockTimeout, lockType))
   {
      return;
   }
   // Record the lock for release on method return or tx commit/rollback
   if (gtx == null)
   {
      ctx.addInvocationLockAcquired(nodeLock);
   }
   else
   {
      cache.getTransactionTable().recordNodeLock(gtx, nodeLock);
   }
}
/**
 * Tests whether the node needs to be 'undeleted' and, if so, reverses the earlier remove.
 * <p/>
 * This only applies inside a transaction ({@code gtx} non-null) and only when {@code reverseRemoveCheck} is true,
 * the node is marked as deleted and the transaction table reports it as removed in this transaction. In that case
 * the deleted mark is cleared, an undo operation restoring a copy of the node's current data is registered with
 * the transaction entry, the node's data is cleared, and the node is added to {@code createdNodes} if that list
 * is non-null.
 *
 * @param gtx                the current global transaction, or null if there is none
 * @param childNode          the node to inspect
 * @param reverseRemoveCheck whether the check should be performed at all
 * @param createdNodes       optional list receiving the node when it has been undeleted; may be null
 */
private void manageReverseRemove(GlobalTransaction gtx, NodeSPI childNode, boolean reverseRemoveCheck, List createdNodes)
{
   // if no tx then reverse remove does not make sense
   if (gtx == null) return;

   Fqn fqn = childNode.getFqn();
   if (!(reverseRemoveCheck && childNode.isDeleted() && tx_table.isNodeRemovedInTx(gtx, fqn)))
   {
      return;
   }

   childNode.markAsDeleted(false);

   // if the tx is rolled back, the data has to be put back on the node
   Map oldData = new HashMap(childNode.getDataDirect());
   MethodCall undoOp = MethodCallFactory.create(MethodDeclarations.putDataMethodLocal_id, gtx, fqn, oldData, false);
   tx_table.get(gtx).addUndoOperation(undoOp);

   // we're prepared for rollback, now reset the node
   childNode.clearDataDirect();
   if (createdNodes != null)
   {
      createdNodes.add(childNode);
   }
}
/**
 * Really removes (via {@code CacheImpl.realRemove}) every node that the given transaction recorded as removed.
 * If the transaction has no entry in the transaction table an error is logged and nothing else happens.
 * <p/>
 * This method does not itself touch locks or the transaction table; the callers in this class invoke
 * {@code tx_table.cleanup} after passing the call on.
 */
private void commit(GlobalTransaction gtx)
{
   if (trace) log.trace("committing cache with gtx " + gtx);

   final TransactionEntry txEntry = tx_table.get(gtx);
   if (txEntry == null)
   {
      log.error("entry for transaction " + gtx + " not found (maybe already committed)");
      return;
   }

   // remove the nodes that should be deleted
   for (Iterator it = txEntry.getRemovedNodes().iterator(); it.hasNext();)
   {
      Fqn removed = (Fqn) it.next();
      cacheImpl.realRemove(removed, false);
   }
}
}