Package org.apache.catalina.cluster.tcp

Source Code of org.apache.catalina.cluster.tcp.ReplicationTransmitter

/*
* Copyright 1999,2004 The Apache Software Foundation.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.catalina.cluster.tcp;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import javax.management.MBeanServer;
import javax.management.ObjectName;

import org.apache.catalina.cluster.ClusterSender;
import org.apache.catalina.cluster.Member;
import org.apache.catalina.cluster.io.XByteBuffer;
import org.apache.catalina.util.StringManager;
import org.apache.tomcat.util.IntrospectionUtils;

/**
* Transmit message to ohter cluster members create sender from replicationMode
* type
* FIXME i18n log messages
* FIXME compress data depends on message type and size
* FIXME send very big messages at some block see FarmWarDeployer!
* TODO pause and resume senders
*
* @author Peter Rossbach
* @author Filip Hanik
* @version $Revision: 1.22 $ $Date: 2005/03/25 22:12:31 $
*/
public class ReplicationTransmitter implements ClusterSender {
    private static org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory
            .getLog(ReplicationTransmitter.class);

    /**
     * The descriptive information about this implementation.
     */
    private static final String info = "ReplicationTransmitter/1.3";

    /**
     * The string manager for this package.
     */
    protected StringManager sm = StringManager.getManager(Constants.Package);

    private Map map = new HashMap();

    public ReplicationTransmitter() {
    }

    /**
     * number of transmitted messages>
     */
    private long nrOfRequests = 0;

    /**
     * number of transmitted bytes
     */
    private long totalBytes = 0;

    private long failureCounter = 0;

    /**
     * current sender replication mode
     */
    private String replicationMode;

    /**
     * sender default ackTimeout
     */
    private long ackTimeout = 15000; //15 seconds by default

    /**
     * enabled wait for ack
     */
    private boolean waitForAck = true;

    /**
     * autoConnect sender when next message send
     */
    private boolean autoConnect = true;

    /**
     * Compress message data bytes
     */
    private boolean compress = true;

    /**
     * dynamic sender <code>properties</code>
     */
    private Map properties = new HashMap();

    /**
     * my cluster
     */
    private SimpleTcpCluster cluster;

    /**
     * Transmitter Mbean name
     */
    private ObjectName objectName;

    // ------------------------------------------------------------- Properties

    /**
     * Return descriptive information about this implementation and the
     * corresponding version number, in the format
     * <code>&lt;description&gt;/&lt;version&gt;</code>.
     */
    public String getInfo() {

        return (info);

    }

    /**
     * @return Returns the nrOfRequests.
     */
    public long getNrOfRequests() {
        return nrOfRequests;
    }

    /**
     * @return Returns the totalBytes.
     */
    public long getTotalBytes() {
        return totalBytes;
    }

    /**
     * @return Returns the failureCounter.
     */
    public long getFailureCounter() {
        return failureCounter;
    }

    /**
     * current replication mode
     *
     * @return
     */
    public String getReplicationMode() {
        return replicationMode;
    }

    /**
     * set replication Mode (pooled, synchonous, asynchonous, fastasyncqueue)
     *
     * @see IDataSenderFactory#validateMode(String)
     * @param mode
     */
    public void setReplicationMode(String mode) {
        String msg = IDataSenderFactory.validateMode(mode);
        if (msg == null) {
            if (log.isDebugEnabled())
                log.debug("Setting replcation mode to " + mode);
            this.replicationMode = mode;
        } else
            throw new IllegalArgumentException(msg);

    }

    /**
     * Transmitter ObjectName
     *
     * @param name
     */
    public void setObjectName(ObjectName name) {
        objectName = name;
    }

    public ObjectName getObjectName() {
        return objectName;
    }

    /**
     * @return Returns the compress.
     */
    public boolean isCompress() {
        return compress;
    }

    /**
     * @param compress
     *            The compress to set.
     */
    public void setCompress(boolean compressMessageData) {
        this.compress = compressMessageData;
    }

    /**
     * @return Returns the autoConnect.
     */
    public boolean isAutoConnect() {
        return autoConnect;
    }

    /**
     * @param autoConnect
     *            The autoConnect to set.
     */
    public void setAutoConnect(boolean autoConnect) {
        this.autoConnect = autoConnect;
        setProperty("autoConnect", String.valueOf(autoConnect));

    }

    /**
     * @return
     */
    public long getAckTimeout() {
        return ackTimeout;
    }

    /**
     * @param ackTimeout
     */
    public void setAckTimeout(long ackTimeout) {
        this.ackTimeout = ackTimeout;
        setProperty("ackTimeout", String.valueOf(ackTimeout));
    }

    /**
     * @return Returns the waitForAck.
     */
    public boolean isWaitForAck() {
        return waitForAck;
    }

    /**
     * @param waitForAck
     *            The waitForAck to set.
     */
    public void setWaitForAck(boolean waitForAck) {
        this.waitForAck = waitForAck;
        setProperty("waitForAck", String.valueOf(waitForAck));
    }

    /*
     * configured in cluster
     *
     * @see org.apache.catalina.cluster.ClusterSender#setCatalinaCluster(org.apache.catalina.cluster.tcp.SimpleTcpCluster)
     */
    public void setCatalinaCluster(SimpleTcpCluster cluster) {
        this.cluster = cluster;

    }

    /**
     * @return
     * @deprecated since version 5.5.7
     */
    public boolean getIsSenderSynchronized() {
        return IDataSenderFactory.SYNC_MODE.equals(replicationMode)
                || IDataSenderFactory.POOLED_SYNC_MODE.equals(replicationMode);
    }

    // ------------------------------------------------------------- dynamic
    // sender property handling

    /**
     * set config attributes with reflect
     *
     * @param name
     * @param value
     */
    public void setProperty(String name, Object value) {
        if (log.isTraceEnabled())
            log.trace(sm.getString("ReplicationTransmitter.setProperty", name,
                    value));

        properties.put(name, value);
    }

    /**
     * get current config
     *
     * @param key
     * @return
     */
    public Object getProperty(String key) {
        if (log.isTraceEnabled())
            log.trace(sm.getString("ReplicationTransmitter.getProperty", key));
        return properties.get(key);
    }

    /**
     * Get all properties keys
     *
     * @return
     */
    public Iterator getPropertyNames() {
        return properties.keySet().iterator();
    }

    /**
     * remove a configured property.
     *
     * @param key
     */
    public void removeProperty(String key) {
        properties.remove(key);
    }

    // ------------------------------------------------------------- public

    /**
     * Send data to one member
     *
     * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
     *      byte[], org.apache.catalina.cluster.Member)
     */
    public void sendMessage(String sessionId, byte[] indata, Member member)
            throws java.io.IOException {
        byte[] data = convertSenderData(indata);
        String key = getKey(member);
        IDataSender sender = (IDataSender) map.get(key);
        sendMessageData(sessionId, data, sender);
    }

    /**
     * send message to all senders (broadcast)
     *
     * @see org.apache.catalina.cluster.ClusterSender#sendMessage(java.lang.String,
     *      byte[])
     */
    public void sendMessage(String sessionId, byte[] indata)
            throws java.io.IOException {
        IDataSender[] senders = getSenders();
        byte[] data = convertSenderData(indata);
        for (int i = 0; i < senders.length; i++) {

            IDataSender sender = senders[i];
            try {
                sendMessageData(sessionId, data, sender);
            } catch (Exception x) {

                if (!sender.getSuspect())
                    log.warn("Unable to send replicated message to " + sender
                            + ", is server down?", x);
                sender.setSuspect(true);
            }
        }
    }

    /**
     * start the sender and register transmitter mbean
     *
     * @see org.apache.catalina.cluster.ClusterSender#start()
     */
    public void start() throws java.io.IOException {
        if (cluster != null) {
            ObjectName clusterName = cluster.getObjectName();
            try {
                MBeanServer mserver = cluster.getMBeanServer();
                ObjectName transmitterName = new ObjectName(clusterName
                        .getDomain()
                        + ":type=ClusterSender,host="
                        + clusterName.getKeyProperty("host"));
                if (mserver.isRegistered(transmitterName)) {
                    if (log.isWarnEnabled())
                        log.warn(sm.getString(
                                "cluster.mbean.register.allready",
                                transmitterName));
                    return;
                }
                setObjectName(transmitterName);
                mserver.registerMBean(cluster.getManagedBean(this),
                        getObjectName());
            } catch (Exception e) {
                log.warn(e);
            }
        }

    }

    /*
     * stop the sender and deregister mbeans (transmitter, senders)
     *
     * @see org.apache.catalina.cluster.ClusterSender#stop()
     */
    public synchronized void stop() {
        Iterator i = map.entrySet().iterator();
        while (i.hasNext()) {
            IDataSender sender = (IDataSender) ((java.util.Map.Entry) i.next())
                    .getValue();
            try {
                unregisterSenderMBean(sender);
                sender.disconnect();
            } catch (Exception x) {
            }
            i.remove();
        }
        if (cluster != null && getObjectName() != null) {
            try {
                MBeanServer mserver = cluster.getMBeanServer();
                mserver.unregisterMBean(getObjectName());
            } catch (Exception e) {
                log.error(e);
            }
        }

    }

    /**
     * get all current senders
     *
     * @return
     */
    public IDataSender[] getSenders() {
        java.util.Iterator iter = map.entrySet().iterator();
        IDataSender[] array = new IDataSender[map.size()];
        int i = 0;
        while (iter.hasNext()) {
            IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
                    .next()).getValue();
            if (sender != null)
                array[i] = sender;
            i++;
        }
        return array;
    }

    /**
     * get all current senders
     *
     * @return
     */
    public ObjectName[] getSenderObjectNames() {
        java.util.Iterator iter = map.entrySet().iterator();
        ObjectName array[] = new ObjectName[map.size()];
        int i = 0;
        while (iter.hasNext()) {
            IDataSender sender = (IDataSender) ((java.util.Map.Entry) iter
                    .next()).getValue();
            if (sender != null)
                array[i] = getSenderObjectName(sender);
            i++;
        }
        return array;
    }

    /*
     * Reset sender statistics
     */
    public synchronized void resetStatistics() {
        nrOfRequests = 0;
        totalBytes = 0;
        failureCounter = 0;
    }

    /*
     * add new cluster member and create sender ( s. replicationMode) transfer
     * current properties to sender
     *
     * @see org.apache.catalina.cluster.ClusterSender#add(org.apache.catalina.cluster.Member)
     */
    public synchronized void add(Member member) {
        try {
            String key = getKey(member);
            if (!map.containsKey(key)) {
                IDataSender sender = IDataSenderFactory.getIDataSender(
                        replicationMode, member);
                transferSenderProperty(sender);
                map.put(key, sender);
                registerSenderMBean(member, sender);
            }
        } catch (java.io.IOException x) {
            log.error("Unable to create and add a IDataSender object.", x);
        }
    }

    /**
     * remove sender from transmitter. ( deregister mbean and disconnect sender )
     *
     * @see org.apache.catalina.cluster.ClusterSender#remove(org.apache.catalina.cluster.Member)
     */
    public synchronized void remove(Member member) {
        String key = getKey(member);
        IDataSender toberemoved = (IDataSender) map.get(key);
        if (toberemoved == null)
            return;
        unregisterSenderMBean(toberemoved);
        toberemoved.disconnect();
        map.remove(key);

    }

    // ------------------------------------------------------------- protected

    /**
     * calc number of requests and transfered bytes. Log stats all 100 requets
     *
     * @param length
     */
    protected synchronized void addStats(int length) {
        nrOfRequests++;
        totalBytes += length;
        if (log.isDebugEnabled() && (nrOfRequests % 100) == 0) {
            log.debug("Nr of bytes sent=" + totalBytes + " over "
                    + nrOfRequests + "; avg=" + (totalBytes / nrOfRequests)
                    + " bytes/request; failures=" + failureCounter);
        }

    }

    /**
     * Transfer all properties from transmitter to concrete sender
     *
     * @param sender
     */
    protected void transferSenderProperty(IDataSender sender) {
        for (Iterator iter = getPropertyNames(); iter.hasNext();) {
            String pkey = (String) iter.next();
            Object value = getProperty(pkey);
            IntrospectionUtils.setProperty(sender, pkey, value.toString());
        }
    }

    /**
     * set unique key to find sender
     *
     * @param member
     * @return concat member.host:member.port
     */
    protected String getKey(Member member) {
        return member.getHost() + ":" + member.getPort();
    }

    /**
     * unregsister sendern Mbean
     *
     * @see #getSenderObjectName(IDataSender)
     * @param sender
     */
    protected void unregisterSenderMBean(IDataSender sender) {
        try {
            MBeanServer mserver = cluster.getMBeanServer();
            if (mserver != null) {
                mserver.unregisterMBean(getSenderObjectName(sender));
            }
        } catch (Exception e) {
            log.warn(e);
        }
    }

    /**
     * register MBean and check it exist (big problem!)
     *
     * @param member
     * @param sender
     */
    protected void registerSenderMBean(Member member, IDataSender sender) {
        if (member != null && cluster != null) {
            try {
                MBeanServer mserver = cluster.getMBeanServer();
                ObjectName senderName = getSenderObjectName(sender);
                if (mserver.isRegistered(senderName)) {
                    if (log.isWarnEnabled())
                        log.warn(sm.getString(
                                "cluster.mbean.register.allready", senderName));
                    return;
                }
                mserver.registerMBean(cluster.getManagedBean(sender),
                        senderName);
            } catch (Exception e) {
                log.warn(e);
            }
        }
    }

    /**
     * build sender ObjectName (
     * engine.domain:type=IDataSender,host="host",senderAddress="receiver.address",senderPort="port" )
     *
     * @param sender
     * @return
     */
    protected ObjectName getSenderObjectName(IDataSender sender) {
        ObjectName senderName = null;
        try {
            ObjectName clusterName = cluster.getObjectName();
            MBeanServer mserver = cluster.getMBeanServer();
            senderName = new ObjectName(clusterName.getDomain()
                    + ":type=IDataSender,host="
                    + clusterName.getKeyProperty("host") + ",senderAddress="
                    + sender.getAddress().getHostAddress() + ",senderPort="
                    + sender.getPort());
        } catch (Exception e) {
            log.warn(e);
        }
        return senderName;
    }

    /**
     * compress data
     *
     * @see XByteBuffer#createDataPackage(byte[])
     * @param indata
     * @return
     * @throws IOException
     *             FIXME get CompressMessageDate from cluster instanz
     */
    protected byte[] convertSenderData(byte[] data) throws IOException {
        return XByteBuffer.createDataPackage(data, isCompress());
    }

    /**
     * Send message to concrete sender. If autoConnect is true, check is
     * connection broken and the reconnect the complete sender.
     * <ul>
     * <li>failure the suspect flag is set true. After successfully sending the
     * suspect flag is set to false.</li>
     * <li>Stats is only update after sussesfull sending</li>
     * </ul>
     *
     * @param sessionId
     *            Unique Message Id
     * @param data
     *            message Data
     * @param sender
     *            concrete message sender
     * @throws java.io.IOException
     */
    protected void sendMessageData(String sessionId, byte[] data,
            IDataSender sender) throws java.io.IOException {
        if (sender == null)
            throw new java.io.IOException(
                    "Sender not available. Make sure sender information is available to the ReplicationTransmitter.");
        try {
            if (autoConnect && !sender.isConnected())
                sender.connect();
            sender.sendMessage(sessionId, data);
            sender.setSuspect(false);
            addStats(data.length);
        } catch (Exception x) {
            if (log.isWarnEnabled()) {
                if (!sender.getSuspect()) {
                    log
                            .warn(
                                    "Unable to send replicated message, is server down?",
                                    x);
                }
            }
            sender.setSuspect(true);
            failureCounter++;
        }

    }

}
TOP

Related Classes of org.apache.catalina.cluster.tcp.ReplicationTransmitter

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.