Package de.zib.scalaris

Source Code of de.zib.scalaris.DefaultConnectionPolicy$BadNodesComparator

/**
*  Copyright 2007-2011 Zuse Institute Berlin
*
*   Licensed under the Apache License, Version 2.0 (the "License");
*   you may not use this file except in compliance with the License.
*   You may obtain a copy of the License at
*
*       http://www.apache.org/licenses/LICENSE-2.0
*
*   Unless required by applicable law or agreed to in writing, software
*   distributed under the License is distributed on an "AS IS" BASIS,
*   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*   See the License for the specific language governing permissions and
*   limitations under the License.
*/
package de.zib.scalaris;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.SortedSet;
import java.util.TreeSet;

import com.ericsson.otp.erlang.OtpAuthException;

/**
* Implements a {@link ConnectionPolicy} by choosing nodes randomly.
*
* Sorts nodes into two lists:
* <ul>
* <li>good nodes</li>
* <li>bad nodes</li>
* </ul>
* Nodes that failed during a connection or in an attempt to connect will be
* transferred to the bad nodes list {@link #badNodes}. When a connection
* attempt to a node is successful, it will be removed from this list and put
* into the good nodes list {@link #goodNodes} and the node's connection failure
* statistics will be reset using {@link PeerNode#resetFailureCount()}.
*
* Whenever a node is being selected for a new connection (or reconnect), it
* will select one randomly from the {@link #goodNodes} list. If this list is
* empty, it will select the least recently failed node from {@link #badNodes}.
* At most {@link #maxRetries} retries are attempted per operation (see
* {@link Connection#connect()},
* {@link Connection#doRPC(String, String, com.ericsson.otp.erlang.OtpErlangList)}
* and
* {@link Connection#doRPC(String, String, com.ericsson.otp.erlang.OtpErlangObject[])}
* ) - the number of the current attempt will not be cached in this class. Set
* the maximal number of retries using {@link #setMaxRetries(int)}.
*
* Attention: All member's functions are synchronised as there can be a single
* connection policy object used by many threads and the access to the
* {@link #goodNodes} and {@link #badNodes} members are logically linked
* together and operations on both need to be performed atomically. Additionally
* access to {@link PeerNode} objects are synchronised on themselves. It is
* therefore important not to use any of this classes methods in blocks that
* synchronise on any node object. Otherwise deadlocks might occur!!
*
* @author Nico Kruber, kruber@zib.de
*
* @version 2.3
* @since 2.3
*/
public class DefaultConnectionPolicy extends ConnectionPolicy {

    /**
     * Helper class that defines the order of the {@link PeerNode} objects in
     * the {@link DefaultConnectionPolicy#badNodes} member: least recently
     * failed = first.
     *
     * @author Nico Kruber, kruber@zib.de
     *
     * @version 2.3
     * @since 2.3
     */
    private static class BadNodesComparator implements Comparator<PeerNode>, java.io.Serializable {
        /**
         * ID for serialisation purposes.
         */
        private static final long serialVersionUID = 1L;

        /**
         * Defines a order of nodes with failed connections (least recently
         * failed nodes first).
         *
         * Returns a negative integer, zero, or a positive integer if the first
         * argument is less than, equal to, or greater than the second.
         *
         * Warning: This method is unsynchronised and should therefore only be
         * used by calls from functions inside {@link DefaultConnectionPolicy}
         * which are synchronised!
         *
         * @param o1
         *            the first node
         * @param o2
         *            the second node
         *
         * @return a negative integer, zero, or a positive integer if the first
         *         argument is less than, equal to, or greater than the second
         */
        public int compare(PeerNode o1, PeerNode o2) {
            // Returns a negative integer, zero, or a positive integer as the
            // first argument is less than, equal to, or greater than the
            // second.
            if (o1 == o2) {
                return 0;
            }

            Date d1 = o1.getLastFailedConnect();
            Date d2 = o2.getLastFailedConnect();
            Long o1Time = ((d1 == null) ? 0 : d1.getTime());
            Long o2Time = ((d2 == null) ? 0 : d2.getTime());
            int compByTime = o1Time.compareTo(o2Time);

            if (compByTime == 0) {
                // two different nodes have the same fail dates
                // -> make order dependent on their hash code:
                int h1 = o1.hashCode();
                int h2 = o2.hashCode();
                if (h1 < h2) {
                    return -1;
                } else if (h1 > h2){
                    return 1;
                } else {
                    // two different nodes have equal fail dates and hash codes
                    // -> compare their names (last resort)
                    int compByName = o1.getNode().node().compareTo(o2.getNode().node());
                    if (compByName != 0) {
                        return compByName;
                    } else {
                        throw new ClassCastException(
                                "Cannot compare "
                                        + o1
                                        + " with "
                                        + o2
                                        + ": they share the same fail time, hash code, node name and cookie.");
                    }
                }
            } else {
                return compByTime;
            }
        }
    }

    // we could use synchronised lists and sets as provided by
    // Collections.synchronizedList and Collections.synchronizedSortedSet
    // but those two depend on each other and we thus need synchronised methods
    // which make synchronisations here obsolete
    /**
     * A list of good nodes (nodes which recently successfully connected).
     */
    protected List<PeerNode> goodNodes = new ArrayList<PeerNode>();
    /**
     * Bad nodes (nodes which recently failed) in the order of their failed
     * date.
     */
    protected SortedSet<PeerNode> badNodes = new TreeSet<PeerNode>(new BadNodesComparator());

    /**
     * Random number generator for selecting random nodes in the
     * {@link #goodNodes} list.
     */
    private Random random = new Random();

    /**
     * The maximal number of connection retries.
     */
    private int maxRetries = 3;

    /**
     * Creates a new connection policy working with the given remote node.
     *
     * Provided for convenience.
     *
     * Attention: This method also synchronises on the node.
     *
     * @param remoteNode the (only) available remote node
     */
    public DefaultConnectionPolicy(PeerNode remoteNode) {
        super(remoteNode);
        availableNodeAdded(remoteNode);
    }

    /**
     * Creates a new connection policy with the given remote nodes.
     *
     * Attention: This method synchronises on {@code availableRemoteNodes}.
     *
     * Any time this list is changed, the according methods in this class should
     * be called, i.e. {@link #availableNodeAdded(PeerNode)},
     * {@link #availableNodeRemoved(PeerNode)}, {@link #availableNodesReset()}
     * to update the good and bad nodes lists.
     *
     * @param availableRemoteNodes
     *            the remote nodes available for connections
     */
    public DefaultConnectionPolicy(List<PeerNode> availableRemoteNodes) {
        super(availableRemoteNodes);
        synchronized (availableRemoteNodes) {
            for (PeerNode remoteNode : availableRemoteNodes) {
                availableNodeAdded(remoteNode);
            }
        }
    }

    /**
     * Adds the given node to the {@link #goodNodes} list if it has no failures,
     * otherwise it will be added to {@link #badNodes}.
     *
     * Attention: This method also synchronises on the node.
     *
     * @param newNode the new node
     */
    public synchronized void availableNodeAdded(PeerNode newNode) {
        synchronized (newNode) {
            if (newNode.getFailureCount() == 0) {
                goodNodes.add(newNode);
            } else {
                badNodes.add(newNode);
            }
        }
    }

    /**
     * Removes the node from the {@link #goodNodes} and {@link #badNodes} lists.
     *
     * @param removedNode the removed node
     */
    public synchronized void availableNodeRemoved(PeerNode removedNode) {
        goodNodes.remove(removedNode);
        badNodes.remove(removedNode);
    }

    /**
     * Resets the {@link #goodNodes} and {@link #badNodes} members as the list
     * of available nodes has been reset.
     */
    public synchronized void availableNodesReset() {
        goodNodes.clear();
        badNodes.clear();
    }

    /**
     * Sets the given node's last failed connect time stamp and moves it to the
     * {@link #badNodes} list.
     *
     * Attention: This method also synchronises on the node.
     *
     * @param node the failed node
     */
    public synchronized void nodeFailed(PeerNode node) {
        synchronized (node) {
            // remove the node from the badNodes if it is in there (will be
            // reinserted at a new point)
            badNodes.remove(node);
            // update fail time before adding the node to the SortedSet!
            node.setLastFailedConnect();
            if (node.getFailureCount() == 1) {
                // a node that has not failed before must be in goodNodes
                // -> move it to badNodes
                goodNodes.remove(node);
            }
            badNodes.add(node);
        }
    }

    /**
     * Acts upon a failure reset of the given node.
     *
     * Resets the node's last failure state.
     *
     * @param node the node
     */
    public void nodeFailReset(PeerNode node) {
        synchronized (node) {
            if (node.getFailureCount() > 0) {
                // a previously failed node must be in badNodes
                // -> move it back to goodNodes
                badNodes.remove(node);
                node.resetFailureCount();
                goodNodes.add(node);
            }
        }
    }

    /**
     * Sets the node's last successful connect time stamp, resets its failure
     * statistics and moves it to the {@link #goodNodes} list.
     *
     * Attention: This method also synchronises on the node.
     *
     * @param node the node
     */
    public synchronized void nodeConnectSuccess(PeerNode node) {
        synchronized (node) {
            node.setLastConnectSuccess();
            if (node.getFailureCount() > 0) {
                // a previously failed node must be in badNodes
                // -> move it back to goodNodes
                badNodes.remove(node);
                node.resetFailureCount();
                goodNodes.add(node);
            }
        }
    }

    /**
     * Returns a random node from the list of good nodes.
     * Assumes {@link #goodNodes} to have at least one element.
     *
     * @return a random good node
     */
    protected synchronized PeerNode getGoodNode() {
        if (goodNodes.size() == 1) {
            return goodNodes.get(0);
        } else {
            return goodNodes.get(random.nextInt(goodNodes.size()));
        }
    }

    /**
     * Selects the node to (re-)connect with until the maximal number of
     * {@link #maxRetries} has been reached.
     *
     * Throws an exception if {@code retry > maxRetries} and thus stops further
     * node connection attempts. Otherwise chooses a random good node or (if
     * there are no good nodes) the least recently failed bad node.
     *
     * @param <E>
     *            the type of the exception that came from the failed connection
     *            and may be re-thrown
     *
     * @param retry
     *            the n'th retry (initial connect = 0, 1st reconnect = 1,...)
     * @param failedNode
     *            the node from the previous connection attempt or {@code null}
     * @param e
     *            the exception that came back from the previous connection
     *            attempt or {@code null}
     *
     * @return the new node to connect with
     *
     * @throws E
     *             if thrown, automatic re-connection attempts will stop
     * @throws UnsupportedOperationException
     *             is thrown if the operation can not be performed, e.g. the
     *             list is empty
     *
     * @see Connection#connect()
     * @see Connection#doRPC(String, String,
     *      com.ericsson.otp.erlang.OtpErlangList)
     * @see Connection#doRPC(String, String,
     *      com.ericsson.otp.erlang.OtpErlangObject[])
     */
    @SuppressWarnings("unchecked")
    public synchronized <E extends Exception> PeerNode selectNode(int retry,
            PeerNode failedNode, E e) throws E {
        if (retry <= maxRetries) {
            if ((goodNodes.size() + badNodes.size()) < 1) {
                throw new UnsupportedOperationException(
                        "Can not choose a node from an empty list.");
            } else if (goodNodes.size() > 0) {
                return getGoodNode();
            } else {
                return badNodes.first();
            }
        } else {
            String newMessage = e.getMessage() + ", bad nodes: " + badNodes.toString() + ", good nodes: " + goodNodes.toString() + ", retries: " + (retry - 1);
            if (e instanceof OtpAuthException) {
                OtpAuthException e1 = new OtpAuthException(newMessage);
                e1.setStackTrace(e.getStackTrace());
                throw (E) e1;
            } else if (e instanceof IOException) {
                IOException e1 = new IOException(newMessage);
                e1.setStackTrace(e.getStackTrace());
                throw (E) e1;
            } else {
                throw e;
            }
        }
    }


    /**
     * Sets the maximal number of automatic connection retries.
     *
     * @return the maxRetries
     */
    public int getMaxRetries() {
        return maxRetries;
    }

    /**
     * Gets the maximal number of automatic connection retries.
     *
     * @param maxRetries the maxRetries to set
     */
    public void setMaxRetries(int maxRetries) {
        this.maxRetries = maxRetries;
    }

    /**
     * Gets a copy of the list of good nodes (contains references to the
     * {@link PeerNode} objects).
     *
     * @return the list of good nodes
     */
    public synchronized List<PeerNode> getGoodNodes() {
        return new ArrayList<PeerNode>(goodNodes);
    }

    /**
     * Gets a copy of the list of good nodes (contains references to the
     * {@link PeerNode} objects).
     *
     * @return the list of good nodes
     */
    public synchronized List<PeerNode> getBadNodes() {
        ArrayList<PeerNode> result = new ArrayList<PeerNode>(badNodes.size());
        for (PeerNode p : badNodes) {
            result.add(p);
        }
        return result;
    }
}
TOP

Related Classes of de.zib.scalaris.DefaultConnectionPolicy$BadNodesComparator

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.