/*
* JBoss, the OpenSource J2EE webOS
*
* Distributable under LGPL license.
* See terms of license at gnu.org.
*/
package org.jboss.cache.statetransfer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.Node;
import org.jboss.cache.NodeFactory;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.Region;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.eviction.EvictedEventNode;
import org.jboss.cache.eviction.NodeEventType;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.marshall.NodeDataExceptionMarker;
import org.jboss.cache.marshall.NodeDataMarker;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
/** Logger named after the runtime class, so a subclass logs under its own class name. */
protected Log log = LogFactory.getLog(getClass().getName());
/** Cache whose marshaller, notifier, regions and cache loader are used during integration. */
private CacheSPI cache;
/** Fqn passed to the cache loader when storing (or, on failure, removing) persistent state. */
private Fqn targetFqn;
/** Node factory taken from the cache's runtime configuration; used to create transferred nodes. */
private NodeFactory factory;
/** VERSIONED_NODE when the cache uses optimistic node locking, UNVERSIONED_NODE otherwise. */
private NodeFactory.NodeType nodeType;
/** Internal Fqns as reported by {@code cache.getInternalFqns()} at construction time. */
private Set<Fqn> internalFqns;
/**
 * Creates an integrator for the given target Fqn and cache.
 * The node factory, the node type and the set of internal Fqns are all
 * read from <code>cache</code> once, here.
 *
 * @param targetFqn Fqn under which persistent state is stored in the cache loader
 * @param cache     cache into which state is integrated
 */
public DefaultStateTransferIntegrator(Fqn targetFqn, CacheSPI cache)
{
   this.cache = cache;
   this.targetFqn = targetFqn;
   this.factory = cache.getConfiguration().getRuntimeConfig().getNodeFactory();
   // Optimistic locking needs versioned nodes; every other scheme uses unversioned ones.
   if (cache.getConfiguration().isNodeLockingOptimistic())
   {
      this.nodeType = NodeFactory.NodeType.VERSIONED_NODE;
   }
   else
   {
      this.nodeType = NodeFactory.NodeType.UNVERSIONED_NODE;
   }
   this.internalFqns = cache.getInternalFqns();
}
/**
 * Integrates a complete state stream. The three sections are consumed in
 * order: transient state, associated state, persistent state.
 *
 * @param ois    stream positioned at the start of the transferred state
 * @param target node that receives the transient state; must be a {@link NodeSPI}
 * @throws Exception if any of the three integration steps fails
 */
public void integrateState(ObjectInputStream ois, Node target) throws Exception
{
   NodeSPI targetNode = (NodeSPI) target;
   integrateTransientState(ois, targetNode);
   integrateAssociatedState(ois);
   integratePersistentState(ois);
}
/**
 * Integrates the transient (in-memory) section of the state stream into
 * <code>target</code> and then fires creation/modification notifications for
 * the resulting subtree.
 * <p>
 * If integration does not complete, the data and children of
 * <code>target</code> are cleared so no partial state is left behind.
 *
 * @param in     stream positioned at the transient state section
 * @param target node receiving the state
 * @throws Exception every {@link Exception} raised while integrating or
 *                   notifying is rethrown wrapped in a {@link CacheException}
 */
protected void integrateTransientState(ObjectInputStream in, NodeSPI target) throws Exception
{
   boolean integrated = false;
   try
   {
      if (log.isTraceEnabled())
      {
         log.trace("integrating transient state for " + target);
      }
      integrateTransientState(target, in);
      // From here on the state is considered applied; a failure while
      // notifying listeners does not roll it back.
      integrated = true;
      if (log.isTraceEnabled())
      {
         log.trace("transient state successfully integrated");
      }
      notifyAllNodesCreated(cache.getInvocationContext(), target);
   }
   catch (Exception e)
   {
      throw new CacheException(e);
   }
   finally
   {
      if (!integrated)
      {
         discardPartialTransientState(target);
      }
   }
}

/**
 * Clears the data and removes all children of <code>target</code>.
 */
private void discardPartialTransientState(NodeSPI target)
{
   target.clearDataDirect();
   target.removeChildrenDirect();
}
/**
 * Hook for subclasses that deal with associated state.
 * <p>
 * This base implementation carries no associated state: it reads a single
 * object (the section marker) from the stream through the cache's
 * marshaller and discards it.
 *
 * @param in stream positioned at the associated state section
 * @throws Exception if the object cannot be read from the stream
 */
protected void integrateAssociatedState(ObjectInputStream in) throws Exception
{
   // Nothing to integrate here; just consume the marker.
   this.cache.getMarshaller().objectFromObjectStream(in);
}
/**
 * Integrates the persistent section of the state stream into the cache
 * loader, if one is configured.
 * <p>
 * When the target Fqn is the root the loader is asked to store the entire
 * state, otherwise only the state under the target Fqn. If storing fails,
 * the target Fqn is removed from the loader so that no partial state
 * remains. A failure of that clean-up is logged but never replaces the
 * exception that caused the integration to fail.
 *
 * @param in stream positioned at the persistent state section
 * @throws Exception whatever the cache loader throws while storing state
 */
protected void integratePersistentState(ObjectInputStream in) throws Exception
{
   CacheLoaderManager loaderManager = cache.getCacheLoaderManager();
   CacheLoader loader = loaderManager == null ? null : loaderManager.getCacheLoader();
   if (loader == null)
   {
      if (log.isTraceEnabled())
      {
         log.trace("cache loader is null, will not attempt to integrate persistent state");
      }
      return;
   }

   if (log.isTraceEnabled())
   {
      log.trace("integrating persistent state using " + loader.getClass().getName());
   }
   boolean persistentSet = false;
   try
   {
      if (targetFqn.isRoot())
      {
         loader.storeEntireState(in);
      }
      else
      {
         loader.storeState(targetFqn, in);
      }
      persistentSet = true;
   }
   catch (ClassCastException cce)
   {
      // Keep the stack trace in the log; it identifies the offending loader.
      log.error("Failed integrating persistent state. One of cacheloaders is not"
            + " adhering to state stream format. See JBCACHE-738.", cce);
      throw cce;
   }
   finally
   {
      if (!persistentSet)
      {
         log.warn("persistent state integration failed, removing all nodes from loader");
         try
         {
            loader.remove(targetFqn);
         }
         catch (Exception cleanupFailure)
         {
            // We only get here while another exception is already propagating;
            // throwing from this finally block would hide that root cause.
            log.error("failed removing " + targetFqn
                  + " from loader after unsuccessful persistent state integration", cleanupFailure);
         }
      }
      else
      {
         if (log.isTraceEnabled())
         {
            log.trace("persistent state integrated successfully");
         }
      }
   }
}
/**
 * @return the node factory obtained from the cache's runtime configuration
 */
protected NodeFactory getFactory()
{
   return this.factory;
}
/**
 * @return the node type selected at construction time from the cache's locking scheme
 */
protected NodeFactory.NodeType getNodeType()
{
   return this.nodeType;
}
/**
 * @return the Fqn this integrator was created for
 */
protected Fqn getTargetFqn()
{
   return this.targetFqn;
}
/**
 * Generates node-created notifications for <code>curr</code> and, recursively,
 * for all of its children. Nodes that hold data additionally get
 * node-modified (PUT_MAP) notifications. This is called whenever the tree
 * is initially retrieved (state transfer).
 * <p>
 * While the notifications for a node are being emitted the invocation
 * context is flagged as not locally originated; the flag is set back to
 * <code>true</code> afterwards, even when a notification throws.
 *
 * @param ctx  invocation context handed to the notifier
 * @param curr node to notify for; <code>null</code> is ignored
 */
private void notifyAllNodesCreated(InvocationContext ctx, NodeSPI curr)
{
   if (curr == null) return;
   ctx.setOriginLocal(false);
   try
   {
      // NOTE(review): the boolean argument is presumably the "pre" flag
      // (true = before, false = after) -- confirm against Notifier.
      cache.getNotifier().notifyNodeCreated(curr.getFqn(), true, ctx);
      cache.getNotifier().notifyNodeCreated(curr.getFqn(), false, ctx);
      // AND notify that they have been modified!!
      if (!curr.getKeysDirect().isEmpty())
      {
         cache.getNotifier().notifyNodeModified(curr.getFqn(), true, NodeModifiedEvent.ModificationType.PUT_MAP, Collections.emptyMap(), ctx);
         cache.getNotifier().notifyNodeModified(curr.getFqn(), false, NodeModifiedEvent.ModificationType.PUT_MAP, curr.getDataDirect(), ctx);
      }
   }
   finally
   {
      // Never leave the context marked as remote if a notification fails.
      ctx.setOriginLocal(true);
   }
   Set<NodeSPI> children = curr.getChildrenDirect();
   for (NodeSPI n : children)
   {
      notifyAllNodesCreated(ctx, n);
   }
}
/*
private ClassLoader setClassLoader(ClassLoader newLoader)
{
ClassLoader oldClassLoader = null;
if (newLoader != null)
{
oldClassLoader = Thread.currentThread().getContextClassLoader();
Thread.currentThread().setContextClassLoader(newLoader);
}
return oldClassLoader;
}
private void resetClassLoader(ClassLoader oldLoader)
{
if (oldLoader != null)
{
Thread.currentThread().setContextClassLoader(oldLoader);
}
}
*/
/**
 * Replaces the children of <code>target</code> with the transient state read
 * from <code>in</code>.
 * <p>
 * The existing children of <code>target</code> are removed first. If the
 * stream yields an end marker instead of a node list nothing else is read.
 * Otherwise the first entry supplies the internal state of
 * <code>target</code>, the remaining entries are attached as descendants,
 * retained internal nodes are re-integrated, and finally the trailing
 * marker is consumed from the stream.
 *
 * @param target node receiving the state
 * @param in     stream positioned at the transient state section
 * @throws Exception if reading from the stream or integrating a node fails
 */
private void integrateTransientState(NodeSPI target, ObjectInputStream in) throws Exception
{
   target.removeChildrenDirect();
   List<NodeData> transferred = readNodesAsList(in);
   if (transferred == null)
   {
      // An end marker was read in place of the list, so there is no
      // trailing marker left to consume.
      return;
   }

   Iterator<NodeData> remaining = transferred.iterator();
   NodeData head = remaining.hasNext() ? remaining.next() : null;
   // A marker as the very first entry means there are no transient nodes at all.
   if (head != null && !head.isMarker())
   {
      Map attributes = head.getAttributes();
      target.setInternalState(attributes);

      Fqn sourceFqn = head.getFqn();
      Fqn destFqn = target.getFqn();
      // When integrating into the buddy backup subtree from an Fqn that is
      // not already below the destination, transferred Fqns are shifted by
      // the difference in depth.
      int offset = 0;
      if (destFqn.isChildOrEquals(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN)
            && !sourceFqn.isChildOrEquals(destFqn))
      {
         offset = destFqn.size() - sourceFqn.size();
      }

      integrateStateTransferChildren(target, offset, remaining);
      integrateRetainedNodes(target);
   }

   // Consume the marker that follows the node list.
   cache.getMarshaller().objectFromObjectStream(in);
}
/**
 * Reads the next object from the stream through the cache's marshaller.
 *
 * @param in stream to read from
 * @return the object cast to a list of {@link NodeData}, or <code>null</code>
 *         if the object read was a {@link NodeDataMarker}
 * @throws Exception if the marshaller fails to read an object
 */
@SuppressWarnings("unchecked")
private List<NodeData> readNodesAsList(ObjectInputStream in) throws Exception
{
   Object unmarshalled = cache.getMarshaller().objectFromObjectStream(in);
   return unmarshalled instanceof NodeDataMarker ? null : (List<NodeData>) unmarshalled;
}
/**
 * Consumes entries from <code>nodeDataIterator</code> and attaches every entry
 * that is a direct child of <code>parent</code>, recursing into each new
 * child to attach its own descendants.
 *
 * @param parent           node under which children are created
 * @param offset           when greater than zero, each transferred Fqn is
 *                         re-rooted under an ancestor of <code>parent</code>
 *                         (buddy backup integration)
 * @param nodeDataIterator remaining transferred entries, in stream order
 * @return the first entry whose Fqn is not deeper than <code>parent</code>,
 *         so the caller can handle it; <code>null</code> once the iterator is
 *         exhausted or a marker is reached
 * @throws IllegalStateException if an entry is more than one level below <code>parent</code>
 * @throws CacheException        if the terminating marker is an exception marker
 */
private NodeData integrateStateTransferChildren(NodeSPI parent, int offset, Iterator<NodeData> nodeDataIterator)
      throws IOException, ClassNotFoundException
{
   final int parentDepth = parent.getFqn().size();
   final int childDepth = parentDepth + 1;

   NodeData current = nodeDataIterator.hasNext() ? nodeDataIterator.next() : null;
   while (current != null && !current.isMarker())
   {
      Fqn childFqn = current.getFqn();
      if (offset > 0)
      {
         // Integrating into the buddy backup subtree: change the Fqn to fit under it.
         childFqn = Fqn.fromRelativeFqn(parent.getFqn().getAncestor(offset), childFqn);
      }

      int depth = childFqn.size();
      if (depth <= parentDepth)
      {
         // Not one of ours -- hand it back up the call chain.
         return current;
      }
      if (depth > childDepth)
      {
         throw new IllegalStateException("NodeData " + childFqn + " is not a direct child of " + parent.getFqn());
      }

      // This entry is a direct child: create the node and attach it.
      Object childName = childFqn.get(depth - 1);
      Map attrs = current.getAttributes();
      NodeSPI child = factory.createDataNode(childName, childFqn, parent, attrs, false);
      parent.addChild(childName, child);

      // JBCACHE-913
      Region region = cache.getRegion(childFqn, false);
      if (region != null && region.getEvictionPolicy() != null)
      {
         int attributeCount = attrs == null ? 0 : attrs.size();
         region.putNodeEvent(new EvictedEventNode(childFqn, NodeEventType.ADD_NODE_EVENT, attributeCount));
      }

      // Walk down the tree; the call returns the next entry that is not
      // below the child just created.
      current = integrateStateTransferChildren(child, offset, nodeDataIterator);
   }

   if (current != null && current.isExceptionMarker())
   {
      NodeDataExceptionMarker failure = (NodeDataExceptionMarker) current;
      throw new CacheException("State provider node " + failure.getCacheNodeIdentity()
            + " threw exception during loadState", failure.getCause());
   }
   return null;
}
/**
 * Collects the nodes that currently exist under <code>target</code> for each
 * internal Fqn that is a child of <code>target</code>'s Fqn.
 *
 * @param target node whose subtree is searched
 * @return the nodes found; never <code>null</code>, possibly empty
 */
private Set<Node> retainInternalNodes(Node target)
{
   Set<Node> retained = new HashSet<Node>();
   Fqn base = target.getFqn();
   for (Fqn candidate : internalFqns)
   {
      if (!candidate.isChildOf(base))
      {
         continue;
      }
      Node found = getInternalNode(target, candidate);
      if (found != null)
      {
         retained.add(found);
      }
   }
   return retained;
}
/**
 * Looks up the node for <code>internalFqn</code> by walking down from
 * <code>parent</code> one level at a time.
 *
 * @param parent      node to start from; its Fqn must be a strict ancestor of <code>internalFqn</code>
 * @param internalFqn Fqn of the internal node to locate
 * @return the node at <code>internalFqn</code>, or <code>null</code> if any level on the path is missing
 */
private Node getInternalNode(Node parent, Fqn internalFqn)
{
   // Name of the path element directly below parent.
   Object name = internalFqn.get(parent.getFqn().size());
   // NOTE(review): the option is set again on every level, presumably because
   // option overrides do not survive an invocation -- confirm.
   cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
   Node result = parent.getChild(name);
   // Descend while the node found is still shallower than the requested Fqn.
   // The comparison used to be inverted ("<"), which can never be true for a
   // child of parent, so an intermediate node was returned instead.
   if (result != null && internalFqn.size() > result.getFqn().size())
   {
      result = getInternalNode(result, internalFqn);
   }
   return result;
}
/**
 * Re-attaches under <code>target</code> every retained internal node whose Fqn
 * is a child of <code>target</code>'s Fqn.
 *
 * @param target root of the subtree that has just received transferred state
 */
private void integrateRetainedNodes(NodeSPI target)
{
   Fqn base = target.getFqn();
   for (Node candidate : retainInternalNodes(target))
   {
      if (!candidate.getFqn().isChildOf(base))
      {
         continue;
      }
      integrateRetainedNode(target, candidate);
   }
}
/**
 * Attaches <code>descendant</code> at its own Fqn below <code>ancestor</code>,
 * creating empty intermediate nodes for any missing levels.
 * <p>
 * If a node already exists at the descendant's position it is left in place
 * and a warning is logged.
 *
 * @param ancestor   node whose Fqn is a strict ancestor of the descendant's Fqn
 * @param descendant retained node to attach
 */
private void integrateRetainedNode(NodeSPI ancestor, Node descendant)
{
   Fqn descFqn = descendant.getFqn();
   Fqn ancFqn = ancestor.getFqn();
   // Name of the path element directly below ancestor on the way to descendant.
   Object name = descFqn.get(ancFqn.size());
   NodeSPI child = (NodeSPI) ancestor.getChild(name);
   // The descendant is a direct child when it is exactly one level deeper.
   // The comparison used to be the wrong way round and could never match,
   // so the recursion below ran one level too far.
   if (descFqn.size() == ancFqn.size() + 1)
   {
      if (child == null)
      {
         ancestor.addChild(name, descendant);
      }
      else
      {
         log.warn("Received unexpected internal node " + descFqn + " in transferred state");
      }
   }
   else
   {
      if (child == null)
      {
         // Missing level -- have to create empty node
         // This shouldn't really happen -- internal fqns should
         // be immediately under the root
         child = factory.createDataNode(name, Fqn.fromRelativeElements(ancFqn, name), ancestor, null, true);
         ancestor.addChild(name, child);
      }
      // Keep walking down the tree
      integrateRetainedNode(child, descendant);
   }
}
}