/**
*
* Copyright 2004 The Apache Software Foundation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.geronimo.datastore.impl.remote.replication;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.geronimo.datastore.impl.remote.messaging.AbstractConnector;
import org.apache.geronimo.datastore.impl.remote.messaging.CommandRequest;
import org.apache.geronimo.datastore.impl.remote.messaging.CommandResult;
import org.apache.geronimo.datastore.impl.remote.messaging.HeaderOutInterceptor;
import org.apache.geronimo.datastore.impl.remote.messaging.Msg;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgBody;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeader;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgHeaderConstants;
import org.apache.geronimo.datastore.impl.remote.messaging.MsgOutInterceptor;
import org.apache.geronimo.datastore.impl.remote.messaging.Node;
import org.apache.geronimo.datastore.impl.remote.messaging.NodeInfo;
import org.apache.geronimo.datastore.impl.remote.messaging.NodeContext;
import org.apache.geronimo.gbean.GAttributeInfo;
import org.apache.geronimo.gbean.GBean;
import org.apache.geronimo.gbean.GBeanInfo;
import org.apache.geronimo.gbean.GBeanInfoFactory;
import org.apache.geronimo.gbean.GConstructorInfo;
/**
* A replication group member.
* <BR>
* This is a Connector in charge of replicating the state of registered
* ReplicantCapables across N-nodes, which constitute a replication group.
* <BR>
* Replication members are organized as follows:
* <pre>
* ReplicationMember -- MTO -- ServerNode -- MTM -- ServerNode -- OTM -- ReplicationMember
* </pre>
*
* @version $Revision: 1.1 $ $Date: 2004/03/24 11:37:06 $
*/
public class ReplicationMemberImpl
    extends AbstractConnector
    implements ReplicationMember, GBean
{

    /**
     * Name of the replication group.
     */
    private final String name;

    /**
     * ReplicantID to ReplicantCapable Map. Every access in this class is
     * performed while synchronizing on the map itself.
     */
    private final Map idToReplicant;

    /**
     * Nodes hosting the other members of the replication group
     * of this member. The array handed to the constructor is stored as is
     * (no defensive copy).
     */
    private final NodeInfo[] targetNodes;

    /**
     * Output to be used to send requests. Null until setContext has been
     * called with a context providing a non-null output.
     */
    private MsgOutInterceptor requestOut;

    /**
     * Output to be used to send results. Null until setContext has been
     * called with a context providing a non-null output.
     */
    private MsgOutInterceptor resultOut;

    /**
     * Creates a replication group member.
     *
     * @param aNode Node containing this instance.
     * @param aName Name of the replication group owning this member.
     * @param aTargetNodes Nodes hosting the other members of the
     * replication group containing this member.
     * @throws IllegalArgumentException Indicates that aName or aTargetNodes
     * is null.
     */
    public ReplicationMemberImpl(Node aNode,
        String aName, NodeInfo[] aTargetNodes) {
        super(aNode);
        if ( null == aName ) {
            throw new IllegalArgumentException("Name is required");
        } else if ( null == aTargetNodes ) {
            throw new IllegalArgumentException("Node names is required");
        }
        name = aName;
        targetNodes = aTargetNodes;
        idToReplicant = new HashMap();
    }

    /**
     * Gets the name of the replication group owning this member.
     *
     * @return Replication group name.
     */
    public String getName() {
        return name;
    }

    /**
     * Pushes an UpdateEvent to the other members of the replication group
     * by sending them a "mergeWithUpdate" command.
     * <BR>
     * The target of the provided event is replaced, in place, by the
     * identifier of the ReplicationCapable it was referencing. The event is
     * not restored afterwards.
     *
     * @param anEvent Update event to be propagated. Its target must be a
     * ReplicationCapable.
     */
    public void fireUpdateEvent(UpdateEvent anEvent) {
        // One does not send the actual ReplicantCapable in the case of an
        // update. Instead, one sends only its identifier.
        ReplicationCapable target = (ReplicationCapable) anEvent.getTarget();
        anEvent.setTarget(target.getID());
        sender.sendSyncRequest(
            new CommandRequest("mergeWithUpdate", new Object[] {anEvent}),
            requestOut, targetNodes);
    }

    /**
     * Merges an UpdateEvent with a registered ReplicationCapable.
     * <BR>
     * The target of the event is expected to be the identifier of the
     * ReplicationCapable to be updated (see fireUpdateEvent).
     *
     * @param anEvent Update event to be merged.
     * @throws ReplicationException Indicates that the merge can not be
     * performed.
     */
    public void mergeWithUpdate(UpdateEvent anEvent)
        throws ReplicationException {
        ReplicantID id = (ReplicantID) anEvent.getTarget();
        ReplicationCapable replicationCapable;
        // Only the lookup is performed under the lock; the merge itself
        // runs outside of it.
        synchronized(idToReplicant) {
            replicationCapable = (ReplicationCapable) idToReplicant.get(id);
        }
        if ( null == replicationCapable ) {
            throw new ReplicationException(
                "No ReplicantCapable with the id {" + id + "}");
        }
        replicationCapable.mergeWithUpdate(anEvent);
    }

    /**
     * Registers a ReplicantCapable. From now, UpdateEvents multicasted
     * by the provided ReplicantCapable are also pushed to the replication
     * group.
     * <BR>
     * A new identifier is assigned to the replicant, the replicant is sent
     * to the other members ("registerLocalReplicantCapable" command) and,
     * once this request has returned, it is registered locally.
     *
     * @param aReplicant ReplicantCapable to be controlled by this group.
     */
    public void registerReplicantCapable(ReplicationCapable aReplicant) {
        ReplicantID id = new ReplicantID();
        aReplicant.setID(id);
        sender.sendSyncRequest(
            new CommandRequest("registerLocalReplicantCapable",
                new Object[] {aReplicant}),
            requestOut, targetNodes);
        synchronized(idToReplicant) {
            idToReplicant.put(id, aReplicant);
            aReplicant.addUpdateListener(this);
        }
    }

    /**
     * This method is for internal use only.
     * <BR>
     * It registers with this member a ReplicationCapable, which has been
     * registered by a remote member. The identifier already carried by the
     * replicant is used as the registration key.
     *
     * @param aReplicant ReplicantCapable to be locally registered.
     */
    public void registerLocalReplicantCapable(ReplicationCapable aReplicant) {
        synchronized(idToReplicant) {
            aReplicant.addUpdateListener(this);
            idToReplicant.put(aReplicant.getID(), aReplicant);
        }
    }

    /**
     * Retrieves the ReplicationCapable having the specified id.
     *
     * @param anID Replicant identifier.
     * @return ReplicantCapable having the specified id or null if such an
     * identifier is not known.
     */
    public ReplicationCapable retrieveReplicantCapable(Object anID) {
        synchronized(idToReplicant) {
            return (ReplicationCapable) idToReplicant.get(anID);
        }
    }

    /**
     * Sets the node context and (re)builds the outputs of this member.
     * <BR>
     * When the inherited output is not null once the parent implementation
     * has run, it is decorated with a HeaderOutInterceptor configured with
     * DEST_CONNECTOR and the group name; the request and result outputs are
     * then built on top of it with a BODY_TYPE of REQUEST and RESPONSE
     * respectively. Otherwise both outputs are reset to null.
     *
     * @param aContext Context of the node hosting this member.
     */
    public void setContext(NodeContext aContext) {
        super.setContext(aContext);
        if ( null != out ) {
            out =
                new HeaderOutInterceptor(
                    MsgHeaderConstants.DEST_CONNECTOR,
                    name,
                    out);
            requestOut =
                new HeaderOutInterceptor(
                    MsgHeaderConstants.BODY_TYPE,
                    MsgBody.Type.REQUEST,
                    out);
            resultOut =
                new HeaderOutInterceptor(
                    MsgHeaderConstants.BODY_TYPE,
                    MsgBody.Type.RESPONSE,
                    out);
        } else {
            requestOut = null;
            resultOut = null;
        }
    }

    /**
     * Handles a request sent by another member: the CommandRequest carried
     * by the message is executed against this instance and its result is
     * pushed back with the correlation identifier of the request.
     *
     * @param aMsg Request message. Its body must contain a CommandRequest.
     */
    protected void handleRequest(Msg aMsg) {
        MsgBody body = aMsg.getBody();
        MsgHeader header = aMsg.getHeader();
        // NOTE(review): the source node is read but never used; the result
        // below is addressed to targetNodes. Confirm whether the reply
        // should be sent to the source node only. The lookup is retained
        // because its behavior on a missing header is not visible here.
        Object sourceNode = header.getHeader(MsgHeaderConstants.SRC_NODE);
        Object id = header.getHeader(MsgHeaderConstants.CORRELATION_ID);

        CommandRequest command = (CommandRequest) body.getContent();
        command.setTarget(this);
        CommandResult result = command.execute();

        Msg msg = new Msg();
        body = msg.getBody();
        body.setContent(result);

        MsgOutInterceptor reqOut =
            new HeaderOutInterceptor(
                MsgHeaderConstants.CORRELATION_ID,
                id,
                new HeaderOutInterceptor(
                    MsgHeaderConstants.DEST_NODES,
                    targetNodes,
                    new HeaderOutInterceptor(
                        MsgHeaderConstants.DEST_CONNECTOR,
                        name,
                        resultOut)));
        reqOut.push(msg);
    }

    /**
     * ReplicantCapable identifier.
     * <BR>
     * Identifiers are drawn from a counter which is local to the class
     * (hence to the JVM / class loader having loaded it).
     * NOTE(review): identifiers allocated by distinct nodes may therefore
     * collide -- confirm how this is handled by the replication group.
     */
    private static class ReplicantID implements Externalizable {

        /**
         * Next identifier to be allocated. Only accessed via nextId().
         */
        private static int seqId = 0;

        private int id;

        /**
         * Allocates a new identifier.
         * <BR>
         * As required by Externalizable, this public no-arg constructor is
         * also the one invoked upon deserialization; the allocated value is
         * then overwritten by readExternal.
         */
        public ReplicantID() {
            id = nextId();
        }

        /**
         * Allocates the next sequence number. A post-increment of a volatile
         * field is not atomic, so the allocation is serialized in order to
         * never hand out the same identifier twice.
         *
         * @return Newly allocated sequence number.
         */
        private static synchronized int nextId() {
            return seqId++;
        }

        public int hashCode() {
            return id;
        }

        public boolean equals(Object obj) {
            if ( !(obj instanceof ReplicantID) ) {
                return false;
            }
            ReplicantID replicantID = (ReplicantID) obj;
            return id == replicantID.id;
        }

        /**
         * Gets a readable form of this identifier, so that messages
         * embedding it show the actual identifier value.
         */
        public String toString() {
            return "ReplicantID:" + id;
        }

        public void writeExternal(ObjectOutput out) throws IOException {
            out.writeInt(id);
        }

        public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
            id = in.readInt();
        }

    }

    /**
     * GBean meta-data of this class. It extends the meta-data of
     * AbstractConnector with the TargetNodes attribute, the replicant
     * registration/retrieval operations and the (Node, Name, TargetNodes)
     * constructor.
     */
    public static final GBeanInfo GBEAN_INFO;

    static {
        GBeanInfoFactory infoFactory = new GBeanInfoFactory("Replication Member", ReplicationMemberImpl.class.getName(), AbstractConnector.GBEAN_INFO);
        infoFactory.addAttribute(new GAttributeInfo("TargetNodes", true));
        infoFactory.addOperation("registerReplicantCapable", new Class[] {ReplicationCapable.class});
        infoFactory.addOperation("retrieveReplicantCapable", new Class[] {Object.class});
        infoFactory.setConstructor(new GConstructorInfo(
            Arrays.asList(new Object[]{"Node", "Name", "TargetNodes"}),
            Arrays.asList(new Object[]{Node.class, String.class, NodeInfo[].class})));
        GBEAN_INFO = infoFactory.getBeanInfo();
    }

    /**
     * Gets the GBean meta-data of this class.
     *
     * @return GBEAN_INFO.
     */
    public static GBeanInfo getGBeanInfo() {
        return GBEAN_INFO;
    }

}