Package org.voltcore.messaging

Source Code of org.voltcore.messaging.HostMessenger$Config

/* This file is part of VoltDB.
* Copyright (C) 2008-2014 VoltDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with VoltDB.  If not, see <http://www.gnu.org/licenses/>.
*/

package org.voltcore.messaging;

import java.net.InetSocketAddress;
import java.net.SocketException;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.zookeeper_voltpatches.CreateMode;
import org.apache.zookeeper_voltpatches.ZooDefs.Ids;
import org.apache.zookeeper_voltpatches.ZooKeeper;
import org.json_voltpatches.JSONArray;
import org.json_voltpatches.JSONObject;
import org.json_voltpatches.JSONStringer;
import org.voltcore.agreement.AgreementSite;
import org.voltcore.agreement.InterfaceToMessenger;
import org.voltcore.logging.VoltLogger;
import org.voltcore.network.PicoNetwork;
import org.voltcore.network.VoltNetworkPool;
import org.voltcore.network.VoltNetworkPool.IOStatsIntf;
import org.voltcore.utils.CoreUtils;
import org.voltcore.utils.InstanceId;
import org.voltcore.utils.Pair;
import org.voltcore.utils.PortGenerator;
import org.voltcore.utils.ShutdownHooks;
import org.voltcore.zk.CoreZK;
import org.voltcore.zk.ZKUtil;
import org.voltdb.VoltDB;
import org.voltdb.utils.MiscUtils;

import com.google_voltpatches.common.base.Preconditions;
import com.google_voltpatches.common.collect.ImmutableMap;
import com.google_voltpatches.common.collect.ImmutableSet;
import com.google_voltpatches.common.primitives.Longs;

/**
* Host messenger contains all the code necessary to join a cluster mesh, and create mailboxes
* that are addressable from anywhere within that mesh. Host messenger also provides
* a ZooKeeper instance that is maintained within the mesh that can be used for distributed coordination
* and failure detection.
*/
public class HostMessenger implements SocketJoiner.JoinHandler, InterfaceToMessenger {

    // Logger for the "NETWORK" category; used for join notifications and host-failure warnings.
    private static final VoltLogger logger = new VoltLogger("NETWORK");

    // Thread ids passed to ZKUtil.getClient(...) and ForeignHost.enableRead(...) by this class.
    // notifyOfHosts adds the network and agreement-site thread ids to it.
    // NOTE(review): presumably these are threads that must not block on ZooKeeper — confirm.
    public static final CopyOnWriteArraySet<Long> VERBOTEN_THREADS = new CopyOnWriteArraySet<Long>();

    /**
     * Configuration for a host messenger. The leader binds to the coordinator ip and
     * not the internal interface or port. Nodes that fail to become the leader will
     * connect to the leader using any interface, and will then advertise using the specified
     * internal interface/port.
     *
     * By default all interfaces are used, if one is specified then only that interface will be used.
     *
     */
    public static class Config {
        public InetSocketAddress coordinatorIp;
        public String zkInterface = "127.0.0.1:2181";
        public String internalInterface = "";
        public int internalPort = 3021;
        public int deadHostTimeout = 90 * 1000;
        public long backwardsTimeForgivenessWindow = 1000 * 60 * 60 * 24 * 7;
        public VoltMessageFactory factory = new VoltMessageFactory();
        public int networkThreads =  Math.max(2, CoreUtils.availableProcessors() / 4);
        public Queue<String> coreBindIds;

        public Config(String coordIp, int coordPort) {
            if (coordIp == null || coordIp.length() == 0) {
                coordinatorIp = new InetSocketAddress(coordPort);
            } else {
                coordinatorIp = new InetSocketAddress(coordIp, coordPort);
            }
            initNetworkThreads();
        }

        public Config() {
            this(null, 3021);
        }

        public Config(PortGenerator ports) {
            this(null, 3021);
            zkInterface = "127.0.0.1:" + ports.next();
            internalPort = ports.next();
        }

        public int getZKPort() {
            return MiscUtils.getPortFromHostnameColonPort(zkInterface, VoltDB.DEFAULT_ZK_PORT);
        }

        private void initNetworkThreads() {
            try {
                logger.info("Default network thread count: " + this.networkThreads);
                Integer networkThreadConfig = Integer.getInteger("networkThreads");
                if ( networkThreadConfig != null ) {
                    this.networkThreads = networkThreadConfig;
                    logger.info("Overridden network thread count: " + this.networkThreads);
                }

            } catch (Exception e) {
                logger.error("Error setting network thread count", e);
            }
        }

        @Override
        public String toString() {
            JSONStringer js = new JSONStringer();
            try {
                js.object();
                js.key("coordinatorip").value(coordinatorIp.toString());
                js.key("zkinterface").value(zkInterface);
                js.key("internalinterface").value(internalInterface);
                js.key("internalport").value(internalPort);
                js.key("deadhosttimeout").value(deadHostTimeout);
                js.key("backwardstimeforgivenesswindow").value(backwardsTimeForgivenessWindow);
                js.key("networkThreads").value(networkThreads);
                js.endObject();

                return js.toString();
            }
            catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    // Additional loggers: one under this class's historical category name, one under "HOST".
    private static final VoltLogger m_logger = new VoltLogger("org.voltdb.messaging.impl.HostMessenger");
    private static final VoltLogger hostLog = new VoltLogger("HOST");

    // Fixed, negative site ids reserved for well-known local sites.
    // I want to make these more dynamic at some point in the future --izzy
    public static final int AGREEMENT_SITE_ID = -1;
    public static final int STATS_SITE_ID = -2;
    public static final int ASYNC_COMPILER_SITE_ID = -3;
    public static final int CLIENT_INTERFACE_SITE_ID = -4;
    public static final int SYSCATALOG_SITE_ID = -5;
    public static final int SYSINFO_SITE_ID = -6;
    public static final int SNAPSHOTSCAN_SITE_ID = -7;
    public static final int SNAPSHOTDELETE_SITE_ID = -8;
    public static final int REBALANCE_SITE_ID = -9;
    public static final int SNAPSHOT_DAEMON_ID = -10;
    public static final int SNAPSHOT_IO_AGENT_ID = -11;

    // we should never hand out this site ID.  Use it as an empty message destination
    public static final int VALHALLA = Integer.MIN_VALUE;

    // Host id of this node. Assigned in notifyOfHosts(); stays at the default 0 on the leader path.
    int m_localHostId;

    private final Config m_config;
    private final SocketJoiner m_joiner;
    private final VoltNetworkPool m_network;
    private volatile boolean m_localhostReady = false;
    // memoized InstanceId
    private InstanceId m_instanceId = null;
    // Set by prepareForShutdown(); when true, host-failure warnings are not logged.
    private boolean m_shuttingDown = false;

    // Held while replacing any of the immutable collections below with an updated copy.
    private final Object m_mapLock = new Object();

    /*
     * References to other hosts in the mesh.
     * Updates via COW
     */
    volatile ImmutableMap<Integer, ForeignHost> m_foreignHosts = ImmutableMap.of();

    /*
     * References to all the local mailboxes
     * Updates via COW
     */
    volatile ImmutableMap<Long, Mailbox> m_siteMailboxes = ImmutableMap.of();

    /*
     * All failed hosts that have ever been seen.
     * Used to dedupe failures so that they are only processed once.
     * The set is immutable: add entries through addFailedHost(), never via add().
     */
    private volatile ImmutableSet<Integer> m_knownFailedHosts = ImmutableSet.of();

    // Both are assigned during start() (leader) or notifyOfHosts() (joiner); null before that.
    private AgreementSite m_agreementSite;
    private ZooKeeper m_zk;
    // Source of site ids for mailboxes generated without an explicit id (see generateMailboxId).
    private final AtomicInteger m_nextSiteId = new AtomicInteger(0);

    public Mailbox getMailbox(long hsId) {
        return m_siteMailboxes.get(hsId);
    }

    /**
     *
     * @param network
     * @param coordinatorIp
     * @param expectedHosts
     * @param catalogCRC
     * @param hostLog
     */
    public HostMessenger(
            Config config)
    {
        m_config = config;
        m_network = new VoltNetworkPool(m_config.networkThreads, 0, m_config.coreBindIds, "Server");
        m_joiner = new SocketJoiner(
                m_config.coordinatorIp,
                m_config.internalInterface,
                m_config.internalPort,
                this);

        // Register a clean shutdown hook for the network threads.  This gets cranky
        // when crashLocalVoltDB() is called because System.exit() can get called from
        // a random network thread which is already shutting down and we'll get delicious
        // deadlocks.  Take the coward's way out and just don't do this if we're already
        // crashing (read as: I refuse to hunt for more shutdown deadlocks).
        ShutdownHooks.registerShutdownHook(ShutdownHooks.MIDDLE, false, new Runnable() {
            @Override
            public void run()
            {
                for (ForeignHost host : m_foreignHosts.values())
                {
                    // null is OK. It means this host never saw this host id up
                    if (host != null)
                    {
                        host.close();
                    }
                }
            }
        });

    }

    private final DisconnectFailedHostsCallback m_failedHostsCallback = new DisconnectFailedHostsCallback() {
        @Override
        public void disconnect(Set<Integer> failedHostIds) {
            synchronized(HostMessenger.this) {
                for (int hostId: failedHostIds) {
                    addFailedHost(hostId);
                    removeForeignHost(hostId);
                    if (!m_shuttingDown) {
                        logger.warn(String.format("Host %d failed", hostId));
                    }
                }
            }
        }
    };

    private final void addFailedHost(int hostId) {
        if (!m_knownFailedHosts.contains(hostId)) {
            synchronized (m_mapLock) {
                if (!m_knownFailedHosts.contains(hostId)) {
                    ImmutableSet.Builder<Integer> b = ImmutableSet.builder();
                    b.addAll(m_knownFailedHosts);
                    b.add(hostId);
                    m_knownFailedHosts = b.build();
                }
            }
        }
    }

    public synchronized void prepareForShutdown()
    {
        m_shuttingDown = true;
    }

    public synchronized boolean isShuttingDown()
    {
        return m_shuttingDown;
    }

    /**
     * Synchronization protects m_knownFailedHosts and ensures that every failed host is only reported
     * once
     */
    @Override
    public synchronized void reportForeignHostFailed(int hostId) {
        long initiatorSiteId = CoreUtils.getHSIdFromHostAndSite(hostId, AGREEMENT_SITE_ID);
        m_agreementSite.reportFault(initiatorSiteId);
        if (!m_shuttingDown) {
            logger.warn(String.format("Host %d failed", hostId));
        }
    }

    @Override
    public synchronized void relayForeignHostFailed(FaultMessage fm) {
        m_agreementSite.reportFault(fm);
        if (!m_shuttingDown) {
            m_logger.warn("Someone else claims a host failed: " + fm);
        }
    }

    /**
     * Start the host messenger and connect to the leader, or become the leader
     * if necessary.
     *
     * When the joiner reports that this node is the leader, this method starts the network,
     * creates and starts a single-node agreement site, connects a ZooKeeper client, creates
     * the core ZK hierarchy, claims host id 0, and publishes the instance id and this host's
     * address in ZooKeeper. In both cases the latch handed to the joiner is counted down last.
     *
     * @throws Exception if the ZooKeeper client cannot be obtained, or if any bootstrap step throws
     */
    public void start() throws Exception {
        /*
         * SJ uses this barrier if this node becomes the leader to know when ZooKeeper
         * has been finished bootstrapping.
         */
        CountDownLatch zkInitBarrier = new CountDownLatch(1);

        /*
         * If start returns true then this node is the leader, it bound to the coordinator address
         * It needs to bootstrap its agreement site so that other nodes can join
         */
        if(m_joiner.start(zkInitBarrier)) {
            m_network.start();

            /*
             * m_localHostId is 0 of course.
             */
            long agreementHSId = getHSIdForLocalSite(AGREEMENT_SITE_ID);

            /*
             * A set containing just the leader (this node)
             */
            HashSet<Long> agreementSites = new HashSet<Long>();
            agreementSites.add(agreementHSId);

            /*
             * A basic site mailbox for the agreement site
             */
            SiteMailbox sm = new SiteMailbox(this, agreementHSId);
            createMailbox(agreementHSId, sm);


            /*
             * Construct the site with just this node.
             * zkInterface is expected in "host:port" form; it is split on ':' for the address.
             */
            m_agreementSite =
                new AgreementSite(
                        agreementHSId,
                        agreementSites,
                        0,
                        sm,
                        new InetSocketAddress(
                                m_config.zkInterface.split(":")[0],
                                Integer.parseInt(m_config.zkInterface.split(":")[1])),
                                m_config.backwardsTimeForgivenessWindow,
                                m_failedHostsCallback);
            m_agreementSite.start();
            m_agreementSite.waitForRecovery();
            // Connect the local ZooKeeper client; a null result is treated as a timeout.
            m_zk = org.voltcore.zk.ZKUtil.getClient(
                    m_config.zkInterface, 60 * 1000, VERBOTEN_THREADS);
            if (m_zk == null) {
                throw new Exception("Timed out trying to connect local ZooKeeper instance");
            }

            CoreZK.createHierarchy(m_zk);

            /*
             * This creates the persistent sequential node with host id 0 which
             * this node already used for itself. Just recording that fact.
             * Any other id means something else got there first, which is fatal.
             */
            final int selectedHostId = selectNewHostId(m_config.coordinatorIp.toString());
            if (selectedHostId != 0) {
                org.voltdb.VoltDB.crashLocalVoltDB("Selected host id for coordinator was not 0, " + selectedHostId, false, null);
            }

            // Store the components of the instance ID in ZK:
            // the coordinator address bytes read as an int, plus the current wall-clock time.
            JSONObject instance_id = new JSONObject();
            instance_id.put("coord",
                    ByteBuffer.wrap(m_config.coordinatorIp.getAddress().getAddress()).getInt());
            instance_id.put("timestamp", System.currentTimeMillis());
            hostLog.debug("Cluster will have instance ID:\n" + instance_id.toString(4));
            byte[] payload = instance_id.toString(4).getBytes("UTF-8");
            m_zk.create(CoreZK.instance_id, payload, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

            /*
             * Store all the hosts and host ids here so that waitForGroupJoin
             * knows the size of the mesh. This part only registers this host
             */
            byte hostInfoBytes[] = m_config.coordinatorIp.toString().getBytes("UTF-8");
            m_zk.create(CoreZK.hosts_host + selectedHostId, hostInfoBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
        }
        // Release the barrier whether or not this node became the leader.
        zkInitBarrier.countDown();
    }

    // For test only: builds a messenger from a default Config
    // (wildcard coordinator address, port 3021).
    protected HostMessenger() {
        this(new Config());
    }

    /*
     * Returns the network pool created in the constructor.
     * The network is only available after start() finishes
     */
    public VoltNetworkPool getNetwork() {
        return m_network;
    }

    public VoltMessageFactory getMessageFactory()
    {
        return m_config.factory;
    }

    /**
     * Get a unique ID for this cluster
     * @return
     */
    public InstanceId getInstanceId()
    {
        if (m_instanceId == null)
        {
            try
            {
                byte[] data =
                    m_zk.getData(CoreZK.instance_id, false, null);
                JSONObject idJSON = new JSONObject(new String(data, "UTF-8"));
                m_instanceId = new InstanceId(idJSON.getInt("coord"),
                        idJSON.getLong("timestamp"));

            }
            catch (Exception e)
            {
                String msg = "Unable to get instance ID info from " + CoreZK.instance_id;
                hostLog.error(msg);
                throw new RuntimeException(msg, e);
            }
        }
        return m_instanceId;
    }

    /*
     * Take the new connection (member of the mesh) and create a foreign host for it
     * and put it in the map of foreign hosts
     */
    @Override
    public void notifyOfJoin(int hostId, SocketChannel socket, InetSocketAddress listeningAddress) {
        logger.info(getHostId() + " notified of " + hostId);
        prepSocketChannel(socket);
        ForeignHost fhost = null;
        try {
            fhost = new ForeignHost(this, hostId, socket, m_config.deadHostTimeout, listeningAddress, new PicoNetwork(socket));
            putForeignHost(hostId, fhost);
            fhost.enableRead(VERBOTEN_THREADS);
        } catch (java.io.IOException e) {
            org.voltdb.VoltDB.crashLocalVoltDB("", true, e);
        }
    }

    /*
     * Set all the default options for sockets
     */
    private void prepSocketChannel(SocketChannel sc) {
        try {
            sc.socket().setSendBufferSize(1024 * 1024 * 2);
            sc.socket().setReceiveBufferSize(1024 * 1024 * 2);
        } catch (SocketException e) {
            e.printStackTrace();
        }
    }

    /*
     * Convenience method for doing the verbose COW insert into the map
     */
    private void putForeignHost(int hostId, ForeignHost fh) {
        synchronized (m_mapLock) {
            ImmutableMap.Builder<Integer, ForeignHost> b = ImmutableMap.builder();
            b.putAll(m_foreignHosts);
            b.put(hostId, fh);
            m_foreignHosts = b.build();
        }
    }

    /*
     * Convenience method for doing the verbose COW remove from the map
     */
    private void removeForeignHost(int hostId) {
        ForeignHost fh = null;
        synchronized (m_mapLock) {
            ImmutableMap.Builder<Integer, ForeignHost> b = ImmutableMap.builder();
            for (Map.Entry<Integer, ForeignHost> e : m_foreignHosts.entrySet()) {
                if (e.getKey().equals(hostId)) {
                    fh = e.getValue();
                    continue;
                }
                b.put(e.getKey(), e.getValue());
            }
            m_foreignHosts = b.build();
        }
        if (fh != null) {
            fh.close();
        }
    }

    /*
     * Any node can serve a request to join. The coordination of generating a new host id
     * is done via ZK
     */
    @Override
    public void requestJoin(SocketChannel socket, InetSocketAddress listeningAddress) throws Exception {
        /*
         * Generate the host id via creating an ephemeral sequential node
         */
        Integer hostId = selectNewHostId(socket.socket().getInetAddress().getHostAddress());
        prepSocketChannel(socket);
        ForeignHost fhost = null;
        try {
            try {
                /*
                 * Write the response that advertises the cluster topology
                 */
                writeRequestJoinResponse( hostId, socket);

                /*
                 * Wait for the a response from the joining node saying that it connected
                 * to all the nodes we just advertised. Use a timeout so that the cluster can't be stuck
                 * on failed joins.
                 */
                ByteBuffer finishedJoining = ByteBuffer.allocate(1);
                socket.configureBlocking(false);
                long start = System.currentTimeMillis();
                while (finishedJoining.hasRemaining() && System.currentTimeMillis() - start < 120000) {
                    int read = socket.read(finishedJoining);
                    if (read == -1) {
                        hostLog.info("New connection was unable to establish mesh");
                        return;
                    } else if (read < 1) {
                        Thread.sleep(5);
                    }
                }

                /*
                 * Now add the host to the mailbox system
                 */
                fhost = new ForeignHost(this, hostId, socket, m_config.deadHostTimeout, listeningAddress, new PicoNetwork(socket));
                putForeignHost(hostId, fhost);
                fhost.enableRead(VERBOTEN_THREADS);
            } catch (Exception e) {
                logger.error("Error joining new node", e);
                m_knownFailedHosts.add(hostId);
                removeForeignHost(hostId);
                return;
            }

            /*
             * And the last step is to wait for the new node to join ZooKeeper.
             * This node is the one to create the txn that will add the new host to the list of hosts
             * with agreement sites across the cluster.
             */
            long hsId = CoreUtils.getHSIdFromHostAndSite(hostId, AGREEMENT_SITE_ID);
            if (!m_agreementSite.requestJoin(hsId).await(60, TimeUnit.SECONDS)) {
                reportForeignHostFailed(hostId);
            }
        } catch (Throwable e) {
            org.voltdb.VoltDB.crashLocalVoltDB("", true, e);
        }
    }

    /*
     * Generate a new host id by creating a persistent sequential node
     */
    private Integer selectNewHostId(String address) throws Exception {
        String node =
            m_zk.create(CoreZK.hostids_host,
                    address.getBytes("UTF-8"), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL);

        return Integer.valueOf(node.substring(node.length() - 10));
    }

    /*
     * Advertise to a newly connecting node the topology of the cluster so that it can connect to
     * the rest of the nodes
     */
    private void writeRequestJoinResponse(int hostId, SocketChannel socket) throws Exception {
        JSONObject jsObj = new JSONObject();

        /*
         * Tell the new node what its host id is
         */
        jsObj.put("newHostId", hostId);

        /*
         * Echo back the address that the node connected from
         */
        jsObj.put("reportedAddress",
                ((InetSocketAddress)socket.socket().getRemoteSocketAddress()).getAddress().getHostAddress());

        /*
         * Create an array containing an ad for every node including this one
         * even though the connection has already been made
         */
        JSONArray jsArray = new JSONArray();
        JSONObject hostObj = new JSONObject();
        hostObj.put("hostId", getHostId());
        hostObj.put("address",
                m_config.internalInterface.isEmpty() ?
                        socket.socket().getLocalAddress().getHostAddress() : m_config.internalInterface);
        hostObj.put("port", m_config.internalPort);
        jsArray.put(hostObj);
        for (Map.Entry<Integer, ForeignHost>  entry : m_foreignHosts.entrySet()) {
            if (entry.getValue() == null) continue;
            int hsId = entry.getKey();
            ForeignHost fh = entry.getValue();
            hostObj = new JSONObject();
            hostObj.put("hostId", hsId);
            hostObj.put("address", fh.m_listeningAddress.getAddress().getHostAddress());
            hostObj.put("port", fh.m_listeningAddress.getPort());
            jsArray.put(hostObj);
        }
        jsObj.put("hosts", jsArray);
        byte messageBytes[] = jsObj.toString(4).getBytes("UTF-8");
        ByteBuffer message = ByteBuffer.allocate(4 + messageBytes.length);
        message.putInt(messageBytes.length);
        message.put(messageBytes).flip();
        while (message.hasRemaining()) {
            socket.write(message);
        }
    }

    /*
     * SJ invokes this method after a node finishes connecting to the entire cluster.
     * This method constructs all the hosts and puts them in the map
     *
     * Sequence: record the assigned host id, start the network, wrap every supplied socket
     * in a ForeignHost, create the local agreement site, enable reads on the foreign hosts,
     * start the agreement site and wait for it to recover, connect a ZooKeeper client, and
     * publish this host's address in ZooKeeper.
     *
     * The three arrays are indexed in parallel: hosts[i], sockets[i] and
     * listeningAddresses[i] describe the same remote host.
     */
    @Override
    public void notifyOfHosts(
            int yourHostId,
            int[] hosts,
            SocketChannel[] sockets,
            InetSocketAddress listeningAddresses[]) throws Exception {
        m_localHostId = yourHostId;
        long agreementHSId = getHSIdForLocalSite(AGREEMENT_SITE_ID);

        /*
         * Construct the set of agreement sites based on all the hosts that are connected
         */
        HashSet<Long> agreementSites = new HashSet<Long>();
        agreementSites.add(agreementHSId);

        m_network.start();//network must be running for register to work

        for (int ii = 0; ii < hosts.length; ii++) {
            logger.info(yourHostId + " notified of host " + hosts[ii]);
            agreementSites.add(CoreUtils.getHSIdFromHostAndSite(hosts[ii], AGREEMENT_SITE_ID));
            prepSocketChannel(sockets[ii]);
            ForeignHost fhost = null;
            try {
                // Reads are deliberately not enabled yet; see the loop further down.
                fhost = new ForeignHost(this, hosts[ii], sockets[ii], m_config.deadHostTimeout, listeningAddresses[ii], new PicoNetwork(sockets[ii]));
                putForeignHost(hosts[ii], fhost);
            } catch (java.io.IOException e) {
                org.voltdb.VoltDB.crashLocalVoltDB("", true, e);
            }
        }

        /*
         * Create the local agreement site. It knows that it is recovering because the number of
         * preexisting sites is > 0
         */
        SiteMailbox sm = new SiteMailbox(this, agreementHSId);
        createMailbox(agreementHSId, sm);
        m_agreementSite =
            new AgreementSite(
                    agreementHSId,
                    agreementSites,
                    yourHostId,
                    sm,
                    new InetSocketAddress(
                            m_config.zkInterface.split(":")[0],
                            Integer.parseInt(m_config.zkInterface.split(":")[1])),
                            m_config.backwardsTimeForgivenessWindow,
                            m_failedHostsCallback);

        /*
         * Now that the agreement site mailbox has been created it is safe
         * to enable read
         */
        for (ForeignHost fh : m_foreignHosts.values()) {
            fh.enableRead(VERBOTEN_THREADS);
        }
        m_agreementSite.start();

        /*
         * Do the usual thing of waiting for the agreement site
         * to join the cluster and creating the client
         */
        VERBOTEN_THREADS.addAll(m_network.getThreadIds());
        VERBOTEN_THREADS.addAll(m_agreementSite.getThreadIds());
        m_agreementSite.waitForRecovery();
        // A null client is treated as a connection timeout.
        m_zk = org.voltcore.zk.ZKUtil.getClient(
                m_config.zkInterface, 60 * 1000, VERBOTEN_THREADS);
        if (m_zk == null) {
            throw new Exception("Timed out trying to connect local ZooKeeper instance");
        }

        /*
         * Publish the address of this node to ZK as seen by the leader
         * Also allows waitForGroupJoin to know the number of nodes in the cluster
         */
        byte hostInfoBytes[];
        if (m_config.internalInterface.isEmpty()) {
            // No interface configured: use the address the joiner recorded for this node.
            InetSocketAddress addr =
                new InetSocketAddress(m_joiner.m_reportedInternalInterface, m_config.internalPort);
            hostInfoBytes = addr.toString().getBytes("UTF-8");
        } else {
            InetSocketAddress addr =
                new InetSocketAddress(m_config.internalInterface, m_config.internalPort);
            hostInfoBytes = addr.toString().getBytes("UTF-8");
        }

        m_zk.create(CoreZK.hosts_host + getHostId(), hostInfoBytes, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
    }

    /**
     * Wait until all the nodes have built a mesh.
     */
    public void waitForGroupJoin(int expectedHosts) {
        try {
            while (true) {
                ZKUtil.FutureWatcher fw = new ZKUtil.FutureWatcher();
                final int numChildren = m_zk.getChildren(CoreZK.hosts, fw).size();

                /*
                 * If the target number of hosts has been reached
                 * break out
                 */
                if ( numChildren == expectedHosts) {
                    break;
                }


                /*
                 * If there are extra hosts that means too many Volt procs were started.
                 * Kill this node based on the assumption that we are the extra one. In most
                 * cases this is correct and fine and in the worst case the cluster will hang coming up
                 * because two or more hosts killed themselves
                 */
                if ( numChildren > expectedHosts) {
                    org.voltdb.VoltDB.crashLocalVoltDB("Expected to find " + expectedHosts +
                            " hosts in cluster at startup but found " + numChildren +
                            ".  Terminating this host.", false, null);
                }
                fw.get();
            }
        } catch (Exception e) {
            org.voltdb.VoltDB.crashLocalVoltDB("Error waiting for hosts to be ready", false, e);
        }
    }

    /**
     * @return the host id of this node (the value of {@code m_localHostId})
     */
    public int getHostId() {
        return m_localHostId;
    }

    public long getHSIdForLocalSite(int site) {
        return CoreUtils.getHSIdFromHostAndSite(getHostId(), site);
    }

    public String getHostname() {
        String hostname = org.voltcore.utils.CoreUtils.getHostnameOrAddress();
        return hostname;
    }

    public List<Integer> getLiveHostIds()
    {
        List<Integer> hostids = new ArrayList<Integer>();
        hostids.addAll(m_foreignHosts.keySet());
        hostids.add(m_localHostId);
        return hostids;
    }

    /**
     * Given a hostid, return the hostname for it
     */
    @Override
    public String getHostnameForHostID(int hostId) {
        if (hostId == m_localHostId) {
            return CoreUtils.getHostnameOrAddress();
        }
        ForeignHost fh = m_foreignHosts.get(hostId);
        return fh == null ? "UNKNOWN" : fh.hostname();
    }

    /**
     *
     * @param siteId
     * @param mailboxId
     * @param message
     * @return null if message was delivered locally or a ForeignHost
     * reference if a message is read to be delivered remotely.
     */
    ForeignHost presend(long hsId, VoltMessage message)
    {
        int hostId = (int)hsId;

        // the local machine case
        if (hostId == m_localHostId) {
            Mailbox mbox = m_siteMailboxes.get(hsId);
            if (mbox != null) {
                mbox.deliver(message);
                return null;
            } else {
                hostLog.info("Mailbox is not registered for site id " + CoreUtils.getSiteIdFromHSId(hsId));
                return null;
            }
        }

        // the foreign machine case
        ForeignHost fhost = m_foreignHosts.get(hostId);

        if (fhost == null)
        {
            if (!m_knownFailedHosts.contains(hostId)) {
                hostLog.warn(
                        "Attempted to send a message to foreign host with id " +
                        hostId + " but there is no such host.");
            }
            return null;
        }

        if (!fhost.isUp())
        {
            if (!m_shuttingDown) {
                m_logger.warn("Attempted delivery of message to failed site: " + CoreUtils.hsIdToString(hsId));
            }
            return null;
        }
        return fhost;
    }

    public void registerMailbox(Mailbox mailbox) {
        if (!m_siteMailboxes.containsKey(mailbox.getHSId())) {
                throw new RuntimeException("Can only register a mailbox with an hsid alreadly generated");
        }

        synchronized (m_mapLock) {
            ImmutableMap.Builder<Long, Mailbox> b = ImmutableMap.builder();
            for (Map.Entry<Long, Mailbox> e : m_siteMailboxes.entrySet()) {
                if (e.getKey().equals(mailbox.getHSId())) {
                    b.put(e.getKey(), mailbox);
                } else {
                    b.put(e.getKey(), e.getValue());
                }
            }
            m_siteMailboxes = b.build();
        }
    }

    /*
     * Generate a slot for the mailbox and put a noop box there. Can also
     * supply a value
     */
    public long generateMailboxId(Long mailboxId) {
        final long hsId = mailboxId == null ? getHSIdForLocalSite(m_nextSiteId.getAndIncrement()) : mailboxId;
        addMailbox(hsId, new Mailbox() {
            @Override
            public void send(long hsId, VoltMessage message) {
            }

            @Override
            public void send(long[] hsIds, VoltMessage message) {
            }

            @Override
            public void deliver(VoltMessage message) {
                hostLog.info("No-op mailbox(" + CoreUtils.hsIdToString(hsId) + ") dropped message " + message);
            }

            @Override
            public void deliverFront(VoltMessage message) {
            }

            @Override
            public VoltMessage recv() {
                return null;
            }

            @Override
            public VoltMessage recvBlocking() {
                return null;
            }

            @Override
            public VoltMessage recvBlocking(long timeout) {
                return null;
            }

            @Override
            public VoltMessage recv(Subject[] s) {
                return null;
            }

            @Override
            public VoltMessage recvBlocking(Subject[] s) {
                return null;
            }

            @Override
            public VoltMessage recvBlocking(Subject[] s, long timeout) {
                return null;
            }

            @Override
            public long getHSId() {
                return 0L;
            }

            @Override
            public void setHSId(long hsId) {
            }

        });
        return hsId;
    }

    /*
     * Create a site mailbox with a generated host id
     */
    public Mailbox createMailbox() {
        final int siteId = m_nextSiteId.getAndIncrement();
        long hsId = getHSIdForLocalSite(siteId);
        SiteMailbox sm = new SiteMailbox( this, hsId);
        addMailbox(hsId, sm);
        return sm;
    }

    private void addMailbox(long hsId, Mailbox m) {
        synchronized (m_mapLock) {
            ImmutableMap.Builder<Long, Mailbox> b = ImmutableMap.builder();
            b.putAll(m_siteMailboxes);
            b.put(hsId, m);
            m_siteMailboxes = b.build();
        }
    }

    /**
     * Discard a mailbox
     */
    public void removeMailbox(long hsId) {
        synchronized (m_mapLock) {
            ImmutableMap.Builder<Long, Mailbox> b = ImmutableMap.builder();
            for (Map.Entry<Long, Mailbox> e : m_siteMailboxes.entrySet()) {
                if (e.getKey().equals(hsId)) continue;
                b.put(e.getKey(), e.getValue());
            }
            m_siteMailboxes = b.build();
        }
    }

    public void send(final long destinationHSId, final VoltMessage message)
    {
        assert(message != null);

        ForeignHost host = presend(destinationHSId, message);
        if (host != null) {
            host.send(new long [] { destinationHSId }, message);
        }
    }

    public void send(long[] destinationHSIds, final VoltMessage message)
    {
        assert(message != null);
        assert(destinationHSIds != null);
        final HashMap<ForeignHost, ArrayList<Long>> foreignHosts =
            new HashMap<ForeignHost, ArrayList<Long>>(32);
        for (long hsId : destinationHSIds) {
            ForeignHost host = presend(hsId, message);
            if (host == null) continue;
            ArrayList<Long> bundle = foreignHosts.get(host);
            if (bundle == null) {
                bundle = new ArrayList<Long>();
                foreignHosts.put(host, bundle);
            }
            bundle.add(hsId);
        }

        if (foreignHosts.size() == 0) return;

        for (Entry<ForeignHost, ArrayList<Long>> e : foreignHosts.entrySet()) {
            e.getKey().send(Longs.toArray(e.getValue()), message);
        }
    }

    /**
     * Block on this call until the number of ready hosts is
     * equal to the number of expected hosts.
     *
     * @return True if returning with all hosts ready. False if error.
     */
    public void waitForAllHostsToBeReady(int expectedHosts) {
        m_localhostReady = true;
        try {
            m_zk.create(CoreZK.readyhosts_host, null, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                ZKUtil.FutureWatcher fw = new ZKUtil.FutureWatcher();
                if (m_zk.getChildren(CoreZK.readyhosts, fw).size() == expectedHosts) {
                    break;
                }
                fw.get();
            }
        } catch (Exception e) {
            org.voltdb.VoltDB.crashLocalVoltDB("Error waiting for hosts to be ready", false, e);
        }
    }

    /**
     * Report whether the local host has been marked ready, which happens on
     * entry to waitForAllHostsToBeReady.
     *
     * NOTE(review): this read is synchronized on the messenger, but the
     * write in waitForAllHostsToBeReady is not -- confirm the field's
     * declaration gives the visibility that is intended.
     *
     * @return the current value of m_localhostReady
     */
    public synchronized boolean isLocalHostReady() {
        return m_localhostReady;
    }

    public void shutdown() throws InterruptedException
    {
        m_zk.close();
        m_agreementSite.shutdown();
        for (ForeignHost host : m_foreignHosts.values())
        {
            // null is OK. It means this host never saw this host id up
            if (host != null)
            {
                host.close();
            }
        }
        m_joiner.shutdown();
        m_network.shutdown();
        VERBOTEN_THREADS.clear();
    }

    /*
     * Register a custom mailbox, optinally specifying what the hsid should be.
     */
    public void createMailbox(Long proposedHSId, Mailbox mailbox) {
        long hsId = 0;
        if (proposedHSId != null) {
            if (m_siteMailboxes.containsKey(proposedHSId)) {
                org.voltdb.VoltDB.crashLocalVoltDB(
                        "Attempted to create a mailbox for site " +
                        CoreUtils.hsIdToString(proposedHSId) + " twice", true, null);
            }
            hsId = proposedHSId;
        } else {
            hsId = getHSIdForLocalSite(m_nextSiteId.getAndIncrement());
            mailbox.setHSId(hsId);
        }

        addMailbox(hsId, mailbox);
    }

    /**
     * Get the number of up foreign hosts. Used for test purposes.
     * @return The number of up foreign hosts.
     */
    public int countForeignHosts() {
        int retval = 0;
        for (ForeignHost host : m_foreignHosts.values())
            if ((host != null) && (host.isUp()))
                retval++;
        return retval;
    }

    /**
     * Kill a foreign host socket by id.
     * @param hostId The id of the foreign host to kill.
     */
    public void closeForeignHostSocket(int hostId) {
        ForeignHost fh = m_foreignHosts.get(hostId);
        if (fh != null && fh.isUp()) {
            fh.killSocket();
        }
        reportForeignHostFailed(hostId);
    }

    /**
     * @return the ZooKeeper client held by this messenger (m_zk), the same
     * instance that shutdown() closes
     */
    public ZooKeeper getZK() {
        return m_zk;
    }

    public void sendPoisonPill(Collection<Integer> hostIds, String err, int cause) {
        for (int hostId : hostIds) {
            ForeignHost fh = m_foreignHosts.get(hostId);
            if (fh != null && fh.isUp()) {
                fh.sendPoisonPill(err, ForeignHost.CRASH_SPECIFIED);
            }
        }
    }

    public void sendPoisonPill(String err) {
        for (int hostId : m_foreignHosts.keySet()) {
            ForeignHost fh = m_foreignHosts.get(hostId);
            if (fh != null && fh.isUp()) {
                fh.sendPoisonPill(err, ForeignHost.CRASH_ALL);
            }
        }
    }

    public void sendPoisonPill(String err, int targetHostId, int cause) {
        ForeignHost fh = m_foreignHosts.get(targetHostId);
        if (fh != null && fh.isUp()) {
            fh.sendPoisonPill(err, cause);
        }
    }

    public boolean validateForeignHostId(Integer hostId) {
        return !m_knownFailedHosts.contains(hostId);
    }

    public void setDeadHostTimeout(int timeout) {
        Preconditions.checkArgument(timeout > 0, "Timeout value must be > 0, was %s", timeout);
        hostLog.info("Dead host timeout set to " + timeout + " milliseconds");
        m_config.deadHostTimeout = timeout;
        for (ForeignHost fh : m_foreignHosts.values()) {
            fh.updateDeadHostTimeout(timeout);
        }
    }

    public Map<Long, Pair<String, long[]>>
        getIOStats(final boolean interval) throws InterruptedException, ExecutionException {
        final ImmutableMap<Integer, ForeignHost> fhosts = m_foreignHosts;
        ArrayList<IOStatsIntf> picoNetworks = new ArrayList<IOStatsIntf>(fhosts.size());

        for (ForeignHost fh : fhosts.values()) {
            picoNetworks.add(fh.m_network);
        }

        return m_network.getIOStats(interval, picoNetworks);
    }

}
TOP

Related Classes of org.voltcore.messaging.HostMessenger$Config

TOP
Copyright © 2018 www.massapi.com. All rights reserved.
All source code are property of their respective owners. Java is a trademark of Sun Microsystems, Inc and owned by ORACLE Inc. Contact coftware#gmail.com.