/*
* To change this template, choose Tools | Templates
* and open the template in the editor.
*/
package com.sun.enterprise.ee.server.group;
import com.sun.enterprise.ee.cms.core.Action;
import com.sun.enterprise.ee.cms.core.ActionException;
import com.sun.enterprise.ee.cms.core.MessageAction;
import com.sun.enterprise.ee.cms.core.MessageActionFactory;
import com.sun.enterprise.ee.cms.core.Signal;
import com.sun.enterprise.ee.cms.core.MessageSignal;
import com.sun.enterprise.ee.server.group.Message.Route;
import com.sun.enterprise.ee.server.group.core.ServerMessageRuntime;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* This class is responsible for receiving the messages from GMS.
*
* @author Binod.
*/
public class MessageReceiver implements MessageActionFactory, MessageAction {
private String componentName = null;
private String serverName = null;
private ServerMessageRuntime rt = null;
private Logger logger;
public MessageReceiver
(String componentName, String serverName, ServerMessageRuntime rt) {
this.componentName = componentName;
this.serverName = serverName;
this.rt = rt;
this.logger = rt.getLogger();
}
/**
* Overriding the produceAction method of MessageActionFactory.
*/
public Action produceAction() {
return new MessageReceiver(componentName, serverName, rt);
}
/**
* Consume the mssages from GMS. Since the Action Factory is registered
* to receive messages from a specific component, it will only receieve
* relevent messages. But, this code still cross-check the component name.
*
* @param signal
* @throws com.sun.enterprise.ee.cms.core.ActionException
*/
public void consumeSignal(Signal signal) throws ActionException {
if (logger.isLoggable(Level.FINER)) {
logger.finer("MessageReceiver Consumed: " + signal);
}
if (signal instanceof MessageSignal) {
MessageSignal ms = MessageSignal.class.cast(signal);
if (ms.getTargetComponent().equalsIgnoreCase(componentName)) {
if (this.serverName == null ||
this.serverName.equalsIgnoreCase(serverName)) {
if (logger.isLoggable(Level.FINER)) {
logger.finer("MessageReceiver Processing.... "
+ ms.getMessage());
}
processMessage(ms.getMessage(), signal.getMemberToken());
}
}
}
}
private void processMessage(byte[] msg, String member) {
Message msgObject = MessageParser.parse(msg);
if (logger.isLoggable(Level.FINER)) {
logger.finer("Setting " + member + "in " + msgObject
+ ", with key " + msgObject.getMessageAggregationKey());
}
Route route = msgObject.getRoute();
// ALLTOONE messages can be sent to a specific instance using GMS
// sendMessage api. But, the for some reason, I could not get it
// working. So, ALLTOONE messages reach all instances. We reject the
// unwanted ones here.
if (route.ALLTOONE.equals(route) && ! rt.electedSelf()) {
return;
}
if (logger.isLoggable(Level.FINER)) {
logger.finer("Consumed and Adding the message to aggregator: "
+ msgObject + " from" + member);
}
// Set the server name of the sender in the message object. This
// will later be used during message aggregation.
msgObject.setSender(member);
MessageAggregator mr = null;
mr = rt.getMessageAggregator
(msgObject.getMessageAggregationKey(), msgObject.getRoute());
mr.addMessage(msgObject);
}
}